Integrate HRMP (#258)

* HRMP message ingestion

* Plumb hrmp_watermark to build_collation

* Plumb hrmp_watermark to ValidationResult

* Plumb hrmp outbound messages

* Implement message-broker part of HRMP

* Kill UPWARD_MESSAGES as well

Otherwise, they will get resent each block

* Add sudo versions for easier testing

* Remove the xcmp module

Not useful for the moment

* Doc for HRMP message handler

* Estimate the weight upper bound for on_finalize

* Remove a redundant type annotation

* fix spelling of a method

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Deabbreviate dmp and hrmp in the message ingestion type

* Don't use binary_search since it's broken by a following rotate

Instead use the linear search. We can afford linear search here since
due to limited scalability of HRMP we can only have at most a couple of
dozens of channels.

* Fix the watermark

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Sergei Shulepov
2020-12-15 05:42:31 +01:00
committed by GitHub
parent 1c5add6a39
commit c84c9b6bb0
8 changed files with 292 additions and 80 deletions
+80 -10
View File
@@ -18,8 +18,8 @@
use cumulus_network::WaitToAnnounce;
use cumulus_primitives::{
inherents::{DownwardMessagesType, DOWNWARD_MESSAGES_IDENTIFIER, VALIDATION_DATA_IDENTIFIER},
well_known_keys, ValidationData,
inherents::{self, VALIDATION_DATA_IDENTIFIER},
well_known_keys, ValidationData, InboundHrmpMessage, OutboundHrmpMessage, InboundDownwardMessage,
};
use cumulus_runtime::ParachainBlockData;
@@ -52,7 +52,7 @@ use log::{debug, error, info, trace};
use futures::prelude::*;
use std::{marker::PhantomData, sync::Arc, time::Duration};
use std::{marker::PhantomData, sync::Arc, time::Duration, collections::BTreeMap};
use parking_lot::Mutex;
@@ -146,8 +146,9 @@ where
/// for.
///
/// Returns `None` in case of an error.
fn retrieve_dmq_contents(&self, relay_parent: PHash) -> Option<DownwardMessagesType> {
self.polkadot_client
fn retrieve_dmq_contents(&self, relay_parent: PHash) -> Option<Vec<InboundDownwardMessage>> {
self
.polkadot_client
.runtime_api()
.dmq_contents_with_context(
&BlockId::hash(relay_parent),
@@ -164,6 +165,31 @@ where
.ok()
}
/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
/// collating for.
///
/// Empty channels are also included.
fn retrieve_all_inbound_hrmp_channel_contents(&self, relay_parent: PHash)
-> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>
{
self
.polkadot_client
.runtime_api()
.inbound_hrmp_channels_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
self.para_id,
)
.map_err(|e| {
error!(
target: "cumulus-collator",
"An error occured during requesting the inbound HRMP messages for {}: {:?}",
relay_parent, e,
);
})
.ok()
}
/// Get the inherent data with validation function parameters injected
fn inherent_data(
&mut self,
@@ -193,9 +219,18 @@ where
})
.ok()?;
let downward_messages = self.retrieve_dmq_contents(relay_parent)?;
let message_ingestion_data = {
let downward_messages = self.retrieve_dmq_contents(relay_parent)?;
let horizontal_messages = self.retrieve_all_inbound_hrmp_channel_contents(relay_parent)?;
inherents::MessageIngestionType {
downward_messages,
horizontal_messages,
}
};
inherent_data
.put_data(DOWNWARD_MESSAGES_IDENTIFIER, &downward_messages)
.put_data(inherents::MESSAGE_INGESTION_IDENTIFIER, &message_ingestion_data)
.map_err(|e| {
error!(
target: "cumulus-collator",
@@ -296,15 +331,50 @@ where
None => 0,
};
let horizontal_messages = sp_io::storage::get(well_known_keys::HRMP_OUTBOUND_MESSAGES);
let horizontal_messages = match horizontal_messages
.map(|v| Vec::<OutboundHrmpMessage>::decode(&mut &v[..]))
{
Some(Ok(horizontal_messages)) => horizontal_messages,
Some(Err(e)) => {
error!(
target: "cumulus-collator",
"Failed to decode the horizontal messages: {:?}",
e
);
return None
}
None => Vec::new(),
};
let hrmp_watermark = sp_io::storage::get(well_known_keys::HRMP_WATERMARK);
let hrmp_watermark = match hrmp_watermark.map(|v| PBlockNumber::decode(&mut &v[..])) {
Some(Ok(hrmp_watermark)) => hrmp_watermark,
Some(Err(e)) => {
error!(
target: "cumulus-collator",
"Failed to decode the HRMP watermark: {:?}",
e
);
return None
}
None => {
// If the runtime didn't set `HRMP_WATERMARK`, then it means no messages were
// supplied via the message ingestion inherent. Assuming that the PVF/runtime
// checks that legitly there are no pending messages we can therefore move the
// watermark up to the relay-block number.
relay_block_number
}
};
Some(Collation {
upward_messages,
new_validation_code: new_validation_code.map(Into::into),
head_data,
proof_of_validity: PoV { block_data },
processed_downward_messages,
// TODO!
horizontal_messages: Vec::new(),
hrmp_watermark: relay_block_number,
horizontal_messages,
hrmp_watermark,
})
})
}