From 274b3f618b763acbd67f266b36b3f43c2095f323 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Tue, 2 Feb 2021 19:12:20 +0100 Subject: [PATCH] MQC authorization (#308) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * MQC auth Update polkadot WIP * Update polkadot * Silly syntax errors * Fix typo * Leave some comments and docs * Apply suggestions from code review Co-authored-by: Bastian Köcher * Introduce the MessageQueueChain structure * Move the HRMP channel relevance check below * Fix the `receive_hrmp_after_pause` test * ValidationData is passed by reference * Replace "to cumulus" with "to the collator" * Update the test so that they are same as in polkadot Co-authored-by: Bastian Köcher --- cumulus/Cargo.lock | 2 + cumulus/parachain-system/Cargo.toml | 2 + cumulus/parachain-system/src/lib.rs | 608 +++++++++++++++++++++++++--- cumulus/primitives/src/lib.rs | 2 +- 4 files changed, 553 insertions(+), 61 deletions(-) diff --git a/cumulus/Cargo.lock b/cumulus/Cargo.lock index 6f8e42eab6..d7308e6066 100644 --- a/cumulus/Cargo.lock +++ b/cumulus/Cargo.lock @@ -1127,6 +1127,8 @@ dependencies = [ "frame-support", "frame-system", "hash-db", + "hex-literal 0.2.1", + "lazy_static", "pallet-balances", "parity-scale-codec", "polkadot-parachain", diff --git a/cumulus/parachain-system/Cargo.toml b/cumulus/parachain-system/Cargo.toml index 08ffe9ae3a..05ab692d2d 100644 --- a/cumulus/parachain-system/Cargo.toml +++ b/cumulus/parachain-system/Cargo.toml @@ -36,6 +36,8 @@ sp-externalities = { git = "https://github.com/paritytech/substrate", default-fe substrate-test-runtime-client = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" } sp-version = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" } cumulus-test-relay-sproof-builder = { path = "../test/relay-sproof-builder" } +hex-literal = "0.2.1" +lazy_static = "1.4" [features] default = ['std'] diff --git a/cumulus/parachain-system/src/lib.rs b/cumulus/parachain-system/src/lib.rs index 26f19eb4cd..a6c4fcb7ce 100644 --- a/cumulus/parachain-system/src/lib.rs +++ b/cumulus/parachain-system/src/lib.rs @@ -29,18 +29,24 @@ use cumulus_primitives::{ inherents::{SystemInherentData, SYSTEM_INHERENT_IDENTIFIER}, - well_known_keys::{self, NEW_VALIDATION_CODE, VALIDATION_DATA}, AbridgedHostConfiguration, DownwardMessageHandler, - HrmpMessageHandler, HrmpMessageSender, UpwardMessageSender, - OnValidationData, OutboundHrmpMessage, UpwardMessage, PersistedValidationData, ParaId, relay_chain, + relay_chain, + well_known_keys::{self, NEW_VALIDATION_CODE, VALIDATION_DATA}, + AbridgedHostConfiguration, DownwardMessageHandler, HrmpMessageHandler, HrmpMessageSender, + InboundDownwardMessage, InboundHrmpMessage, OnValidationData, OutboundHrmpMessage, ParaId, + PersistedValidationData, UpwardMessage, UpwardMessageSender, }; use frame_support::{ - decl_error, decl_event, decl_module, decl_storage, ensure, storage, - weights::{DispatchClass, Weight}, dispatch::DispatchResult, traits::Get, + decl_error, decl_event, decl_module, decl_storage, + dispatch::DispatchResult, + ensure, storage, + traits::Get, + weights::{DispatchClass, Weight}, }; use frame_system::{ensure_none, ensure_root}; use parachain::primitives::RelayChainBlockNumber; use sp_inherents::{InherentData, InherentIdentifier, ProvideInherent}; -use sp_std::{vec::Vec, cmp}; +use sp_runtime::traits::{BlakeTwo256, Hash}; +use sp_std::{cmp, collections::btree_map::BTreeMap, vec::Vec}; mod relay_state_snapshot; use relay_state_snapshot::MessagingStateSnapshot; @@ -60,6 +66,9 @@ pub trait Config: frame_system::Config { type DownwardMessageHandlers: DownwardMessageHandler; /// The HRMP message handlers that will be informed when a message is received. + /// + /// The messages are dispatched in the order they were relayed by the relay chain. If multiple + /// messages were relayed at one block, these will be dispatched in ascending order of the sender's para ID. type HrmpMessageHandlers: HrmpMessageHandler; } @@ -96,6 +105,17 @@ decl_storage! { /// This data is also absent from the genesis. HostConfiguration get(fn host_configuration): Option; + /// The last downward message queue chain head we have observed. + /// + /// This value is loaded before and saved after processing inbound downward messages carried + /// by the system inherent. + LastDmqMqcHead: MessageQueueChain; + /// The message queue chain heads we have observed per each channel incoming channel. + /// + /// This value is loaded before and saved after processing inbound downward messages carried + /// by the system inherent. + LastHrmpMqcHeads: BTreeMap; + PendingUpwardMessages: Vec; /// Essentially `OutboundHrmpMessage`s grouped by the recipients. @@ -183,41 +203,10 @@ decl_module! { RelevantMessagingState::put(relevant_messaging_state); HostConfiguration::put(host_config); - ::on_validation_data(vfp); + ::on_validation_data(&vfp); - let dm_count = downward_messages.len() as u32; - for downward_message in downward_messages { - T::DownwardMessageHandlers::handle_downward_message(downward_message); - } - - // Store the processed_downward_messages here so that it's will be accessible from - // PVF's `validate_block` wrapper and collation pipeline. - storage::unhashed::put( - well_known_keys::PROCESSED_DOWNWARD_MESSAGES, - &dm_count, - ); - - let mut hrmp_watermark = None; - for (sender, channel_contents) in horizontal_messages { - for horizontal_message in channel_contents { - if hrmp_watermark - .map(|w| w < horizontal_message.sent_at) - .unwrap_or(true) - { - hrmp_watermark = Some(horizontal_message.sent_at); - } - - T::HrmpMessageHandlers::handle_hrmp_message(sender, horizontal_message); - } - } - - // If we processed at least one message, then advance watermark to that location. - if let Some(hrmp_watermark) = hrmp_watermark { - storage::unhashed::put( - well_known_keys::HRMP_WATERMARK, - &hrmp_watermark, - ); - } + Self::process_inbound_downward_messages(&vfp, downward_messages)?; + Self::process_inbound_horizontal_messages(&vfp, horizontal_messages)?; Ok(()) } @@ -443,6 +432,126 @@ decl_module! { } impl Module { + /// Process all inbound downward messages relayed by the collator. + /// + /// Checks if the sequence of the messages is valid, dispatches them and communicates the number + /// of processed messages to the collator via a storage update. + fn process_inbound_downward_messages( + vfp: &PersistedValidationData, + downward_messages: Vec, + ) -> DispatchResult { + let dm_count = downward_messages.len() as u32; + + let result_mqc_head = LastDmqMqcHead::mutate(move |mqc| { + for downward_message in downward_messages { + mqc.extend_downward(&downward_message); + T::DownwardMessageHandlers::handle_downward_message(downward_message); + } + mqc.0 + }); + + // After hashing each message in the message queue chain submitted by the collator, we should + // arrive to the MQC head provided by the relay chain. + ensure!( + result_mqc_head == vfp.dmq_mqc_head, + Error::::DmpMqcMismatch + ); + + // Store the processed_downward_messages here so that it will be accessible from + // PVF's `validate_block` wrapper and collation pipeline. + storage::unhashed::put(well_known_keys::PROCESSED_DOWNWARD_MESSAGES, &dm_count); + + Ok(()) + } + + /// Process all inbound horizontal messages relayed by the collator. + /// + /// This is similar to [`process_inbound_downward_messages`], but works on multiple inbound + /// channels. + fn process_inbound_horizontal_messages( + vfp: &PersistedValidationData, + horizontal_messages: BTreeMap>, + ) -> DispatchResult { + // First, check that all submitted messages are sent from channels that exist. The channel + // exists if its MQC head is present in `vfp.hrmp_mqc_heads`. + for sender in horizontal_messages.keys() { + ensure!( + vfp.hrmp_mqc_heads + .binary_search_by_key(sender, |&(s, _)| s) + .is_ok(), + Error::::HrmpNoMqc, + ); + } + + // Second, prepare horizontal messages for a more convenient processing: + // + // instead of a mapping from a para to a list of inbound HRMP messages, we will have a list + // of tuples `(sender, message)` first ordered by `sent_at` (the relay chain block number + // in which the message hit the relay-chain) and second ordered by para id ascending. + // + // The messages will be dispatched in this order. + let mut horizontal_messages = horizontal_messages + .into_iter() + .flat_map(|(sender, channel_contents)| { + channel_contents + .into_iter() + .map(move |message| (sender, message)) + }) + .collect::>(); + horizontal_messages.sort_by(|a, b| { + // first sort by sent-at and then by the para id + match a.1.sent_at.cmp(&b.1.sent_at) { + cmp::Ordering::Equal => a.0.cmp(&b.0), + ord => ord, + } + }); + + let last_mqc_heads = LastHrmpMqcHeads::get(); + let mut running_mqc_heads = BTreeMap::new(); + let mut hrmp_watermark = None; + + for (sender, horizontal_message) in horizontal_messages { + if hrmp_watermark + .map(|w| w < horizontal_message.sent_at) + .unwrap_or(true) + { + hrmp_watermark = Some(horizontal_message.sent_at); + } + + running_mqc_heads + .entry(sender) + .or_insert_with(|| last_mqc_heads.get(&sender).cloned().unwrap_or_default()) + .extend_hrmp(&horizontal_message); + + T::HrmpMessageHandlers::handle_hrmp_message(sender, horizontal_message); + } + + // Check that the MQC heads for each channel provided by the relay chain match the MQC heads + // we have after processing all incoming messages. + // + // Along the way we also carry over the relevant entries from the `last_mqc_heads` to + // `running_mqc_heads`. Otherwise, in a block where no messages were sent in a channel + // it won't get into next block's `last_mqc_heads` and thus will be all zeros, which + // would corrupt the message queue chain. + for &(ref sender, ref target_head) in &vfp.hrmp_mqc_heads { + let cur_head = running_mqc_heads + .entry(*sender) + .or_insert_with(|| last_mqc_heads.get(&sender).cloned().unwrap_or_default()) + .head(); + + ensure!(&cur_head == target_head, Error::::HrmpMqcMismatch); + } + + LastHrmpMqcHeads::put(running_mqc_heads); + + // If we processed at least one message, then advance watermark to that location. + if let Some(hrmp_watermark) = hrmp_watermark { + storage::unhashed::put(well_known_keys::HRMP_WATERMARK, &hrmp_watermark); + } + + Ok(()) + } + /// Get validation data. /// /// Returns `Some(_)` after the inherent set the data for the current block. @@ -526,6 +635,42 @@ impl Module { } } +/// This struct provides ability to extend a message queue chain (MQC) and compute a new head. +/// +/// MQC is an instance of a [hash chain] applied to a message queue. Using a hash chain it's possible +/// to represent a sequence of messages using only a single hash. +/// +/// A head for an empty chain is agreed to be a zero hash. +/// +/// [hash chain]: https://en.wikipedia.org/wiki/Hash_chain +#[derive(Default, Clone, codec::Encode, codec::Decode)] +struct MessageQueueChain(relay_chain::Hash); + +impl MessageQueueChain { + fn extend_hrmp(&mut self, horizontal_message: &InboundHrmpMessage) -> &mut Self { + let prev_head = self.0; + self.0 = BlakeTwo256::hash_of(&( + prev_head, + horizontal_message.sent_at, + BlakeTwo256::hash_of(&horizontal_message.data), + )); + self + } + + fn extend_downward(&mut self, downward_message: &InboundDownwardMessage) -> &mut Self { + let prev_head = self.0; + self.0 = BlakeTwo256::hash_of(&( + prev_head, + downward_message.sent_at, + BlakeTwo256::hash_of(&downward_message.msg), + )); + self + } + + fn head(&self) -> relay_chain::Hash { + self.0 + } +} /// An error that can be raised upon sending an upward message. #[derive(Debug, PartialEq)] @@ -683,6 +828,21 @@ decl_error! { HostConfigurationNotAvailable, /// Invalid relay-chain storage merkle proof InvalidRelayChainMerkleProof, + /// The messages submitted by the collator in the system inherent when hashed sequentially + /// do not produce the hash that is produced by the relay-chain. + /// + /// This means that at least some of the submitted messages were altered, omitted or added + /// illegaly. + DmpMqcMismatch, + /// The collator submitted a message that is received from a sender that doesn't have a + /// channel opened to this parachain, according to the relay-parent state. + HrmpNoMqc, + /// After processing all messages submitted by the collator and extending hash chains we + /// haven't arrived to the MQCs that were produced by the relay-chain. + /// + /// That means that one or more channels had at least some of the submitted messages altered, + /// omitted or added illegaly. + HrmpMqcMismatch, } } @@ -692,7 +852,9 @@ mod tests { use super::*; use codec::Encode; - use cumulus_primitives::{AbridgedHrmpChannel, PersistedValidationData}; + use cumulus_primitives::{ + AbridgedHrmpChannel, InboundDownwardMessage, InboundHrmpMessage, PersistedValidationData, + }; use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder; use frame_support::{ assert_ok, @@ -701,13 +863,12 @@ mod tests { traits::{OnFinalize, OnInitialize}, }; use frame_system::{InitKind, RawOrigin}; + use hex_literal::hex; use relay_chain::v1::HrmpChannelId; use sp_core::H256; - use sp_runtime::{ - testing::Header, - traits::{BlakeTwo256, IdentityLookup}, - }; + use sp_runtime::{testing::Header, traits::IdentityLookup}; use sp_version::RuntimeVersion; + use std::cell::RefCell; impl_outer_origin! { pub enum Origin for Test where system = frame_system {} @@ -770,16 +931,42 @@ mod tests { type Event = TestEvent; type OnValidationData = (); type SelfParaId = ParachainId; - type DownwardMessageHandlers = (); - type HrmpMessageHandlers = (); + type DownwardMessageHandlers = SaveIntoThreadLocal; + type HrmpMessageHandlers = SaveIntoThreadLocal; } type ParachainSystem = Module; type System = frame_system::Module; + pub struct SaveIntoThreadLocal; + + std::thread_local! { + static HANDLED_DOWNWARD_MESSAGES: RefCell> = RefCell::new(Vec::new()); + static HANDLED_HRMP_MESSAGES: RefCell> = RefCell::new(Vec::new()); + } + + impl DownwardMessageHandler for SaveIntoThreadLocal { + fn handle_downward_message(msg: InboundDownwardMessage) { + HANDLED_DOWNWARD_MESSAGES.with(|m| { + m.borrow_mut().push(msg); + }); + } + } + + impl HrmpMessageHandler for SaveIntoThreadLocal { + fn handle_hrmp_message(sender: ParaId, msg: InboundHrmpMessage) { + HANDLED_HRMP_MESSAGES.with(|m| { + m.borrow_mut().push((sender, msg)); + }) + } + } + // This function basically just builds a genesis storage key/value store according to // our desired mockup. fn new_test_ext() -> sp_io::TestExternalities { + HANDLED_DOWNWARD_MESSAGES.with(|m| m.borrow_mut().clear()); + HANDLED_HRMP_MESSAGES.with(|m| m.borrow_mut().clear()); + frame_system::GenesisConfig::default() .build_storage::() .unwrap() @@ -831,9 +1018,12 @@ mod tests { tests: Vec, pending_upgrade: Option, ran: bool, - relay_sproof_builder_hook: Option< - Box - >, + relay_sproof_builder_hook: + Option>, + persisted_validation_data_hook: + Option>, + inherent_data_hook: + Option>, } impl BlockTests { @@ -876,12 +1066,28 @@ mod tests { fn with_relay_sproof_builder(mut self, f: F) -> Self where - F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder) + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder), { self.relay_sproof_builder_hook = Some(Box::new(f)); self } + fn with_validation_data(mut self, f: F) -> Self + where + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut PersistedValidationData), + { + self.persisted_validation_data_hook = Some(Box::new(f)); + self + } + + fn with_inherent_data(mut self, f: F) -> Self + where + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut SystemInherentData), + { + self.inherent_data_hook = Some(Box::new(f)); + self + } + fn run(&mut self) { self.ran = true; wasm_ext().execute_with(|| { @@ -913,11 +1119,14 @@ mod tests { } let (relay_storage_root, relay_chain_state) = sproof_builder.into_state_root_and_proof(); - let vfp = PersistedValidationData { + let mut vfp = PersistedValidationData { block_number: *n as RelayChainBlockNumber, relay_storage_root, ..Default::default() }; + if let Some(ref hook) = self.persisted_validation_data_hook { + hook(self, *n as RelayChainBlockNumber, &mut vfp); + } storage::unhashed::put(VALIDATION_DATA, &vfp); storage::unhashed::kill(NEW_VALIDATION_CODE); @@ -926,13 +1135,17 @@ mod tests { // to storage; they must also be included in the inherent data. let inherent_data = { let mut inherent_data = InherentData::default(); + let mut system_inherent_data = SystemInherentData { + validation_data: vfp.clone(), + relay_chain_state, + downward_messages: Default::default(), + horizontal_messages: Default::default(), + }; + if let Some(ref hook) = self.inherent_data_hook { + hook(self, *n as RelayChainBlockNumber, &mut system_inherent_data); + } inherent_data - .put_data(SYSTEM_INHERENT_IDENTIFIER, &SystemInherentData { - validation_data: vfp.clone(), - relay_chain_state, - downward_messages: Default::default(), - horizontal_messages: Default::default(), - }) + .put_data(SYSTEM_INHERENT_IDENTIFIER, &system_inherent_data) .expect("failed to put VFP inherent"); inherent_data }; @@ -1118,7 +1331,7 @@ mod tests { }) .add_with_post_test(2, || { /* do nothing within block */ }, - || { + || { let v: Option>> = storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); assert_eq!( v, @@ -1155,7 +1368,7 @@ mod tests { }) .add_with_post_test(2, || { /* do nothing within block */ }, - || { + || { let v: Option>> = storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); assert_eq!( v, @@ -1351,4 +1564,279 @@ mod tests { }, ); } + + #[test] + fn message_queue_chain() { + assert_eq!(MessageQueueChain::default().head(), H256::zero()); + + // Note that the resulting hashes are the same for HRMP and DMP. That's because even though + // the types are nominally different, they have the same structure and computation of the + // new head doesn't differ. + // + // These cases are taken from https://github.com/paritytech/polkadot/pull/2351 + assert_eq!( + MessageQueueChain::default() + .extend_downward(&InboundDownwardMessage { + sent_at: 2, + msg: vec![1, 2, 3], + }) + .extend_downward(&InboundDownwardMessage { + sent_at: 3, + msg: vec![4, 5, 6], + }) + .head(), + hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), + ); + assert_eq!( + MessageQueueChain::default() + .extend_hrmp(&InboundHrmpMessage { + sent_at: 2, + data: vec![1, 2, 3], + }) + .extend_hrmp(&InboundHrmpMessage { + sent_at: 3, + data: vec![4, 5, 6], + }) + .head(), + hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), + ); + } + + #[test] + fn receive_dmp() { + lazy_static::lazy_static! { + static ref MSG: InboundDownwardMessage = InboundDownwardMessage { + sent_at: 1, + msg: b"down".to_vec(), + }; + } + + BlockTests::new() + .with_validation_data( + |_, relay_block_num, validation_data| match relay_block_num { + 1 => { + validation_data.dmq_mqc_head = + MessageQueueChain::default().extend_downward(&MSG).head(); + } + _ => unreachable!(), + }, + ) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.downward_messages.push(MSG.clone()); + } + _ => unreachable!(), + }) + .add(1, || { + HANDLED_DOWNWARD_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[MSG.clone()]); + m.clear(); + }); + }); + } + + #[test] + fn receive_hrmp() { + lazy_static::lazy_static! { + static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"aquadisco".to_vec(), + }; + + static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"mudroom".to_vec(), + }; + + static ref MSG_3: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 2, + data: b"eggpeeling".to_vec(), + }; + + static ref MSG_4: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 2, + data: b"casino".to_vec(), + }; + } + + BlockTests::new() + .with_validation_data( + |_, relay_block_num, validation_data| match relay_block_num { + 1 => { + // 200 - doesn't exist yet + // 300 - one new message + validation_data.hrmp_mqc_heads.push(( + ParaId::from(300), + MessageQueueChain::default().extend_hrmp(&MSG_1).head(), + )); + } + 2 => { + // 200 - two new messages + // 300 - now present with one message. + validation_data.hrmp_mqc_heads.push(( + ParaId::from(200), + MessageQueueChain::default().extend_hrmp(&MSG_4).head(), + )); + validation_data.hrmp_mqc_heads.push(( + ParaId::from(300), + MessageQueueChain::default() + .extend_hrmp(&MSG_1) + .extend_hrmp(&MSG_2) + .extend_hrmp(&MSG_3) + .head(), + )); + } + 3 => { + // 200 - no new messages + // 300 - is gone + validation_data.hrmp_mqc_heads.push(( + ParaId::from(200), + MessageQueueChain::default().extend_hrmp(&MSG_4).head(), + )); + } + _ => unreachable!(), + }, + ) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.horizontal_messages + .insert(ParaId::from(300), vec![MSG_1.clone()]); + } + 2 => { + data.horizontal_messages.insert( + ParaId::from(300), + vec![ + // can't be sent at the block 1 actually. However, we cheat here + // because we want to test the case where there are multiple messages + // but the harness at the moment doesn't support block skipping. + MSG_2.clone(), + MSG_3.clone(), + ], + ); + data.horizontal_messages + .insert(ParaId::from(200), vec![MSG_4.clone()]); + } + 3 => {} + _ => unreachable!(), + }) + .add(1, || { + HANDLED_HRMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ParaId::from(300), MSG_1.clone())]); + m.clear(); + }); + }) + .add(2, || { + HANDLED_HRMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!( + &*m, + &[ + (ParaId::from(300), MSG_2.clone()), + (ParaId::from(200), MSG_4.clone()), + (ParaId::from(300), MSG_3.clone()), + ] + ); + m.clear(); + }); + }) + .add(3, || {}); + } + + #[test] + fn receive_hrmp_empty_channel() { + BlockTests::new() + .with_validation_data( + |_, relay_block_num, validation_data| match relay_block_num { + 1 => { + // no channels + } + 2 => { + // one new channel + validation_data.hrmp_mqc_heads.push(( + ParaId::from(300), + MessageQueueChain::default().head(), + )); + } + _ => unreachable!(), + }, + ) + .add(1, || {}) + .add(2, || {}); + } + + #[test] + fn receive_hrmp_after_pause() { + lazy_static::lazy_static! { + static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"mikhailinvanovich".to_vec(), + }; + + static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 3, + data: b"1000000000".to_vec(), + }; + } + + const ALICE: ParaId = ParaId::new(300); + + BlockTests::new() + .with_validation_data( + |_, relay_block_num, validation_data| match relay_block_num { + 1 => { + validation_data.hrmp_mqc_heads.push(( + ALICE, + MessageQueueChain::default().extend_hrmp(&MSG_1).head(), + )); + } + 2 => { + // 300 - no new messages, mqc stayed the same. + validation_data.hrmp_mqc_heads.push(( + ALICE, + MessageQueueChain::default().extend_hrmp(&MSG_1).head(), + )); + } + 3 => { + // 300 - new message. + validation_data.hrmp_mqc_heads.push(( + ALICE, + MessageQueueChain::default() + .extend_hrmp(&MSG_1) + .extend_hrmp(&MSG_2) + .head(), + )); + } + _ => unreachable!(), + }, + ) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.horizontal_messages.insert(ALICE, vec![MSG_1.clone()]); + } + 2 => { + // no new messages + } + 3 => { + data.horizontal_messages.insert(ALICE, vec![MSG_2.clone()]); + } + _ => unreachable!(), + }) + .add(1, || { + HANDLED_HRMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ALICE, MSG_1.clone())]); + m.clear(); + }); + }) + .add(2, || {}) + .add(3, || { + HANDLED_HRMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ALICE, MSG_2.clone())]); + m.clear(); + }); + }); + } } diff --git a/cumulus/primitives/src/lib.rs b/cumulus/primitives/src/lib.rs index 9ea4a67e05..533ee3a8f8 100644 --- a/cumulus/primitives/src/lib.rs +++ b/cumulus/primitives/src/lib.rs @@ -131,5 +131,5 @@ pub trait HrmpMessageSender { /// A trait which is called when the validation data is set. #[impl_trait_for_tuples::impl_for_tuples(30)] pub trait OnValidationData { - fn on_validation_data(data: PersistedValidationData); + fn on_validation_data(data: &PersistedValidationData); }