mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 19:51:05 +00:00
Introduce CollectCollationInfo runtime api (#443)
* Introduce `CollectCollationInfo` runtime api Instead of using well known keys to communicate information about a collation between the runtime and the collator, we now use a runtime api for this. * Fixes bug * Apply suggestions from code review Co-authored-by: Sergei Shulepov <sergei@parity.io> * Doc update Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
@@ -10,7 +10,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
# Polkadot dependencies
|
||||
|
||||
+44
-113
@@ -17,18 +17,16 @@
|
||||
//! Cumulus Collator implementation for Substrate.
|
||||
|
||||
use cumulus_client_network::WaitToAnnounce;
|
||||
use cumulus_primitives_core::{
|
||||
well_known_keys, OutboundHrmpMessage, ParachainBlockData, PersistedValidationData,
|
||||
};
|
||||
use cumulus_primitives_core::{CollectCollationInfo, ParachainBlockData, PersistedValidationData};
|
||||
|
||||
use sc_client_api::BlockBackend;
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_consensus::BlockStatus;
|
||||
use sp_core::traits::SpawnNamed;
|
||||
use sp_runtime::{
|
||||
generic::BlockId,
|
||||
traits::{Block as BlockT, Header as HeaderT, Zero},
|
||||
};
|
||||
use sp_state_machine::InspectState;
|
||||
|
||||
use cumulus_client_consensus_common::ParachainConsensus;
|
||||
use polkadot_node_primitives::{
|
||||
@@ -36,9 +34,7 @@ use polkadot_node_primitives::{
|
||||
};
|
||||
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
|
||||
use polkadot_overseer::OverseerHandler;
|
||||
use polkadot_primitives::v1::{
|
||||
BlockNumber as PBlockNumber, CollatorPair, Hash as PHash, HeadData, Id as ParaId, UpwardMessage,
|
||||
};
|
||||
use polkadot_primitives::v1::{CollatorPair, Hash as PHash, HeadData, Id as ParaId};
|
||||
|
||||
use codec::{Decode, Encode};
|
||||
use futures::{channel::oneshot, FutureExt};
|
||||
@@ -50,36 +46,37 @@ use tracing::Instrument;
|
||||
const LOG_TARGET: &str = "cumulus-collator";
|
||||
|
||||
/// The implementation of the Cumulus `Collator`.
|
||||
pub struct Collator<Block: BlockT, BS, Backend> {
|
||||
pub struct Collator<Block: BlockT, BS, RA> {
|
||||
block_status: Arc<BS>,
|
||||
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
|
||||
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
|
||||
backend: Arc<Backend>,
|
||||
runtime_api: Arc<RA>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT, BS, Backend> Clone for Collator<Block, BS, Backend> {
|
||||
impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
block_status: self.block_status.clone(),
|
||||
wait_to_announce: self.wait_to_announce.clone(),
|
||||
backend: self.backend.clone(),
|
||||
parachain_consensus: self.parachain_consensus.clone(),
|
||||
runtime_api: self.runtime_api.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block, BS, Backend> Collator<Block, BS, Backend>
|
||||
impl<Block, BS, RA> Collator<Block, BS, RA>
|
||||
where
|
||||
Block: BlockT,
|
||||
BS: BlockBackend<Block>,
|
||||
Backend: sc_client_api::Backend<Block> + 'static,
|
||||
RA: ProvideRuntimeApi<Block>,
|
||||
RA::Api: CollectCollationInfo<Block>,
|
||||
{
|
||||
/// Create a new instance.
|
||||
fn new(
|
||||
block_status: Arc<BS>,
|
||||
spawner: Arc<dyn SpawnNamed + Send + Sync>,
|
||||
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
|
||||
backend: Arc<Backend>,
|
||||
runtime_api: Arc<RA>,
|
||||
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
|
||||
) -> Self {
|
||||
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));
|
||||
@@ -87,7 +84,7 @@ where
|
||||
Self {
|
||||
block_status,
|
||||
wait_to_announce,
|
||||
backend,
|
||||
runtime_api,
|
||||
parachain_consensus,
|
||||
}
|
||||
}
|
||||
@@ -154,103 +151,35 @@ where
|
||||
&mut self,
|
||||
block: ParachainBlockData<Block>,
|
||||
block_hash: Block::Hash,
|
||||
relay_block_number: PBlockNumber,
|
||||
) -> Option<Collation> {
|
||||
let block_data = BlockData(block.encode());
|
||||
let header = block.into_header();
|
||||
let head_data = HeadData(header.encode());
|
||||
|
||||
let state = match self.backend.state_at(BlockId::Hash(block_hash)) {
|
||||
Ok(state) => state,
|
||||
let collation_info = match self
|
||||
.runtime_api
|
||||
.runtime_api()
|
||||
.collect_collation_info(&BlockId::Hash(block_hash))
|
||||
{
|
||||
Ok(ci) => ci,
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed to get state of the freshly built block.",
|
||||
"Failed to collect collation info.",
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
state.inspect_state(|| {
|
||||
let upward_messages = sp_io::storage::get(well_known_keys::UPWARD_MESSAGES);
|
||||
let upward_messages =
|
||||
match upward_messages.map(|v| Vec::<UpwardMessage>::decode(&mut &v[..])) {
|
||||
Some(Ok(msgs)) => msgs,
|
||||
Some(Err(e)) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed to decode upward messages from the build block.",
|
||||
);
|
||||
return None;
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
let new_validation_code = sp_io::storage::get(well_known_keys::NEW_VALIDATION_CODE);
|
||||
|
||||
let processed_downward_messages =
|
||||
sp_io::storage::get(well_known_keys::PROCESSED_DOWNWARD_MESSAGES);
|
||||
let processed_downward_messages =
|
||||
match processed_downward_messages.map(|v| u32::decode(&mut &v[..])) {
|
||||
Some(Ok(processed_cnt)) => processed_cnt,
|
||||
Some(Err(e)) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed to decode the count of processed downward message.",
|
||||
);
|
||||
return None;
|
||||
}
|
||||
None => 0,
|
||||
};
|
||||
|
||||
let horizontal_messages = sp_io::storage::get(well_known_keys::HRMP_OUTBOUND_MESSAGES);
|
||||
let horizontal_messages = match horizontal_messages
|
||||
.map(|v| Vec::<OutboundHrmpMessage>::decode(&mut &v[..]))
|
||||
{
|
||||
Some(Ok(horizontal_messages)) => horizontal_messages,
|
||||
Some(Err(e)) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed to decode the horizontal messages.",
|
||||
);
|
||||
return None;
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
let hrmp_watermark = sp_io::storage::get(well_known_keys::HRMP_WATERMARK);
|
||||
let hrmp_watermark = match hrmp_watermark.map(|v| PBlockNumber::decode(&mut &v[..])) {
|
||||
Some(Ok(hrmp_watermark)) => hrmp_watermark,
|
||||
Some(Err(e)) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed to decode the HRMP watermark."
|
||||
);
|
||||
return None;
|
||||
}
|
||||
None => {
|
||||
// If the runtime didn't set `HRMP_WATERMARK`, then it means no messages were
|
||||
// supplied via the message ingestion inherent. Assuming that the PVF/runtime
|
||||
// checks that legitly there are no pending messages we can therefore move the
|
||||
// watermark up to the relay-block number.
|
||||
relay_block_number
|
||||
}
|
||||
};
|
||||
|
||||
Some(Collation {
|
||||
upward_messages,
|
||||
new_validation_code: new_validation_code.map(Into::into),
|
||||
head_data,
|
||||
proof_of_validity: PoV { block_data },
|
||||
processed_downward_messages,
|
||||
horizontal_messages,
|
||||
hrmp_watermark,
|
||||
})
|
||||
Some(Collation {
|
||||
upward_messages: collation_info.upward_messages,
|
||||
new_validation_code: collation_info.new_validation_code,
|
||||
processed_downward_messages: collation_info.processed_downward_messages,
|
||||
horizontal_messages: collation_info.horizontal_messages,
|
||||
hrmp_watermark: collation_info.hrmp_watermark,
|
||||
head_data,
|
||||
proof_of_validity: PoV { block_data },
|
||||
})
|
||||
}
|
||||
|
||||
@@ -308,7 +237,7 @@ where
|
||||
);
|
||||
|
||||
let block_hash = b.header().hash();
|
||||
let collation = self.build_collation(b, block_hash, validation_data.relay_parent_number)?;
|
||||
let collation = self.build_collation(b, block_hash)?;
|
||||
|
||||
let (result_sender, signed_stmt_recv) = oneshot::channel();
|
||||
|
||||
@@ -330,9 +259,9 @@ where
|
||||
}
|
||||
|
||||
/// Parameters for [`start_collator`].
|
||||
pub struct StartCollatorParams<Block: BlockT, Backend, BS, Spawner> {
|
||||
pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
|
||||
pub para_id: ParaId,
|
||||
pub backend: Arc<Backend>,
|
||||
pub runtime_api: Arc<RA>,
|
||||
pub block_status: Arc<BS>,
|
||||
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
|
||||
pub overseer_handler: OverseerHandler,
|
||||
@@ -342,7 +271,7 @@ pub struct StartCollatorParams<Block: BlockT, Backend, BS, Spawner> {
|
||||
}
|
||||
|
||||
/// Start the collator.
|
||||
pub async fn start_collator<Block, Backend, BS, Spawner>(
|
||||
pub async fn start_collator<Block, RA, BS, Spawner>(
|
||||
StartCollatorParams {
|
||||
para_id,
|
||||
block_status,
|
||||
@@ -351,19 +280,20 @@ pub async fn start_collator<Block, Backend, BS, Spawner>(
|
||||
spawner,
|
||||
key,
|
||||
parachain_consensus,
|
||||
backend,
|
||||
}: StartCollatorParams<Block, Backend, BS, Spawner>,
|
||||
runtime_api,
|
||||
}: StartCollatorParams<Block, RA, BS, Spawner>,
|
||||
) where
|
||||
Block: BlockT,
|
||||
Backend: sc_client_api::Backend<Block> + 'static,
|
||||
BS: BlockBackend<Block> + Send + Sync + 'static,
|
||||
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
|
||||
RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||
RA::Api: CollectCollationInfo<Block>,
|
||||
{
|
||||
let collator = Collator::new(
|
||||
block_status,
|
||||
Arc::new(spawner),
|
||||
announce_block,
|
||||
backend,
|
||||
runtime_api,
|
||||
parachain_consensus,
|
||||
);
|
||||
|
||||
@@ -400,13 +330,15 @@ mod tests {
|
||||
use cumulus_test_runtime::{Block, Header};
|
||||
use futures::{channel::mpsc, executor::block_on, StreamExt};
|
||||
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
|
||||
use polkadot_overseer::{AllSubsystems, Overseer, HeadSupportsParachains};
|
||||
use polkadot_overseer::{AllSubsystems, HeadSupportsParachains, Overseer};
|
||||
use sp_consensus::BlockOrigin;
|
||||
use sp_core::{testing::TaskExecutor, Pair};
|
||||
|
||||
struct AlwaysSupportsParachains;
|
||||
impl HeadSupportsParachains for AlwaysSupportsParachains {
|
||||
fn head_supports_parachains(&self, _head: &PHash) -> bool { true }
|
||||
fn head_supports_parachains(&self, _head: &PHash) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -450,9 +382,7 @@ mod tests {
|
||||
let spawner = TaskExecutor::new();
|
||||
let para_id = ParaId::from(100);
|
||||
let announce_block = |_, _| ();
|
||||
let client_builder = TestClientBuilder::new();
|
||||
let backend = client_builder.backend();
|
||||
let client = Arc::new(client_builder.build());
|
||||
let client = Arc::new(TestClientBuilder::new().build());
|
||||
let header = client.header(&BlockId::Number(0)).unwrap().unwrap();
|
||||
|
||||
let (sub_tx, sub_rx) = mpsc::channel(64);
|
||||
@@ -465,12 +395,13 @@ mod tests {
|
||||
None,
|
||||
AlwaysSupportsParachains,
|
||||
spawner.clone(),
|
||||
).expect("Creates overseer");
|
||||
)
|
||||
.expect("Creates overseer");
|
||||
|
||||
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
|
||||
|
||||
let collator_start = start_collator(StartCollatorParams {
|
||||
backend,
|
||||
runtime_api: client.clone(),
|
||||
block_status: client.clone(),
|
||||
announce_block: Arc::new(announce_block),
|
||||
overseer_handler: handler,
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
//! Provides functions for starting a collator node or a normal full node.
|
||||
|
||||
use cumulus_client_consensus_common::ParachainConsensus;
|
||||
use cumulus_primitives_core::ParaId;
|
||||
use cumulus_primitives_core::{CollectCollationInfo, ParaId};
|
||||
use futures::FutureExt;
|
||||
use polkadot_primitives::v1::{Block as PBlock, CollatorPair};
|
||||
use polkadot_service::{AbstractClient, Client as PClient, ClientHandle, RuntimeApiCollection};
|
||||
@@ -28,6 +28,7 @@ use sc_client_api::{
|
||||
};
|
||||
use sc_service::{error::Result as ServiceResult, Configuration, Role, TaskManager};
|
||||
use sc_telemetry::TelemetryWorkerHandle;
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_blockchain::HeaderBackend;
|
||||
use sp_consensus::BlockImport;
|
||||
use sp_core::traits::SpawnNamed;
|
||||
@@ -40,8 +41,7 @@ pub mod genesis;
|
||||
type RFullNode<C> = polkadot_service::NewFull<C>;
|
||||
|
||||
/// Parameters given to [`start_collator`].
|
||||
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Backend, Spawner, RClient> {
|
||||
pub backend: Arc<Backend>,
|
||||
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Spawner, RClient> {
|
||||
pub block_status: Arc<BS>,
|
||||
pub client: Arc<Client>,
|
||||
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
|
||||
@@ -60,7 +60,6 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Backend, Spawner,
|
||||
/// parachain validator for validation and inclusion into the relay chain.
|
||||
pub async fn start_collator<'a, Block, BS, Client, Backend, Spawner, RClient>(
|
||||
StartCollatorParams {
|
||||
backend,
|
||||
block_status,
|
||||
client,
|
||||
announce_block,
|
||||
@@ -70,7 +69,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, Spawner, RClient>(
|
||||
task_manager,
|
||||
relay_chain_full_node,
|
||||
parachain_consensus,
|
||||
}: StartCollatorParams<'a, Block, BS, Client, Backend, Spawner, RClient>,
|
||||
}: StartCollatorParams<'a, Block, BS, Client, Spawner, RClient>,
|
||||
) -> sc_service::error::Result<()>
|
||||
where
|
||||
Block: BlockT,
|
||||
@@ -82,11 +81,13 @@ where
|
||||
+ Sync
|
||||
+ BlockBackend<Block>
|
||||
+ BlockchainEvents<Block>
|
||||
+ ProvideRuntimeApi<Block>
|
||||
+ 'static,
|
||||
Client::Api: CollectCollationInfo<Block>,
|
||||
for<'b> &'b Client: BlockImport<Block>,
|
||||
Backend: BackendT<Block> + 'static,
|
||||
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
|
||||
RClient: ClientHandle,
|
||||
Backend: BackendT<Block> + 'static,
|
||||
{
|
||||
relay_chain_full_node.client.execute_with(StartConsensus {
|
||||
para_id,
|
||||
@@ -97,7 +98,7 @@ where
|
||||
})?;
|
||||
|
||||
cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
|
||||
backend,
|
||||
runtime_api: client.clone(),
|
||||
block_status,
|
||||
announce_block,
|
||||
overseer_handler: relay_chain_full_node
|
||||
|
||||
Reference in New Issue
Block a user