From aba8f46ec97743edc92c07c56355ec6ca6ac2f5f Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Tue, 15 Dec 2020 05:42:31 +0100 Subject: [PATCH] Integrate HRMP (#258) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * HRMP message ingestion * Plumb hrmp_watermark to build_collation * Plumb hrmp_watermark to ValidationResult * Plumb hrmp outbound messages * Implement message-broker part of HRMP * Kill UPWARD_MESSAGES as well Otherwise, they will get resent each block * Add sudo versions for easier testing * Remove the xcmp module Not useful for the moment * Doc for HRMP message handler * Estimate the weight upper bound for on_finalize * Remove a redundant type annotation * fix spelling of a method * Apply suggestions from code review Co-authored-by: Bastian Köcher * Deabbreviate dmp and hrmp in the message ingestion type * Don't use binary_search since it's broken by a following rotate Instead use the linear search. We can afford linear search here since due to limited scalability of HRMP we can only have at most a couple of dozens of channels. * Fix the watermark Co-authored-by: Bastian Köcher --- Cargo.lock | 1 + collator/src/lib.rs | 90 ++++++++-- message-broker/src/lib.rs | 163 +++++++++++++++++-- primitives/Cargo.toml | 2 + primitives/src/lib.rs | 49 +++++- primitives/src/xcmp.rs | 42 ----- rococo-parachains/runtime/src/lib.rs | 1 + runtime/src/validate_block/implementation.rs | 24 ++- 8 files changed, 292 insertions(+), 80 deletions(-) delete mode 100644 primitives/src/xcmp.rs diff --git a/Cargo.lock b/Cargo.lock index 5cb5548981..10644cb2dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1155,6 +1155,7 @@ dependencies = [ "polkadot-parachain", "polkadot-primitives", "sc-chain-spec", + "sp-core", "sp-inherents", "sp-runtime", "sp-std", diff --git a/collator/src/lib.rs b/collator/src/lib.rs index 7708b131d1..0a8c5df99f 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -18,8 +18,8 @@ use cumulus_network::WaitToAnnounce; use cumulus_primitives::{ - inherents::{DownwardMessagesType, DOWNWARD_MESSAGES_IDENTIFIER, VALIDATION_DATA_IDENTIFIER}, - well_known_keys, ValidationData, + inherents::{self, VALIDATION_DATA_IDENTIFIER}, + well_known_keys, ValidationData, InboundHrmpMessage, OutboundHrmpMessage, InboundDownwardMessage, }; use cumulus_runtime::ParachainBlockData; @@ -52,7 +52,7 @@ use log::{debug, error, info, trace}; use futures::prelude::*; -use std::{marker::PhantomData, sync::Arc, time::Duration}; +use std::{marker::PhantomData, sync::Arc, time::Duration, collections::BTreeMap}; use parking_lot::Mutex; @@ -146,8 +146,9 @@ where /// for. /// /// Returns `None` in case of an error. - fn retrieve_dmq_contents(&self, relay_parent: PHash) -> Option { - self.polkadot_client + fn retrieve_dmq_contents(&self, relay_parent: PHash) -> Option> { + self + .polkadot_client .runtime_api() .dmq_contents_with_context( &BlockId::hash(relay_parent), @@ -164,6 +165,31 @@ where .ok() } + /// Returns channels contents for each inbound HRMP channel addressed to the parachain we are + /// collating for. + /// + /// Empty channels are also included. + fn retrieve_all_inbound_hrmp_channel_contents(&self, relay_parent: PHash) + -> Option>> + { + self + .polkadot_client + .runtime_api() + .inbound_hrmp_channels_contents_with_context( + &BlockId::hash(relay_parent), + sp_core::ExecutionContext::Importing, + self.para_id, + ) + .map_err(|e| { + error!( + target: "cumulus-collator", + "An error occured during requesting the inbound HRMP messages for {}: {:?}", + relay_parent, e, + ); + }) + .ok() + } + /// Get the inherent data with validation function parameters injected fn inherent_data( &mut self, @@ -193,9 +219,18 @@ where }) .ok()?; - let downward_messages = self.retrieve_dmq_contents(relay_parent)?; + let message_ingestion_data = { + let downward_messages = self.retrieve_dmq_contents(relay_parent)?; + let horizontal_messages = self.retrieve_all_inbound_hrmp_channel_contents(relay_parent)?; + + inherents::MessageIngestionType { + downward_messages, + horizontal_messages, + } + }; + inherent_data - .put_data(DOWNWARD_MESSAGES_IDENTIFIER, &downward_messages) + .put_data(inherents::MESSAGE_INGESTION_IDENTIFIER, &message_ingestion_data) .map_err(|e| { error!( target: "cumulus-collator", @@ -296,15 +331,50 @@ where None => 0, }; + let horizontal_messages = sp_io::storage::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); + let horizontal_messages = match horizontal_messages + .map(|v| Vec::::decode(&mut &v[..])) + { + Some(Ok(horizontal_messages)) => horizontal_messages, + Some(Err(e)) => { + error!( + target: "cumulus-collator", + "Failed to decode the horizontal messages: {:?}", + e + ); + return None + } + None => Vec::new(), + }; + + let hrmp_watermark = sp_io::storage::get(well_known_keys::HRMP_WATERMARK); + let hrmp_watermark = match hrmp_watermark.map(|v| PBlockNumber::decode(&mut &v[..])) { + Some(Ok(hrmp_watermark)) => hrmp_watermark, + Some(Err(e)) => { + error!( + target: "cumulus-collator", + "Failed to decode the HRMP watermark: {:?}", + e + ); + return None + } + None => { + // If the runtime didn't set `HRMP_WATERMARK`, then it means no messages were + // supplied via the message ingestion inherent. Assuming that the PVF/runtime + // checks that legitly there are no pending messages we can therefore move the + // watermark up to the relay-block number. + relay_block_number + } + }; + Some(Collation { upward_messages, new_validation_code: new_validation_code.map(Into::into), head_data, proof_of_validity: PoV { block_data }, processed_downward_messages, - // TODO! - horizontal_messages: Vec::new(), - hrmp_watermark: relay_block_number, + horizontal_messages, + hrmp_watermark, }) }) } diff --git a/message-broker/src/lib.rs b/message-broker/src/lib.rs index 976f114f34..771860cc05 100644 --- a/message-broker/src/lib.rs +++ b/message-broker/src/lib.rs @@ -27,57 +27,117 @@ use frame_support::{ weights::{DispatchClass, Weight}, StorageValue, }; -use frame_system::ensure_none; +use frame_system::{ensure_none, ensure_root}; use sp_inherents::{InherentData, InherentIdentifier, MakeFatalError, ProvideInherent}; use sp_std::{cmp, prelude::*}; use cumulus_primitives::{ - inherents::{DownwardMessagesType, DOWNWARD_MESSAGES_IDENTIFIER}, - well_known_keys, DownwardMessageHandler, InboundDownwardMessage, UpwardMessage, + inherents::{MessageIngestionType, MESSAGE_INGESTION_IDENTIFIER}, + well_known_keys, DownwardMessageHandler, HrmpMessageHandler, OutboundHrmpMessage, + UpwardMessage, ParaId, }; +// TODO: these should be not a constant, but sourced from the relay-chain configuration. +const UMP_MSG_NUM_PER_CANDIDATE: usize = 5; +const HRMP_MSG_NUM_PER_CANDIDATE: usize = 5; + /// Configuration trait of the message broker pallet. pub trait Config: frame_system::Config { /// The downward message handlers that will be informed when a message is received. type DownwardMessageHandlers: DownwardMessageHandler; + /// The HRMP message handlers that will be informed when a message is received. + type HrmpMessageHandlers: HrmpMessageHandler; } decl_storage! { trait Store for Module as MessageBroker { PendingUpwardMessages: Vec; + + /// Essentially `OutboundHrmpMessage`s grouped by the recipients. + OutboundHrmpMessages: map hasher(twox_64_concat) ParaId => Vec>; + /// HRMP channels with the given recipients are awaiting to be processed. If a `ParaId` is + /// present in this vector then `OutboundHrmpMessages` for it should be not empty. + NonEmptyHrmpChannels: Vec; } } decl_module! { pub struct Module for enum Call where origin: T::Origin { /// An entrypoint for an inherent to deposit downward messages into the runtime. It accepts - /// and processes the list of downward messages. + /// and processes the list of downward messages and inbound HRMP messages. #[weight = (10, DispatchClass::Mandatory)] - fn receive_downward_messages(origin, messages: Vec) { + fn ingest_inbound_messages(origin, messages: MessageIngestionType) { ensure_none(origin)?; - let messages_len = messages.len() as u32; - for message in messages { - T::DownwardMessageHandlers::handle_downward_message(message); + let MessageIngestionType { + downward_messages, + horizontal_messages, + } = messages; + + let dm_count = downward_messages.len() as u32; + for downward_message in downward_messages { + T::DownwardMessageHandlers::handle_downward_message(downward_message); } // Store the processed_downward_messages here so that it's will be accessible from // PVF's `validate_block` wrapper and collation pipeline. storage::unhashed::put( well_known_keys::PROCESSED_DOWNWARD_MESSAGES, - &messages_len, + &dm_count, ); + + let mut hrmp_watermark = None; + for (sender, channel_contents) in horizontal_messages { + for horizontal_message in channel_contents { + if hrmp_watermark + .map(|w| w < horizontal_message.sent_at) + .unwrap_or(true) + { + hrmp_watermark = Some(horizontal_message.sent_at); + } + + T::HrmpMessageHandlers::handle_hrmp_message(sender, horizontal_message); + } + } + + // If we processed at least one message, then advance watermark to that location. + if let Some(hrmp_watermark) = hrmp_watermark { + storage::unhashed::put( + well_known_keys::HRMP_WATERMARK, + &hrmp_watermark, + ); + } + } + + #[weight = (1_000, DispatchClass::Operational)] + fn sudo_send_upward_message(origin, message: UpwardMessage) { + ensure_root(origin)?; + let _ = Self::send_upward_message(message); + } + + #[weight = (1_000, DispatchClass::Operational)] + fn sudo_send_hrmp_message(origin, message: OutboundHrmpMessage) { + ensure_root(origin)?; + let _ = Self::send_hrmp_message(message); } fn on_initialize() -> Weight { - // Reads and writes performed by `on_finalize`. - T::DbWeight::get().reads_writes(1, 2) + let mut weight = T::DbWeight::get().writes(3); + storage::unhashed::kill(well_known_keys::HRMP_WATERMARK); + storage::unhashed::kill(well_known_keys::UPWARD_MESSAGES); + storage::unhashed::kill(well_known_keys::HRMP_OUTBOUND_MESSAGES); + + // Reads and writes performed by `on_finalize`. This may actually turn out to be lower, + // but we should err on the safe side. + weight += T::DbWeight::get().reads_writes( + 2 + HRMP_MSG_NUM_PER_CANDIDATE as u64, + 4 + HRMP_MSG_NUM_PER_CANDIDATE as u64, + ); + + weight } fn on_finalize() { - // TODO: this should be not a constant, but sourced from the relay-chain configuration. - const UMP_MSG_NUM_PER_CANDIDATE: usize = 5; - ::PendingUpwardMessages::mutate(|up| { let num = cmp::min(UMP_MSG_NUM_PER_CANDIDATE, up.len()); storage::unhashed::put( @@ -86,6 +146,50 @@ decl_module! { ); *up = up.split_off(num); }); + + // Sending HRMP messages is a little bit more involved. On top of the number of messages + // per block limit, there is also a constraint that it's possible to send only a single + // message to a given recipient per candidate. + let mut non_empty_hrmp_channels = NonEmptyHrmpChannels::get(); + let outbound_hrmp_num = cmp::min(HRMP_MSG_NUM_PER_CANDIDATE, non_empty_hrmp_channels.len()); + let mut outbound_hrmp_messages = Vec::with_capacity(outbound_hrmp_num); + let mut prune_empty = Vec::with_capacity(outbound_hrmp_num); + + for &recipient in non_empty_hrmp_channels.iter().take(outbound_hrmp_num) { + let (message_payload, became_empty) = + ::OutboundHrmpMessages::mutate(&recipient, |v| { + // this panics if `v` is empty. However, we are iterating only once over non-empty + // channels, therefore it cannot panic. + let first = v.remove(0); + let became_empty = v.is_empty(); + (first, became_empty) + }); + + outbound_hrmp_messages.push(OutboundHrmpMessage { + recipient, + data: message_payload, + }); + if became_empty { + prune_empty.push(recipient); + } + } + + // Prune hrmp channels that became empty. Additionally, because it may so happen that we + // only gave attention to some channels in `non_empty_hrmp_channels` it's important to + // change the order. Otherwise, the next `on_finalize` we will again give attention + // only to those channels that happen to be in the beginning, until they are emptied. + // This leads to "starvation" of the channels near to the end. + // + // To mitigate this we shift all processed elements towards the end of the vector using + // `rotate_left`. To get intution how it works see the examples in its rustdoc. + non_empty_hrmp_channels.retain(|x| !prune_empty.contains(x)); + non_empty_hrmp_channels.rotate_left(outbound_hrmp_num - prune_empty.len()); + + ::NonEmptyHrmpChannels::put(non_empty_hrmp_channels); + storage::unhashed::put( + well_known_keys::HRMP_OUTBOUND_MESSAGES, + &outbound_hrmp_messages, + ); } } } @@ -96,6 +200,14 @@ pub enum SendUpErr { TooBig, } +/// An error that can be raised upon sending a horizontal message. +pub enum SendHorizonalErr { + /// The message sent is too big. + TooBig, + /// There is no channel to the specified destination. + NoChannel, +} + impl Module { pub fn send_upward_message(message: UpwardMessage) -> Result<(), SendUpErr> { // TODO: check the message against the limit. The limit should be sourced from the @@ -103,16 +215,33 @@ impl Module { ::PendingUpwardMessages::append(message); Ok(()) } + + pub fn send_hrmp_message(message: OutboundHrmpMessage) -> Result<(), SendHorizonalErr> { + // TODO: + // (a) check against the size limit sourced from the relay-chain configuration + // (b) check if the channel to the recipient is actually opened. + + let OutboundHrmpMessage { recipient, data } = message; + ::OutboundHrmpMessages::append(&recipient, data); + + ::NonEmptyHrmpChannels::mutate(|v| { + if !v.contains(&recipient) { + v.push(recipient); + } + }); + + Ok(()) + } } impl ProvideInherent for Module { type Call = Call; type Error = MakeFatalError<()>; - const INHERENT_IDENTIFIER: InherentIdentifier = DOWNWARD_MESSAGES_IDENTIFIER; + const INHERENT_IDENTIFIER: InherentIdentifier = MESSAGE_INGESTION_IDENTIFIER; fn create_inherent(data: &InherentData) -> Option { - data.get_data::(&DOWNWARD_MESSAGES_IDENTIFIER) + data.get_data::(&MESSAGE_INGESTION_IDENTIFIER) .expect("Downward messages inherent data failed to decode") - .map(|msgs| Call::receive_downward_messages(msgs)) + .map(|msgs| Call::ingest_inbound_messages(msgs)) } } diff --git a/primitives/Cargo.toml b/primitives/Cargo.toml index b0f46efc3e..0d26e9bfdf 100644 --- a/primitives/Cargo.toml +++ b/primitives/Cargo.toml @@ -10,6 +10,7 @@ sp-inherents = { git = "https://github.com/paritytech/substrate", default-featur sp-std = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" } sc-chain-spec = { git = "https://github.com/paritytech/substrate", optional = true, branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" } # Polkadot dependencies polkadot-parachain = { git = "https://github.com/paritytech/polkadot", default-features = false, branch = "master" } @@ -33,4 +34,5 @@ std = [ "sp-inherents/std", "polkadot-core-primitives/std", "sp-runtime/std", + "sp-core/std", ] diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index f18b1fc0c7..f21a683596 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -27,17 +27,35 @@ pub use polkadot_primitives::v1::{ #[cfg(feature = "std")] pub mod genesis; -pub mod xcmp; + +/// An inbound HRMP message. +pub type InboundHrmpMessage = polkadot_primitives::v1::InboundHrmpMessage; + +/// And outbound HRMP message +pub type OutboundHrmpMessage = polkadot_primitives::v1::OutboundHrmpMessage; /// Identifiers and types related to Cumulus Inherents pub mod inherents { use sp_inherents::InherentIdentifier; + use sp_std::{ + vec::Vec, + collections::btree_map::BTreeMap, + }; + use super::{InboundDownwardMessage, InboundHrmpMessage, ParaId}; - /// Inherent identifier for downward messages. - pub const DOWNWARD_MESSAGES_IDENTIFIER: InherentIdentifier = *b"cumdownm"; - - /// The type of the inherent downward messages. - pub type DownwardMessagesType = sp_std::vec::Vec; + /// Inherent identifier for message ingestion inherent. + pub const MESSAGE_INGESTION_IDENTIFIER: InherentIdentifier = *b"msgingst"; + /// The data passed via a message ingestion inherent. Consists of a bundle of + /// DMP and HRMP messages. + #[derive(codec::Encode, codec::Decode, sp_core::RuntimeDebug, Clone, PartialEq)] + pub struct MessageIngestionType { + /// Downward messages in the order they were sent. + pub downward_messages: Vec, + /// HRMP messages grouped by channels. The messages in the inner vec must be in order they + /// were sent. In combination with the rule of no more than one message in a channel per block, + /// this means `sent_at` is **strictly** greater than the previous one (if any). + pub horizontal_messages: BTreeMap>, + } /// The identifier for the `set_validation_data` inherent. pub const VALIDATION_DATA_IDENTIFIER: InherentIdentifier = *b"valfunp0"; @@ -58,6 +76,18 @@ pub mod well_known_keys { /// Code upgarde (set as appropriate by a pallet). pub const NEW_VALIDATION_CODE: &'static [u8] = b":cumulus_new_validation_code:"; + /// The storage key with which the runtime passes outbound HRMP messages it wants to send to the + /// PVF. + /// + /// The value is stored as SCALE encoded `Vec` + pub const HRMP_OUTBOUND_MESSAGES: &'static [u8] = b":cumulus_hrmp_outbound_messages:"; + + /// The storage key for communicating the HRMP watermark from the runtime to the PVF. Cleared by + /// the runtime each block and set after message inclusion, but only if there were messages. + /// + /// The value is stored as SCALE encoded relay-chain's `BlockNumber`. + pub const HRMP_WATERMARK: &'static [u8] = b":cumulus_hrmp_watermark:"; + /// The storage key for the processed downward messages. /// /// The value is stored as SCALE encoded `u32`. @@ -71,6 +101,13 @@ pub trait DownwardMessageHandler { fn handle_downward_message(msg: InboundDownwardMessage); } +/// Something that should be called when an HRMP message is received. +#[impl_trait_for_tuples::impl_for_tuples(30)] +pub trait HrmpMessageHandler { + /// Handle the given HRMP message. + fn handle_hrmp_message(sender: ParaId, msg: InboundHrmpMessage); +} + /// A trait which is called when the validation data is set. #[impl_trait_for_tuples::impl_for_tuples(30)] pub trait OnValidationData { diff --git a/primitives/src/xcmp.rs b/primitives/src/xcmp.rs deleted file mode 100644 index 6fdd4fb22f..0000000000 --- a/primitives/src/xcmp.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// This file is part of Cumulus. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Cumulus. If not, see . - -//! XMCP related primitives - -use polkadot_primitives::v0::Id as ParaId; -use sp_std::vec::Vec; - -/// A raw XCMP message that is being send between two Parachain's. -#[derive(codec::Encode, codec::Decode)] -pub struct RawXCMPMessage { - /// Parachain sending the message. - pub from: ParaId, - /// SCALE encoded message. - pub data: Vec, -} - -/// Something that can handle XCMP messages. -#[impl_trait_for_tuples::impl_for_tuples(30)] -pub trait XCMPMessageHandler { - /// Handle a XCMP message. - fn handle_xcmp_message(src: ParaId, msg: &Message); -} - -/// Something that can send XCMP messages. -pub trait XCMPMessageSender { - /// Send a XCMP message to the given parachain. - fn send_xcmp_message(dest: ParaId, msg: &Message) -> Result<(), ()>; -} diff --git a/rococo-parachains/runtime/src/lib.rs b/rococo-parachains/runtime/src/lib.rs index 8096795ca0..f2a37ccade 100644 --- a/rococo-parachains/runtime/src/lib.rs +++ b/rococo-parachains/runtime/src/lib.rs @@ -221,6 +221,7 @@ impl cumulus_parachain_upgrade::Config for Runtime { impl cumulus_message_broker::Config for Runtime { type DownwardMessageHandlers = (); + type HrmpMessageHandlers = (); } impl parachain_info::Config for Runtime {} diff --git a/runtime/src/validate_block/implementation.rs b/runtime/src/validate_block/implementation.rs index f0adcc1f78..0bdfbbc157 100644 --- a/runtime/src/validate_block/implementation.rs +++ b/runtime/src/validate_block/implementation.rs @@ -30,8 +30,9 @@ use codec::{Decode, Encode}; use cumulus_primitives::{ well_known_keys::{ NEW_VALIDATION_CODE, PROCESSED_DOWNWARD_MESSAGES, UPWARD_MESSAGES, VALIDATION_DATA, + HRMP_WATERMARK, HRMP_OUTBOUND_MESSAGES, }, - UpwardMessage, ValidationData, + UpwardMessage, ValidationData, OutboundHrmpMessage, }; use sp_core::storage::{ChildInfo, TrackedStorageKey}; use sp_externalities::{ @@ -165,15 +166,28 @@ pub fn validate_block>(params: ValidationParams) - .and_then(|v| Decode::decode(&mut &v[..]).ok()) .expect("`ValidationData` is required to be placed into the storage!"); + let horizontal_messages = match overlay.storage(HRMP_OUTBOUND_MESSAGES).flatten() { + Some(encoded) => Vec::::decode(&mut &encoded[..]) + .expect("Outbound HRMP messages vec is not correctly encoded in the storage!"), + None => Vec::new(), + }; + + let hrmp_watermark = overlay + .storage(HRMP_WATERMARK) + .flatten() + .map(|v| { + Decode::decode(&mut &v[..]) + .expect("HRMP watermark is not encoded correctly") + }) + .unwrap_or(validation_data.persisted.block_number); + ValidationResult { head_data, new_validation_code, upward_messages, processed_downward_messages, - //TODO! - horizontal_messages: Vec::new(), - //TODO! - hrmp_watermark: validation_data.persisted.block_number, + horizontal_messages, + hrmp_watermark, } }