Introduce CollectCollationInfo runtime api (#443)

* Introduce `CollectCollationInfo` runtime api

Instead of using well known keys to communicate information about a
collation between the runtime and the collator, we now use a runtime api
for this.

* Fixes bug

* Apply suggestions from code review

Co-authored-by: Sergei Shulepov <sergei@parity.io>

* Doc update

Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
Bastian Köcher
2021-05-17 16:33:33 +02:00
committed by GitHub
parent 3bcd7f0fd8
commit d458d2622b
14 changed files with 218 additions and 279 deletions
+44 -113
View File
@@ -17,18 +17,16 @@
//! Cumulus Collator implementation for Substrate.
use cumulus_client_network::WaitToAnnounce;
use cumulus_primitives_core::{
well_known_keys, OutboundHrmpMessage, ParachainBlockData, PersistedValidationData,
};
use cumulus_primitives_core::{CollectCollationInfo, ParachainBlockData, PersistedValidationData};
use sc_client_api::BlockBackend;
use sp_api::ProvideRuntimeApi;
use sp_consensus::BlockStatus;
use sp_core::traits::SpawnNamed;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT, Zero},
};
use sp_state_machine::InspectState;
use cumulus_client_consensus_common::ParachainConsensus;
use polkadot_node_primitives::{
@@ -36,9 +34,7 @@ use polkadot_node_primitives::{
};
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::OverseerHandler;
use polkadot_primitives::v1::{
BlockNumber as PBlockNumber, CollatorPair, Hash as PHash, HeadData, Id as ParaId, UpwardMessage,
};
use polkadot_primitives::v1::{CollatorPair, Hash as PHash, HeadData, Id as ParaId};
use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
@@ -50,36 +46,37 @@ use tracing::Instrument;
const LOG_TARGET: &str = "cumulus-collator";
/// The implementation of the Cumulus `Collator`.
pub struct Collator<Block: BlockT, BS, Backend> {
pub struct Collator<Block: BlockT, BS, RA> {
block_status: Arc<BS>,
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
backend: Arc<Backend>,
runtime_api: Arc<RA>,
}
impl<Block: BlockT, BS, Backend> Clone for Collator<Block, BS, Backend> {
impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
fn clone(&self) -> Self {
Self {
block_status: self.block_status.clone(),
wait_to_announce: self.wait_to_announce.clone(),
backend: self.backend.clone(),
parachain_consensus: self.parachain_consensus.clone(),
runtime_api: self.runtime_api.clone(),
}
}
}
impl<Block, BS, Backend> Collator<Block, BS, Backend>
impl<Block, BS, RA> Collator<Block, BS, RA>
where
Block: BlockT,
BS: BlockBackend<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RA: ProvideRuntimeApi<Block>,
RA::Api: CollectCollationInfo<Block>,
{
/// Create a new instance.
fn new(
block_status: Arc<BS>,
spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
backend: Arc<Backend>,
runtime_api: Arc<RA>,
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
) -> Self {
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));
@@ -87,7 +84,7 @@ where
Self {
block_status,
wait_to_announce,
backend,
runtime_api,
parachain_consensus,
}
}
@@ -154,103 +151,35 @@ where
&mut self,
block: ParachainBlockData<Block>,
block_hash: Block::Hash,
relay_block_number: PBlockNumber,
) -> Option<Collation> {
let block_data = BlockData(block.encode());
let header = block.into_header();
let head_data = HeadData(header.encode());
let state = match self.backend.state_at(BlockId::Hash(block_hash)) {
Ok(state) => state,
let collation_info = match self
.runtime_api
.runtime_api()
.collect_collation_info(&BlockId::Hash(block_hash))
{
Ok(ci) => ci,
Err(e) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to get state of the freshly built block.",
"Failed to collect collation info.",
);
return None;
}
};
state.inspect_state(|| {
let upward_messages = sp_io::storage::get(well_known_keys::UPWARD_MESSAGES);
let upward_messages =
match upward_messages.map(|v| Vec::<UpwardMessage>::decode(&mut &v[..])) {
Some(Ok(msgs)) => msgs,
Some(Err(e)) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to decode upward messages from the build block.",
);
return None;
}
None => Vec::new(),
};
let new_validation_code = sp_io::storage::get(well_known_keys::NEW_VALIDATION_CODE);
let processed_downward_messages =
sp_io::storage::get(well_known_keys::PROCESSED_DOWNWARD_MESSAGES);
let processed_downward_messages =
match processed_downward_messages.map(|v| u32::decode(&mut &v[..])) {
Some(Ok(processed_cnt)) => processed_cnt,
Some(Err(e)) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to decode the count of processed downward message.",
);
return None;
}
None => 0,
};
let horizontal_messages = sp_io::storage::get(well_known_keys::HRMP_OUTBOUND_MESSAGES);
let horizontal_messages = match horizontal_messages
.map(|v| Vec::<OutboundHrmpMessage>::decode(&mut &v[..]))
{
Some(Ok(horizontal_messages)) => horizontal_messages,
Some(Err(e)) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to decode the horizontal messages.",
);
return None;
}
None => Vec::new(),
};
let hrmp_watermark = sp_io::storage::get(well_known_keys::HRMP_WATERMARK);
let hrmp_watermark = match hrmp_watermark.map(|v| PBlockNumber::decode(&mut &v[..])) {
Some(Ok(hrmp_watermark)) => hrmp_watermark,
Some(Err(e)) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to decode the HRMP watermark."
);
return None;
}
None => {
// If the runtime didn't set `HRMP_WATERMARK`, then it means no messages were
// supplied via the message ingestion inherent. Assuming that the PVF/runtime
// checks that legitly there are no pending messages we can therefore move the
// watermark up to the relay-block number.
relay_block_number
}
};
Some(Collation {
upward_messages,
new_validation_code: new_validation_code.map(Into::into),
head_data,
proof_of_validity: PoV { block_data },
processed_downward_messages,
horizontal_messages,
hrmp_watermark,
})
Some(Collation {
upward_messages: collation_info.upward_messages,
new_validation_code: collation_info.new_validation_code,
processed_downward_messages: collation_info.processed_downward_messages,
horizontal_messages: collation_info.horizontal_messages,
hrmp_watermark: collation_info.hrmp_watermark,
head_data,
proof_of_validity: PoV { block_data },
})
}
@@ -308,7 +237,7 @@ where
);
let block_hash = b.header().hash();
let collation = self.build_collation(b, block_hash, validation_data.relay_parent_number)?;
let collation = self.build_collation(b, block_hash)?;
let (result_sender, signed_stmt_recv) = oneshot::channel();
@@ -330,9 +259,9 @@ where
}
/// Parameters for [`start_collator`].
pub struct StartCollatorParams<Block: BlockT, Backend, BS, Spawner> {
pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
pub para_id: ParaId,
pub backend: Arc<Backend>,
pub runtime_api: Arc<RA>,
pub block_status: Arc<BS>,
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
pub overseer_handler: OverseerHandler,
@@ -342,7 +271,7 @@ pub struct StartCollatorParams<Block: BlockT, Backend, BS, Spawner> {
}
/// Start the collator.
pub async fn start_collator<Block, Backend, BS, Spawner>(
pub async fn start_collator<Block, RA, BS, Spawner>(
StartCollatorParams {
para_id,
block_status,
@@ -351,19 +280,20 @@ pub async fn start_collator<Block, Backend, BS, Spawner>(
spawner,
key,
parachain_consensus,
backend,
}: StartCollatorParams<Block, Backend, BS, Spawner>,
runtime_api,
}: StartCollatorParams<Block, RA, BS, Spawner>,
) where
Block: BlockT,
Backend: sc_client_api::Backend<Block> + 'static,
BS: BlockBackend<Block> + Send + Sync + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
RA::Api: CollectCollationInfo<Block>,
{
let collator = Collator::new(
block_status,
Arc::new(spawner),
announce_block,
backend,
runtime_api,
parachain_consensus,
);
@@ -400,13 +330,15 @@ mod tests {
use cumulus_test_runtime::{Block, Header};
use futures::{channel::mpsc, executor::block_on, StreamExt};
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
use polkadot_overseer::{AllSubsystems, Overseer, HeadSupportsParachains};
use polkadot_overseer::{AllSubsystems, HeadSupportsParachains, Overseer};
use sp_consensus::BlockOrigin;
use sp_core::{testing::TaskExecutor, Pair};
struct AlwaysSupportsParachains;
impl HeadSupportsParachains for AlwaysSupportsParachains {
fn head_supports_parachains(&self, _head: &PHash) -> bool { true }
fn head_supports_parachains(&self, _head: &PHash) -> bool {
true
}
}
#[derive(Clone)]
@@ -450,9 +382,7 @@ mod tests {
let spawner = TaskExecutor::new();
let para_id = ParaId::from(100);
let announce_block = |_, _| ();
let client_builder = TestClientBuilder::new();
let backend = client_builder.backend();
let client = Arc::new(client_builder.build());
let client = Arc::new(TestClientBuilder::new().build());
let header = client.header(&BlockId::Number(0)).unwrap().unwrap();
let (sub_tx, sub_rx) = mpsc::channel(64);
@@ -465,12 +395,13 @@ mod tests {
None,
AlwaysSupportsParachains,
spawner.clone(),
).expect("Creates overseer");
)
.expect("Creates overseer");
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
let collator_start = start_collator(StartCollatorParams {
backend,
runtime_api: client.clone(),
block_status: client.clone(),
announce_block: Arc::new(announce_block),
overseer_handler: handler,