diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs index a2c97250fd..3962445738 100644 --- a/cumulus/pallets/parachain-system/src/lib.rs +++ b/cumulus/pallets/parachain-system/src/lib.rs @@ -16,9 +16,9 @@ #![cfg_attr(not(feature = "std"), no_std)] -//! cumulus-pallet-parachain-system is a base module for cumulus-based parachains. +//! cumulus-pallet-parachain-system is a base pallet for cumulus-based parachains. //! -//! This module handles low-level details of being a parachain. It's responsibilities include: +//! This pallet handles low-level details of being a parachain. It's responsibilities include: //! //! - ingestion of the parachain validation data //! - ingestion of incoming downward and lateral messages and dispatching them @@ -37,13 +37,12 @@ use cumulus_primitives_core::{ }; use cumulus_primitives_parachain_inherent::ParachainInherentData; use frame_support::{ - decl_error, decl_event, decl_module, decl_storage, - dispatch::{DispatchResult, DispatchResultWithPostInfo}, ensure, - inherent::{InherentData, InherentIdentifier, ProvideInherent}, + dispatch::{DispatchError, DispatchResult}, storage, traits::Get, - weights::{DispatchClass, Pays, PostDispatchInfo, Weight}, + weights::{PostDispatchInfo, Weight, Pays}, + inherent::{InherentData, InherentIdentifier, ProvideInherent}, }; use frame_system::{ensure_none, ensure_root}; use polkadot_parachain::primitives::RelayChainBlockNumber; @@ -54,253 +53,86 @@ use sp_runtime::{ InvalidTransaction, TransactionLongevity, TransactionSource, TransactionValidity, ValidTransaction, }, - DispatchError, }; use sp_std::{cmp, collections::btree_map::BTreeMap, prelude::*}; mod relay_state_snapshot; #[macro_use] pub mod validate_block; +#[cfg(test)] +mod tests; -/// The pallet's configuration trait. -pub trait Config: frame_system::Config> { - /// The overarching event type. - type Event: From> + Into<::Event>; +pub use pallet::*; - /// Something which can be notified when the validation data is set. - type OnValidationData: OnValidationData; +#[frame_support::pallet] +pub mod pallet { + use super::*; + use frame_support::pallet_prelude::*; + use frame_system::pallet_prelude::*; - /// Returns the parachain ID we are running with. - type SelfParaId: Get; + #[pallet::pallet] + pub struct Pallet(_); - /// The place where outbound XCMP messages come from. This is queried in `finalize_block`. - type OutboundXcmpMessageSource: XcmpMessageSource; + #[pallet::config] + pub trait Config: frame_system::Config> { + /// The overarching event type. + type Event: From> + IsType<::Event>; - /// The message handler that will be invoked when messages are received via DMP. - type DmpMessageHandler: DmpMessageHandler; + /// Something which can be notified when the validation data is set. + type OnValidationData: OnValidationData; - /// The weight we reserve at the beginning of the block for processing DMP messages. - type ReservedDmpWeight: Get; + /// Returns the parachain ID we are running with. + type SelfParaId: Get; - /// The message handler that will be invoked when messages are received via XCMP. - /// - /// The messages are dispatched in the order they were relayed by the relay chain. If multiple - /// messages were relayed at one block, these will be dispatched in ascending order of the - /// sender's para ID. - type XcmpMessageHandler: XcmpMessageHandler; + /// The place where outbound XCMP messages come from. This is queried in `finalize_block`. + type OutboundXcmpMessageSource: XcmpMessageSource; - /// The weight we reserve at the beginning of the block for processing XCMP messages. - type ReservedXcmpWeight: Get; -} + /// The message handler that will be invoked when messages are received via DMP. + type DmpMessageHandler: DmpMessageHandler; -// This pallet's storage items. -decl_storage! { - trait Store for Module as ParachainSystem { - /// We need to store the new validation function for the span between - /// setting it and applying it. If it has a - /// value, then [`PendingValidationFunction`] must have a real value, and - /// together will coordinate the block number where the upgrade will happen. - PendingRelayChainBlockNumber: Option; + /// The weight we reserve at the beginning of the block for processing DMP messages. + type ReservedDmpWeight: Get; - /// The new validation function we will upgrade to when the relay chain - /// reaches [`PendingRelayChainBlockNumber`]. A real validation function must - /// exist here as long as [`PendingRelayChainBlockNumber`] is set. - PendingValidationFunction get(fn new_validation_function): Vec; - - /// The [`PersistedValidationData`] set for this block. - ValidationData get(fn validation_data): Option; - - /// Were the validation data set to notify the relay chain? - DidSetValidationCode: bool; - - /// The last relay parent block number at which we signalled the code upgrade. - LastUpgrade: relay_chain::BlockNumber; - - /// The snapshot of some state related to messaging relevant to the current parachain as per - /// the relay parent. + /// The message handler that will be invoked when messages are received via XCMP. /// - /// This field is meant to be updated each block with the validation data inherent. Therefore, - /// before processing of the inherent, e.g. in `on_initialize` this data may be stale. - /// - /// This data is also absent from the genesis. - RelevantMessagingState get(fn relevant_messaging_state): Option; - /// The parachain host configuration that was obtained from the relay parent. - /// - /// This field is meant to be updated each block with the validation data inherent. Therefore, - /// before processing of the inherent, e.g. in `on_initialize` this data may be stale. - /// - /// This data is also absent from the genesis. - HostConfiguration get(fn host_configuration): Option; + /// The messages are dispatched in the order they were relayed by the relay chain. If + /// multiple messages were relayed at one block, these will be dispatched in ascending + /// order of the sender's para ID. + type XcmpMessageHandler: XcmpMessageHandler; - /// The last downward message queue chain head we have observed. - /// - /// This value is loaded before and saved after processing inbound downward messages carried - /// by the system inherent. - LastDmqMqcHead: MessageQueueChain; - /// The message queue chain heads we have observed per each channel incoming channel. - /// - /// This value is loaded before and saved after processing inbound downward messages carried - /// by the system inherent. - LastHrmpMqcHeads: BTreeMap; - - PendingUpwardMessages: Vec; - - /// The number of HRMP messages we observed in `on_initialize` and thus used that number for - /// announcing the weight of `on_initialize` and `on_finalize`. - AnnouncedHrmpMessagesPerCandidate: u32; - - /// The weight we reserve at the beginning of the block for processing XCMP messages. This - /// overrides the amount set in the Config trait. - ReservedXcmpWeightOverride: Option; - - /// The weight we reserve at the beginning of the block for processing DMP messages. This - /// overrides the amount set in the Config trait. - ReservedDmpWeightOverride: Option; - - /// The next authorized upgrade, if there is one. - AuthorizedUpgrade: Option; + /// The weight we reserve at the beginning of the block for processing XCMP messages. + type ReservedXcmpWeight: Get; } -} -// The pallet's dispatchable functions. -decl_module! { - pub struct Module for enum Call where origin: T::Origin { - type Error = Error; - - // Initializing events - // this is needed only if you are using events in your pallet - fn deposit_event() = default; - - /// Force an already scheduled validation function upgrade to happen on a particular block. - /// - /// Note that coordinating this block for the upgrade has to happen independently on the relay - /// chain and this parachain. Synchronizing the block for the upgrade is sensitive, and this - /// bypasses all checks and and normal protocols. Very easy to brick your chain if done wrong. - #[weight = (0, DispatchClass::Operational)] - pub fn set_upgrade_block(origin, relay_chain_block: RelayChainBlockNumber) { - ensure_root(origin)?; - if let Some(_old_block) = PendingRelayChainBlockNumber::get() { - PendingRelayChainBlockNumber::put(relay_chain_block); - } else { - return Err(Error::::NotScheduled.into()) - } - } - - /// Set the current validation data. - /// - /// This should be invoked exactly once per block. It will panic at the finalization - /// phase if the call was not invoked. - /// - /// The dispatch origin for this call must be `Inherent` - /// - /// As a side effect, this function upgrades the current validation function - /// if the appropriate time has come. - #[weight = (0, DispatchClass::Mandatory)] - // TODO: This weight should be corrected. - pub fn set_validation_data(origin, data: ParachainInherentData) -> DispatchResultWithPostInfo { - ensure_none(origin)?; - assert!( - !ValidationData::exists(), - "ValidationData must be updated only once in a block", - ); - - let ParachainInherentData { - validation_data: vfp, - relay_chain_state, - downward_messages, - horizontal_messages, - } = data; - - Self::validate_validation_data(&vfp); - - // initialization logic: we know that this runs exactly once every block, - // which means we can put the initialization logic here to remove the - // sequencing problem. - if let Some(apply_block) = PendingRelayChainBlockNumber::get() { - if vfp.relay_parent_number >= apply_block { - PendingRelayChainBlockNumber::kill(); - let validation_function = PendingValidationFunction::take(); - LastUpgrade::put(&apply_block); - Self::put_parachain_code(&validation_function); - Self::deposit_event(RawEvent::ValidationFunctionApplied(vfp.relay_parent_number)); - } - } - - let (host_config, relevant_messaging_state) = - match relay_state_snapshot::extract_from_proof( - T::SelfParaId::get(), - vfp.relay_parent_storage_root, - relay_chain_state - ) { - Ok(r) => r, - Err(err) => { - panic!("invalid relay chain merkle proof: {:?}", err); - } - }; - - ValidationData::put(&vfp); - RelevantMessagingState::put(relevant_messaging_state.clone()); - HostConfiguration::put(host_config); - - ::on_validation_data(&vfp); - - // TODO: This is more than zero, but will need benchmarking to figure out what. - let mut total_weight = 0; - total_weight += Self::process_inbound_downward_messages( - relevant_messaging_state.dmq_mqc_head, - downward_messages, - ); - total_weight += Self::process_inbound_horizontal_messages( - &relevant_messaging_state.ingress_channels, - horizontal_messages, - ); - - Ok(PostDispatchInfo { actual_weight: Some(total_weight), pays_fee: Pays::No }) - } - - #[weight = (1_000, DispatchClass::Operational)] - fn sudo_send_upward_message(origin, message: UpwardMessage) { - ensure_root(origin)?; - let _ = Self::send_upward_message(message); - } - - #[weight = (1_000_000, DispatchClass::Operational)] - fn authorize_upgrade(origin, code_hash: T::Hash) { - ensure_root(origin)?; - - AuthorizedUpgrade::::put(&code_hash); - - Self::deposit_event(RawEvent::UpgradeAuthorized(code_hash)); - } - - #[weight = 1_000_000] - fn enact_authorized_upgrade(_origin, code: Vec) -> DispatchResultWithPostInfo { - // No ensure origin on purpose. We validate by checking the code vs hash in storage. - Self::validate_authorized_upgrade(&code[..])?; - Self::set_code_impl(code)?; - AuthorizedUpgrade::::kill(); - Ok(Pays::No.into()) - } - - fn on_finalize() { - DidSetValidationCode::kill(); + #[pallet::hooks] + impl Hooks> for Pallet { + fn on_finalize(_: T::BlockNumber) { + >::kill(); let host_config = match Self::host_configuration() { Some(ok) => ok, None => { - debug_assert!(false, "host configuration is promised to set until `on_finalize`; qed"); - return - } - }; - let relevant_messaging_state = match Self::relevant_messaging_state() { - Some(ok) => ok, - None => { - debug_assert!(false, "relevant messaging state is promised to be set until `on_finalize`; qed"); - return + debug_assert!( + false, + "host configuration is promised to set until `on_finalize`; qed", + ); + return; } }; + let relevant_messaging_state = + match Self::relevant_messaging_state() { + Some(ok) => ok, + None => { + debug_assert!( + false, + "relevant messaging state is promised to be set until `on_finalize`; \ + qed", + ); + return; + } + }; - ::PendingUpwardMessages::mutate(|up| { + >::mutate(|up| { let (count, size) = relevant_messaging_state.relay_dispatch_queue_size; let available_capacity = cmp::min( @@ -343,12 +175,12 @@ decl_module! { // - the capacity and total size of the channel is limited, // - the maximum size of a message is limited (and can potentially be changed), - let maximum_channels = host_config.hrmp_max_message_num_per_candidate - .min(AnnouncedHrmpMessagesPerCandidate::take()) as usize; + let maximum_channels = host_config + .hrmp_max_message_num_per_candidate + .min(>::take()) as usize; - let outbound_messages = T::OutboundXcmpMessageSource::take_outbound_messages( - maximum_channels, - ); + let outbound_messages = + T::OutboundXcmpMessageSource::take_outbound_messages(maximum_channels); // Note conversion to the `OutboundHrmpMessage` isn't needed since the data that // `take_outbound_messages` returns encodes equivalently. @@ -356,23 +188,26 @@ decl_module! { // The following code is a smoke test to check that the `OutboundHrmpMessage` type // doesn't accidentally change (e.g. by having a field added to it). If the following // line breaks, then we'll need to revisit the assumption that the result of - // `take_outbound_messages` can be placed into `HRMP_OUTBOUND_MESSAGES` directly without - // a decode/encode round-trip. - let _ = OutboundHrmpMessage { recipient: ParaId::from(0), data: vec![] }; + // `take_outbound_messages` can be placed into `HRMP_OUTBOUND_MESSAGES` directly + // without a decode/encode round-trip. + let _ = OutboundHrmpMessage { + recipient: ParaId::from(0), + data: vec![], + }; storage::unhashed::put(well_known_keys::HRMP_OUTBOUND_MESSAGES, &outbound_messages); } fn on_initialize(_n: T::BlockNumber) -> Weight { - // To prevent removing `NEW_VALIDATION_CODE` that was set by another `on_initialize` like - // for example from scheduler, we only kill the storage entry if it was not yet updated - // in the current block. - if !DidSetValidationCode::get() { + // To prevent removing `NEW_VALIDATION_CODE` that was set by another `on_initialize` + // like for example from scheduler, we only kill the storage entry if it was not yet + // updated in the current block. + if !>::get() { storage::unhashed::kill(NEW_VALIDATION_CODE); } // Remove the validation from the old block. - ValidationData::kill(); + >::kill(); let mut weight = T::DbWeight::get().writes(3); storage::unhashed::kill(well_known_keys::HRMP_WATERMARK); @@ -382,27 +217,26 @@ decl_module! { // Here, in `on_initialize` we must report the weight for both `on_initialize` and // `on_finalize`. // - // One complication here, is that the `host_configuration` is updated by an inherent and - // those are processed after the block initialization phase. Therefore, we have to be - // content only with the configuration as per the previous block. That means that + // One complication here, is that the `host_configuration` is updated by an inherent + // and those are processed after the block initialization phase. Therefore, we have to + // be content only with the configuration as per the previous block. That means that // the configuration can be either stale (or be abscent altogether in case of the // beginning of the chain). // // In order to mitigate this, we do the following. At the time, we are only concerned - // about `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to process - // the number of HRMP messages according to the potentially stale configuration. In - // `on_finalize` we will process only the maximum between the announced number of messages - // and the actual received in the fresh configuration. + // about `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to + // process the number of HRMP messages according to the potentially stale + // configuration. In `on_finalize` we will process only the maximum between the + // announced number of messages and the actual received in the fresh configuration. // // In the common case, they will be the same. In the case the actual value is smaller // than the announced, we would waste some of weight. In the case the actual value is // greater than the announced, we will miss opportunity to send a couple of messages. weight += T::DbWeight::get().reads_writes(1, 1); - let hrmp_max_message_num_per_candidate = - Self::host_configuration() - .map(|cfg| cfg.hrmp_max_message_num_per_candidate) - .unwrap_or(0); - AnnouncedHrmpMessagesPerCandidate::put(hrmp_max_message_num_per_candidate); + let hrmp_max_message_num_per_candidate = Self::host_configuration() + .map(|cfg| cfg.hrmp_max_message_num_per_candidate) + .unwrap_or(0); + >::put(hrmp_max_message_num_per_candidate); // NOTE that the actual weight consumed by `on_finalize` may turn out lower. weight += T::DbWeight::get().reads_writes( @@ -413,9 +247,287 @@ decl_module! { weight } } + + #[pallet::call] + impl Pallet { + /// Force an already scheduled validation function upgrade to happen on a particular block. + /// + /// Note that coordinating this block for the upgrade has to happen independently on the + /// relay chain and this parachain. Synchronizing the block for the upgrade is sensitive, + /// and this bypasses all checks and and normal protocols. Very easy to brick your chain + /// if done wrong. + #[pallet::weight((0, DispatchClass::Operational))] + pub fn set_upgrade_block( + origin: OriginFor, + relay_chain_block: RelayChainBlockNumber, + ) -> DispatchResult { + ensure_root(origin)?; + if >::get().is_some() { + >::put(relay_chain_block); + Ok(()) + } else { + Err(Error::::NotScheduled.into()) + } + } + + /// Set the current validation data. + /// + /// This should be invoked exactly once per block. It will panic at the finalization + /// phase if the call was not invoked. + /// + /// The dispatch origin for this call must be `Inherent` + /// + /// As a side effect, this function upgrades the current validation function + /// if the appropriate time has come. + #[pallet::weight((0, DispatchClass::Mandatory))] + // TODO: This weight should be corrected. + pub fn set_validation_data( + origin: OriginFor, + data: ParachainInherentData, + ) -> DispatchResultWithPostInfo { + ensure_none(origin)?; + assert!( + !>::exists(), + "ValidationData must be updated only once in a block", + ); + + let ParachainInherentData { + validation_data: vfp, + relay_chain_state, + downward_messages, + horizontal_messages, + } = data; + + Self::validate_validation_data(&vfp); + + // initialization logic: we know that this runs exactly once every block, + // which means we can put the initialization logic here to remove the + // sequencing problem. + if let Some(apply_block) = >::get() { + if vfp.relay_parent_number >= apply_block { + >::kill(); + let validation_function = >::take(); + >::put(&apply_block); + Self::put_parachain_code(&validation_function); + Self::deposit_event(Event::ValidationFunctionApplied(vfp.relay_parent_number)); + } + } + + let (host_config, relevant_messaging_state) = + match relay_state_snapshot::extract_from_proof( + T::SelfParaId::get(), + vfp.relay_parent_storage_root, + relay_chain_state, + ) { + Ok(r) => r, + Err(err) => { + panic!("invalid relay chain merkle proof: {:?}", err); + } + }; + + >::put(&vfp); + >::put(relevant_messaging_state.clone()); + >::put(host_config); + + ::on_validation_data(&vfp); + + // TODO: This is more than zero, but will need benchmarking to figure out what. + let mut total_weight = 0; + total_weight += Self::process_inbound_downward_messages( + relevant_messaging_state.dmq_mqc_head, + downward_messages, + ); + total_weight += Self::process_inbound_horizontal_messages( + &relevant_messaging_state.ingress_channels, + horizontal_messages, + ); + + Ok(PostDispatchInfo { + actual_weight: Some(total_weight), + pays_fee: Pays::No, + }) + } + + #[pallet::weight((1_000, DispatchClass::Operational))] + fn sudo_send_upward_message( + origin: OriginFor, + message: UpwardMessage, + ) -> DispatchResult { + ensure_root(origin)?; + let _ = Self::send_upward_message(message); + Ok(()) + } + + #[pallet::weight((1_000_000, DispatchClass::Operational))] + fn authorize_upgrade(origin: OriginFor, code_hash: T::Hash) -> DispatchResult { + ensure_root(origin)?; + + AuthorizedUpgrade::::put(&code_hash); + + Self::deposit_event(Event::UpgradeAuthorized(code_hash)); + Ok(()) + } + + #[pallet::weight(1_000_000)] + fn enact_authorized_upgrade(_: OriginFor, code: Vec) -> DispatchResultWithPostInfo { + Self::validate_authorized_upgrade(&code[..])?; + Self::set_code_impl(code)?; + AuthorizedUpgrade::::kill(); + Ok(Pays::No.into()) + } + } + + #[pallet::event] + #[pallet::generate_deposit(pub(super) fn deposit_event)] + #[pallet::metadata(T::Hash = "Hash")] + pub enum Event { + /// The validation function has been scheduled to apply as of the contained relay chain + /// block number. + ValidationFunctionStored(RelayChainBlockNumber), + /// The validation function was applied as of the contained relay chain block number. + ValidationFunctionApplied(RelayChainBlockNumber), + /// An upgrade has been authorized. + UpgradeAuthorized(T::Hash), + /// Some downward messages have been received and will be processed. + /// \[ count \] + DownwardMessagesReceived(u32), + /// Downward messages were processed using the given weight. + /// \[ weight_used, result_mqc_head \] + DownwardMessagesProcessed(Weight, relay_chain::Hash), + } + + #[pallet::error] + pub enum Error { + /// Attempt to upgrade validation function while existing upgrade pending + OverlappingUpgrades, + /// Polkadot currently prohibits this parachain from upgrading its validation function + ProhibitedByPolkadot, + /// The supplied validation function has compiled into a blob larger than Polkadot is + /// willing to run + TooBig, + /// The inherent which supplies the validation data did not run this block + ValidationDataNotAvailable, + /// The inherent which supplies the host configuration did not run this block + HostConfigurationNotAvailable, + /// No validation function upgrade is currently scheduled. + NotScheduled, + /// No code upgrade has been authorized. + NothingAuthorized, + /// The given code upgrade has not been authorized. + Unauthorized, + } + + /// We need to store the new validation function for the span between + /// setting it and applying it. If it has a + /// value, then [`PendingValidationFunction`] must have a real value, and + /// together will coordinate the block number where the upgrade will happen. + #[pallet::storage] + pub(super) type PendingRelayChainBlockNumber = + StorageValue<_, RelayChainBlockNumber>; + + /// The new validation function we will upgrade to when the relay chain + /// reaches [`PendingRelayChainBlockNumber`]. A real validation function must + /// exist here as long as [`PendingRelayChainBlockNumber`] is set. + #[pallet::storage] + #[pallet::getter(fn new_validation_function)] + pub(super) type PendingValidationFunction = StorageValue<_, Vec, ValueQuery>; + + /// The [`PersistedValidationData`] set for this block. + #[pallet::storage] + #[pallet::getter(fn validation_data)] + pub(super) type ValidationData = StorageValue<_, PersistedValidationData>; + + /// Were the validation data set to notify the relay chain? + #[pallet::storage] + pub(super) type DidSetValidationCode = StorageValue<_, bool, ValueQuery>; + + /// The last relay parent block number at which we signalled the code upgrade. + #[pallet::storage] + pub(super) type LastUpgrade = StorageValue<_, relay_chain::BlockNumber, ValueQuery>; + + /// The snapshot of some state related to messaging relevant to the current parachain as per + /// the relay parent. + /// + /// This field is meant to be updated each block with the validation data inherent. Therefore, + /// before processing of the inherent, e.g. in `on_initialize` this data may be stale. + /// + /// This data is also absent from the genesis. + #[pallet::storage] + #[pallet::getter(fn relevant_messaging_state)] + pub(super) type RelevantMessagingState = StorageValue<_, MessagingStateSnapshot>; + + /// The parachain host configuration that was obtained from the relay parent. + /// + /// This field is meant to be updated each block with the validation data inherent. Therefore, + /// before processing of the inherent, e.g. in `on_initialize` this data may be stale. + /// + /// This data is also absent from the genesis. + #[pallet::storage] + #[pallet::getter(fn host_configuration)] + pub(super) type HostConfiguration = StorageValue<_, AbridgedHostConfiguration>; + + /// The last downward message queue chain head we have observed. + /// + /// This value is loaded before and saved after processing inbound downward messages carried + /// by the system inherent. + #[pallet::storage] + pub(super) type LastDmqMqcHead = StorageValue<_, MessageQueueChain, ValueQuery>; + + /// The message queue chain heads we have observed per each channel incoming channel. + /// + /// This value is loaded before and saved after processing inbound downward messages carried + /// by the system inherent. + #[pallet::storage] + pub(super) type LastHrmpMqcHeads = + StorageValue<_, BTreeMap, ValueQuery>; + + #[pallet::storage] + pub(super) type PendingUpwardMessages = + StorageValue<_, Vec, ValueQuery>; + + /// The number of HRMP messages we observed in `on_initialize` and thus used that number for + /// announcing the weight of `on_initialize` and `on_finalize`. + #[pallet::storage] + pub(super) type AnnouncedHrmpMessagesPerCandidate = StorageValue<_, u32, ValueQuery>; + + /// The weight we reserve at the beginning of the block for processing XCMP messages. This + /// overrides the amount set in the Config trait. + #[pallet::storage] + pub(super) type ReservedXcmpWeightOverride = StorageValue<_, Weight>; + + /// The weight we reserve at the beginning of the block for processing DMP messages. This + /// overrides the amount set in the Config trait. + #[pallet::storage] + pub(super) type ReservedDmpWeightOverride = StorageValue<_, Weight>; + + /// The next authorized upgrade, if there is one. + #[pallet::storage] + pub(super) type AuthorizedUpgrade = StorageValue<_, T::Hash>; + + #[pallet::inherent] + impl ProvideInherent for Pallet { + type Call = Call; + type Error = sp_inherents::MakeFatalError<()>; + const INHERENT_IDENTIFIER: InherentIdentifier = + cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER; + + fn create_inherent(data: &InherentData) -> Option { + let data: ParachainInherentData = data + .get_data(&Self::INHERENT_IDENTIFIER) + .ok() + .flatten() + .expect("validation function params are always injected into inherent data; qed"); + + Some(Call::set_validation_data(data)) + } + + fn is_inherent(call: &Self::Call) -> bool { + matches!(call, Call::set_validation_data(_)) + } + } } -impl Module { +impl Pallet { fn validate_authorized_upgrade(code: &[u8]) -> Result { let required_hash = AuthorizedUpgrade::::get().ok_or(Error::::NothingAuthorized)?; let actual_hash = T::Hashing::hash(&code[..]); @@ -424,7 +536,7 @@ impl Module { } } -impl sp_runtime::traits::ValidateUnsigned for Module { +impl sp_runtime::traits::ValidateUnsigned for Pallet { type Call = Call; fn validate_unsigned(_source: TransactionSource, call: &Self::Call) -> TransactionValidity { @@ -446,21 +558,22 @@ impl sp_runtime::traits::ValidateUnsigned for Module { } } -impl GetChannelInfo for Module { +impl GetChannelInfo for Pallet { fn get_channel_status(id: ParaId) -> ChannelStatus { // Note, that we are using `relevant_messaging_state` which may be from the previous - // block, in case this is called from `on_initialize`, i.e. before the inherent with fresh - // data is submitted. + // block, in case this is called from `on_initialize`, i.e. before the inherent with + // fresh data is submitted. // - // That shouldn't be a problem though because this is anticipated and already can happen. - // This is because sending implies that a message is buffered until there is space to send - // a message in the candidate. After a while waiting in a buffer, it may be discovered that - // the channel to which a message were addressed is now closed. Another possibility, is that - // the maximum message size was decreased so that a message in the buffer doesn't fit. Should - // any of that happen the sender should be notified about the message was discarded. + // That shouldn't be a problem though because this is anticipated and already can + // happen. This is because sending implies that a message is buffered until there is + // space to send a message in the candidate. After a while waiting in a buffer, it may + // be discovered that the channel to which a message were addressed is now closed. + // Another possibility, is that the maximum message size was decreased so that a + // message in the buffer doesn't fit. Should any of that happen the sender should be + // notified about the message was discarded. // - // Here it a similar case, with the difference that the realization that the channel is closed - // came the same block. + // Here it a similar case, with the difference that the realization that the channel is + // closed came the same block. let channels = match Self::relevant_messaging_state() { None => { log::warn!("calling `get_channel_status` with no RelevantMessagingState?!"); @@ -468,8 +581,8 @@ impl GetChannelInfo for Module { } Some(d) => d.egress_channels, }; - // ^^^ NOTE: This storage field should carry over from the previous block. So if it's None - // then it must be that this is an edge-case where a message is attempted to be + // ^^^ NOTE: This storage field should carry over from the previous block. So if it's + // None then it must be that this is an edge-case where a message is attempted to be // sent at the first block. It should be safe to assume that there are no channels // opened at all so early. At least, relying on this assumption seems to be a better // tradeoff, compared to introducing an error variant that the clients should be @@ -495,7 +608,7 @@ impl GetChannelInfo for Module { } } -impl Module { +impl Pallet { /// Validate the given [`PersistedValidationData`] against the /// [`ValidationParams`](polkadot_parachain::primitives::ValidationParams). /// @@ -523,23 +636,23 @@ impl Module { /// Process all inbound downward messages relayed by the collator. /// - /// Checks if the sequence of the messages is valid, dispatches them and communicates the number - /// of processed messages to the collator via a storage update. + /// Checks if the sequence of the messages is valid, dispatches them and communicates the + /// number of processed messages to the collator via a storage update. /// - /// **Panics** if it turns out that after processing all messages the Message Queue Chain hash - /// doesn't match the expected. + /// **Panics** if it turns out that after processing all messages the Message Queue Chain + /// hash doesn't match the expected. fn process_inbound_downward_messages( expected_dmq_mqc_head: relay_chain::Hash, downward_messages: Vec, ) -> Weight { let dm_count = downward_messages.len() as u32; - let mut dmq_head = LastDmqMqcHead::get(); + let mut dmq_head = >::get(); let mut weight_used = 0; if dm_count != 0 { - Self::deposit_event(RawEvent::DownwardMessagesReceived(dm_count)); - let max_weight = - ReservedDmpWeightOverride::get().unwrap_or_else(T::ReservedDmpWeight::get); + Self::deposit_event(Event::DownwardMessagesReceived(dm_count)); + let max_weight = >::get() + .unwrap_or_else(T::ReservedDmpWeight::get); let message_iter = downward_messages .into_iter() @@ -548,16 +661,16 @@ impl Module { }) .map(|m| (m.sent_at, m.msg)); weight_used += T::DmpMessageHandler::handle_dmp_messages(message_iter, max_weight); - LastDmqMqcHead::put(&dmq_head); + >::put(&dmq_head); - Self::deposit_event(RawEvent::DownwardMessagesProcessed(weight_used, dmq_head.0)); - }; + Self::deposit_event(Event::DownwardMessagesProcessed(weight_used, dmq_head.0)); + } - // After hashing each message in the message queue chain submitted by the collator, we should - // arrive to the MQC head provided by the relay chain. + // After hashing each message in the message queue chain submitted by the collator, we + // should arrive to the MQC head provided by the relay chain. // - // A mismatch means that at least some of the submitted messages were altered, omitted or added - // improperly. + // A mismatch means that at least some of the submitted messages were altered, omitted or + // added improperly. assert_eq!(dmq_head.0, expected_dmq_mqc_head); // Store the processed_downward_messages here so that it will be accessible from @@ -572,30 +685,33 @@ impl Module { /// This is similar to [`process_inbound_downward_messages`], but works on multiple inbound /// channels. /// - /// **Panics** if either any of horizontal messages submitted by the collator was sent from a - /// para which has no open channel to this parachain or if after processing messages - /// across all inbound channels MQCs were obtained which do not correspond to the - /// ones found on the relay-chain. + /// **Panics** if either any of horizontal messages submitted by the collator was sent from + /// a para which has no open channel to this parachain or if after processing + /// messages across all inbound channels MQCs were obtained which do not + /// correspond to the ones found on the relay-chain. fn process_inbound_horizontal_messages( ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)], horizontal_messages: BTreeMap>, ) -> Weight { - // First, check that all submitted messages are sent from channels that exist. The channel - // exists if its MQC head is present in `vfp.hrmp_mqc_heads`. + // First, check that all submitted messages are sent from channels that exist. The + // channel exists if its MQC head is present in `vfp.hrmp_mqc_heads`. for sender in horizontal_messages.keys() { - // A violation of the assertion below indicates that one of the messages submitted by - // the collator was sent from a sender that doesn't have a channel opened to this parachain, - // according to the relay-parent state. - assert!(ingress_channels - .binary_search_by_key(sender, |&(s, _)| s) - .is_ok(),); + // A violation of the assertion below indicates that one of the messages submitted + // by the collator was sent from a sender that doesn't have a channel opened to + // this parachain, according to the relay-parent state. + assert!( + ingress_channels + .binary_search_by_key(sender, |&(s, _)| s) + .is_ok(), + ); } // Second, prepare horizontal messages for a more convenient processing: // - // instead of a mapping from a para to a list of inbound HRMP messages, we will have a list - // of tuples `(sender, message)` first ordered by `sent_at` (the relay chain block number - // in which the message hit the relay-chain) and second ordered by para id ascending. + // instead of a mapping from a para to a list of inbound HRMP messages, we will have a + // list of tuples `(sender, message)` first ordered by `sent_at` (the relay chain block + // number in which the message hit the relay-chain) and second ordered by para id + // ascending. // // The messages will be dispatched in this order. let mut horizontal_messages = horizontal_messages @@ -614,7 +730,7 @@ impl Module { } }); - let last_mqc_heads = LastHrmpMqcHeads::get(); + let last_mqc_heads = >::get(); let mut running_mqc_heads = BTreeMap::new(); let mut hrmp_watermark = None; @@ -638,11 +754,11 @@ impl Module { .map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..])); let max_weight = - ReservedXcmpWeightOverride::get().unwrap_or_else(T::ReservedXcmpWeight::get); + >::get().unwrap_or_else(T::ReservedXcmpWeight::get); let weight_used = T::XcmpMessageHandler::handle_xcmp_messages(message_iter, max_weight); - // Check that the MQC heads for each channel provided by the relay chain match the MQC heads - // we have after processing all incoming messages. + // Check that the MQC heads for each channel provided by the relay chain match the MQC + // heads we have after processing all incoming messages. // // Along the way we also carry over the relevant entries from the `last_mqc_heads` to // `running_mqc_heads`. Otherwise, in a block where no messages were sent in a channel @@ -654,10 +770,11 @@ impl Module { .or_insert_with(|| last_mqc_heads.get(&sender).cloned().unwrap_or_default()) .head(); let target_head = channel.mqc_head.unwrap_or_default(); + assert!(cur_head == target_head); } - LastHrmpMqcHeads::put(running_mqc_heads); + >::put(running_mqc_heads); // If we processed at least one message, then advance watermark to that location. if let Some(hrmp_watermark) = hrmp_watermark { @@ -672,7 +789,7 @@ impl Module { /// upgrade has been scheduled. fn notify_polkadot_of_pending_upgrade(code: &[u8]) { storage::unhashed::put_raw(NEW_VALIDATION_CODE, code); - DidSetValidationCode::put(true); + >::put(true); } /// Put a new validation function into a particular location where this @@ -685,7 +802,7 @@ impl Module { /// /// Returns `None` if the relay chain parachain host configuration hasn't been submitted yet. pub fn max_code_size() -> Option { - HostConfiguration::get().map(|cfg| cfg.max_code_size) + >::get().map(|cfg| cfg.max_code_size) } /// Returns if a PVF/runtime upgrade could be signalled at the current block, and if so @@ -694,13 +811,14 @@ impl Module { vfp: &PersistedValidationData, cfg: &AbridgedHostConfiguration, ) -> Option { - if PendingRelayChainBlockNumber::get().is_some() { + if >::get().is_some() { // There is already upgrade scheduled. Upgrade is not allowed. return None; } - let relay_blocks_since_last_upgrade = - vfp.relay_parent_number.saturating_sub(LastUpgrade::get()); + let relay_blocks_since_last_upgrade = vfp + .relay_parent_number + .saturating_sub(>::get()); if relay_blocks_since_last_upgrade <= cfg.validation_upgrade_frequency { // The cooldown after the last upgrade hasn't elapsed yet. Upgrade is not allowed. @@ -713,11 +831,12 @@ impl Module { /// The implementation of the runtime upgrade functionality for parachains. fn set_code_impl(validation_function: Vec) -> DispatchResult { ensure!( - !PendingValidationFunction::exists(), + !>::exists(), Error::::OverlappingUpgrades ); let vfp = Self::validation_data().ok_or(Error::::ValidationDataNotAvailable)?; - let cfg = Self::host_configuration().ok_or(Error::::HostConfigurationNotAvailable)?; + let cfg = + Self::host_configuration().ok_or(Error::::HostConfigurationNotAvailable)?; ensure!( validation_function.len() <= cfg.max_code_size as usize, Error::::TooBig @@ -733,9 +852,9 @@ impl Module { // storage keeps track locally for the parachain upgrade, which will // be applied later. Self::notify_polkadot_of_pending_upgrade(&validation_function); - PendingRelayChainBlockNumber::put(apply_block); - PendingValidationFunction::put(validation_function); - Self::deposit_event(RawEvent::ValidationFunctionStored(apply_block)); + >::put(apply_block); + >::put(validation_function); + Self::deposit_event(Event::ValidationFunctionStored(apply_block)); Ok(()) } @@ -745,14 +864,14 @@ pub struct ParachainSetCode(sp_std::marker::PhantomData); impl frame_system::SetCode for ParachainSetCode { fn set_code(code: Vec) -> DispatchResult { - Module::::set_code_impl(code) + Pallet::::set_code_impl(code) } } /// This struct provides ability to extend a message queue chain (MQC) and compute a new head. /// -/// MQC is an instance of a [hash chain] applied to a message queue. Using a hash chain it's possible -/// to represent a sequence of messages using only a single hash. +/// MQC is an instance of a [hash chain] applied to a message queue. Using a hash chain it's +/// possible to represent a sequence of messages using only a single hash. /// /// A head for an empty chain is agreed to be a zero hash. /// @@ -786,7 +905,7 @@ impl MessageQueueChain { } } -impl Module { +impl Pallet { pub fn send_upward_message(message: UpwardMessage) -> Result { // Check if the message fits into the relay-chain constraints. // @@ -811,987 +930,20 @@ impl Module { // then it must be that this is an edge-case where a message is attempted to be // sent at the first block. // - // Let's pass this message through. I think it's not unreasonable to expect that the - // message is not huge and it comes through, but if it doesn't it can be returned - // back to the sender. + // Let's pass this message through. I think it's not unreasonable to expect that + // the message is not huge and it comes through, but if it doesn't it can be + // returned back to the sender. // // Thus fall through here. } }; - ::PendingUpwardMessages::append(message); + >::append(message); Ok(0) } } -impl UpwardMessageSender for Module { +impl UpwardMessageSender for Pallet { fn send_upward_message(message: UpwardMessage) -> Result { Self::send_upward_message(message) } } - -impl ProvideInherent for Module { - type Call = Call; - type Error = sp_inherents::MakeFatalError<()>; - const INHERENT_IDENTIFIER: InherentIdentifier = - cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER; - - fn create_inherent(data: &InherentData) -> Option { - let data: ParachainInherentData = data - .get_data(&Self::INHERENT_IDENTIFIER) - .ok() - .flatten() - .expect("validation function params are always injected into inherent data; qed"); - - Some(Call::set_validation_data(data)) - } - - fn is_inherent(call: &Self::Call) -> bool { - matches!(call, Call::set_validation_data(_)) - } -} - -decl_event! { - pub enum Event where Hash = ::Hash { - /// The validation function has been scheduled to apply as of the contained relay chain block number. - ValidationFunctionStored(RelayChainBlockNumber), - /// The validation function was applied as of the contained relay chain block number. - ValidationFunctionApplied(RelayChainBlockNumber), - /// An upgrade has been authorized. - UpgradeAuthorized(Hash), - /// Some downward messages have been received and will be processed. - /// \[ count \] - DownwardMessagesReceived(u32), - /// Downward messages were processed using the given weight. - /// \[ weight_used, result_mqc_head \] - DownwardMessagesProcessed(Weight, relay_chain::Hash), - } -} - -decl_error! { - pub enum Error for Module { - /// Attempt to upgrade validation function while existing upgrade pending - OverlappingUpgrades, - /// Polkadot currently prohibits this parachain from upgrading its validation function - ProhibitedByPolkadot, - /// The supplied validation function has compiled into a blob larger than Polkadot is willing to run - TooBig, - /// The inherent which supplies the validation data did not run this block - ValidationDataNotAvailable, - /// The inherent which supplies the host configuration did not run this block - HostConfigurationNotAvailable, - /// No validation function upgrade is currently scheduled. - NotScheduled, - /// No code upgrade has been authorized. - NothingAuthorized, - /// The given code upgrade has not been authorized. - Unauthorized, - } -} - -/// tests for this pallet -#[cfg(test)] -mod tests { - use super::*; - - use codec::Encode; - use cumulus_primitives_core::{ - relay_chain::BlockNumber as RelayBlockNumber, AbridgedHrmpChannel, InboundDownwardMessage, - InboundHrmpMessage, PersistedValidationData, - }; - use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder; - use frame_support::{ - assert_ok, - dispatch::UnfilteredDispatchable, - parameter_types, - traits::{OnFinalize, OnInitialize}, - }; - use frame_system::{InitKind, RawOrigin}; - use hex_literal::hex; - use relay_chain::v1::HrmpChannelId; - use sp_core::H256; - use sp_runtime::{testing::Header, traits::IdentityLookup}; - use sp_version::RuntimeVersion; - use std::cell::RefCell; - - use crate as parachain_system; - - type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic; - type Block = frame_system::mocking::MockBlock; - - frame_support::construct_runtime!( - pub enum Test where - Block = Block, - NodeBlock = Block, - UncheckedExtrinsic = UncheckedExtrinsic, - { - System: frame_system::{Pallet, Call, Config, Storage, Event}, - ParachainSystem: parachain_system::{Pallet, Call, Storage, Event}, - } - ); - - parameter_types! { - pub const BlockHashCount: u64 = 250; - pub Version: RuntimeVersion = RuntimeVersion { - spec_name: sp_version::create_runtime_str!("test"), - impl_name: sp_version::create_runtime_str!("system-test"), - authoring_version: 1, - spec_version: 1, - impl_version: 1, - apis: sp_version::create_apis_vec!([]), - transaction_version: 1, - }; - pub const ParachainId: ParaId = ParaId::new(200); - pub const ReservedXcmpWeight: Weight = 0; - pub const ReservedDmpWeight: Weight = 0; - } - impl frame_system::Config for Test { - type Origin = Origin; - type Call = Call; - type Index = u64; - type BlockNumber = u64; - type Hash = H256; - type Hashing = BlakeTwo256; - type AccountId = u64; - type Lookup = IdentityLookup; - type Header = Header; - type Event = Event; - type BlockHashCount = BlockHashCount; - type BlockLength = (); - type BlockWeights = (); - type Version = Version; - type PalletInfo = PalletInfo; - type AccountData = (); - type OnNewAccount = (); - type OnKilledAccount = (); - type DbWeight = (); - type BaseCallFilter = (); - type SystemWeightInfo = (); - type SS58Prefix = (); - type OnSetCode = ParachainSetCode; - } - impl Config for Test { - type Event = Event; - type OnValidationData = (); - type SelfParaId = ParachainId; - type OutboundXcmpMessageSource = FromThreadLocal; - type DmpMessageHandler = SaveIntoThreadLocal; - type ReservedDmpWeight = ReservedDmpWeight; - type XcmpMessageHandler = SaveIntoThreadLocal; - type ReservedXcmpWeight = ReservedXcmpWeight; - } - - pub struct FromThreadLocal; - pub struct SaveIntoThreadLocal; - - std::thread_local! { - static HANDLED_DMP_MESSAGES: RefCell)>> = RefCell::new(Vec::new()); - static HANDLED_XCMP_MESSAGES: RefCell)>> = RefCell::new(Vec::new()); - static SENT_MESSAGES: RefCell)>> = RefCell::new(Vec::new()); - } - - fn send_message(dest: ParaId, message: Vec) { - SENT_MESSAGES.with(|m| m.borrow_mut().push((dest, message))); - } - - impl XcmpMessageSource for FromThreadLocal { - fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec)> { - let mut ids = std::collections::BTreeSet::::new(); - let mut taken = 0; - let mut result = Vec::new(); - SENT_MESSAGES.with(|ms| { - ms.borrow_mut().retain(|m| { - let status = as GetChannelInfo>::get_channel_status(m.0); - let ready = matches!(status, ChannelStatus::Ready(..)); - if ready && !ids.contains(&m.0) && taken < maximum_channels { - ids.insert(m.0); - taken += 1; - result.push(m.clone()); - false - } else { - true - } - }) - }); - result - } - } - - impl DmpMessageHandler for SaveIntoThreadLocal { - fn handle_dmp_messages( - iter: impl Iterator)>, - _max_weight: Weight, - ) -> Weight { - HANDLED_DMP_MESSAGES.with(|m| { - for i in iter { - m.borrow_mut().push(i); - } - 0 - }) - } - } - - impl XcmpMessageHandler for SaveIntoThreadLocal { - fn handle_xcmp_messages<'a, I: Iterator>( - iter: I, - _max_weight: Weight, - ) -> Weight { - HANDLED_XCMP_MESSAGES.with(|m| { - for (sender, sent_at, message) in iter { - m.borrow_mut().push((sender, sent_at, message.to_vec())); - } - 0 - }) - } - } - - // This function basically just builds a genesis storage key/value store according to - // our desired mockup. - fn new_test_ext() -> sp_io::TestExternalities { - HANDLED_DMP_MESSAGES.with(|m| m.borrow_mut().clear()); - HANDLED_XCMP_MESSAGES.with(|m| m.borrow_mut().clear()); - - frame_system::GenesisConfig::default() - .build_storage::() - .unwrap() - .into() - } - - struct CallInWasm(Vec); - - impl sp_core::traits::CallInWasm for CallInWasm { - fn call_in_wasm( - &self, - _wasm_code: &[u8], - _code_hash: Option>, - _method: &str, - _call_data: &[u8], - _ext: &mut dyn sp_externalities::Externalities, - _missing_host_functions: sp_core::traits::MissingHostFunctions, - ) -> Result, String> { - Ok(self.0.clone()) - } - } - - fn wasm_ext() -> sp_io::TestExternalities { - let version = RuntimeVersion { - spec_name: "test".into(), - spec_version: 2, - impl_version: 1, - ..Default::default() - }; - let call_in_wasm = CallInWasm(version.encode()); - - let mut ext = new_test_ext(); - ext.register_extension(sp_core::traits::CallInWasmExt::new(call_in_wasm)); - ext - } - - struct BlockTest { - n: ::BlockNumber, - within_block: Box, - after_block: Option>, - } - - /// BlockTests exist to test blocks with some setup: we have to assume that - /// `validate_block` will mutate and check storage in certain predictable - /// ways, for example, and we want to always ensure that tests are executed - /// in the context of some particular block number. - #[derive(Default)] - struct BlockTests { - tests: Vec, - pending_upgrade: Option, - ran: bool, - relay_sproof_builder_hook: - Option>, - persisted_validation_data_hook: - Option>, - inherent_data_hook: - Option>, - } - - impl BlockTests { - fn new() -> BlockTests { - Default::default() - } - - fn add_raw(mut self, test: BlockTest) -> Self { - self.tests.push(test); - self - } - - fn add(self, n: ::BlockNumber, within_block: F) -> Self - where - F: 'static + Fn(), - { - self.add_raw(BlockTest { - n, - within_block: Box::new(within_block), - after_block: None, - }) - } - - fn add_with_post_test( - self, - n: ::BlockNumber, - within_block: F1, - after_block: F2, - ) -> Self - where - F1: 'static + Fn(), - F2: 'static + Fn(), - { - self.add_raw(BlockTest { - n, - within_block: Box::new(within_block), - after_block: Some(Box::new(after_block)), - }) - } - - fn with_relay_sproof_builder(mut self, f: F) -> Self - where - F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder), - { - self.relay_sproof_builder_hook = Some(Box::new(f)); - self - } - - #[allow(dead_code)] // might come in handy in future. If now is future and it still hasn't - feel free. - fn with_validation_data(mut self, f: F) -> Self - where - F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut PersistedValidationData), - { - self.persisted_validation_data_hook = Some(Box::new(f)); - self - } - - fn with_inherent_data(mut self, f: F) -> Self - where - F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData), - { - self.inherent_data_hook = Some(Box::new(f)); - self - } - - fn run(&mut self) { - self.ran = true; - wasm_ext().execute_with(|| { - for BlockTest { - n, - within_block, - after_block, - } in self.tests.iter() - { - // clear pending updates, as applicable - if let Some(upgrade_block) = self.pending_upgrade { - if n >= &upgrade_block.into() { - self.pending_upgrade = None; - } - } - - // begin initialization - System::initialize( - &n, - &Default::default(), - &Default::default(), - InitKind::Full, - ); - - // now mess with the storage the way validate_block does - let mut sproof_builder = RelayStateSproofBuilder::default(); - if let Some(ref hook) = self.relay_sproof_builder_hook { - hook(self, *n as RelayChainBlockNumber, &mut sproof_builder); - } - let (relay_parent_storage_root, relay_chain_state) = - sproof_builder.into_state_root_and_proof(); - let mut vfp = PersistedValidationData { - relay_parent_number: *n as RelayChainBlockNumber, - relay_parent_storage_root, - ..Default::default() - }; - if let Some(ref hook) = self.persisted_validation_data_hook { - hook(self, *n as RelayChainBlockNumber, &mut vfp); - } - - ValidationData::put(&vfp); - storage::unhashed::kill(NEW_VALIDATION_CODE); - - // It is insufficient to push the validation function params - // to storage; they must also be included in the inherent data. - let inherent_data = { - let mut inherent_data = InherentData::default(); - let mut system_inherent_data = ParachainInherentData { - validation_data: vfp.clone(), - relay_chain_state, - downward_messages: Default::default(), - horizontal_messages: Default::default(), - }; - if let Some(ref hook) = self.inherent_data_hook { - hook(self, *n as RelayChainBlockNumber, &mut system_inherent_data); - } - inherent_data - .put_data( - cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER, - &system_inherent_data, - ) - .expect("failed to put VFP inherent"); - inherent_data - }; - - // execute the block - ParachainSystem::on_initialize(*n); - ParachainSystem::create_inherent(&inherent_data) - .expect("got an inherent") - .dispatch_bypass_filter(RawOrigin::None.into()) - .expect("dispatch succeeded"); - within_block(); - ParachainSystem::on_finalize(*n); - - // did block execution set new validation code? - if storage::unhashed::exists(NEW_VALIDATION_CODE) { - if self.pending_upgrade.is_some() { - panic!("attempted to set validation code while upgrade was pending"); - } - } - - // clean up - System::finalize(); - if let Some(after_block) = after_block { - after_block(); - } - } - }); - } - } - - impl Drop for BlockTests { - fn drop(&mut self) { - if !self.ran { - self.run(); - } - } - } - - #[test] - #[should_panic] - fn block_tests_run_on_drop() { - BlockTests::new().add(123, || { - panic!("if this test passes, block tests run properly") - }); - } - - #[test] - fn events() { - BlockTests::new() - .with_relay_sproof_builder(|_, _, builder| { - builder.host_config.validation_upgrade_delay = 1000; - }) - .add_with_post_test( - 123, - || { - assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); - }, - || { - let events = System::events(); - assert_eq!( - events[0].event, - Event::parachain_system( - crate::RawEvent::ValidationFunctionStored(1123).into() - ) - ); - }, - ) - .add_with_post_test( - 1234, - || {}, - || { - let events = System::events(); - assert_eq!( - events[0].event, - Event::parachain_system( - crate::RawEvent::ValidationFunctionApplied(1234).into() - ) - ); - }, - ); - } - - #[test] - fn non_overlapping() { - BlockTests::new() - .with_relay_sproof_builder(|_, _, builder| { - builder.host_config.validation_upgrade_delay = 1000; - }) - .add(123, || { - assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); - }) - .add(234, || { - assert_eq!( - System::set_code(RawOrigin::Root.into(), Default::default()), - Err(Error::::OverlappingUpgrades.into()), - ) - }); - } - - #[test] - fn manipulates_storage() { - BlockTests::new() - .add(123, || { - assert!( - !PendingValidationFunction::exists(), - "validation function must not exist yet" - ); - assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); - assert!( - PendingValidationFunction::exists(), - "validation function must now exist" - ); - }) - .add_with_post_test( - 1234, - || {}, - || { - assert!( - !PendingValidationFunction::exists(), - "validation function must have been unset" - ); - }, - ); - } - - #[test] - fn checks_size() { - BlockTests::new() - .with_relay_sproof_builder(|_, _, builder| { - builder.host_config.max_code_size = 8; - }) - .add(123, || { - assert_eq!( - System::set_code(RawOrigin::Root.into(), vec![0; 64]), - Err(Error::::TooBig.into()), - ); - }); - } - - #[test] - fn send_upward_message_num_per_candidate() { - BlockTests::new() - .with_relay_sproof_builder(|_, _, sproof| { - sproof.host_config.max_upward_message_num_per_candidate = 1; - sproof.relay_dispatch_queue_size = None; - }) - .add_with_post_test( - 1, - || { - ParachainSystem::send_upward_message(b"Mr F was here".to_vec()).unwrap(); - ParachainSystem::send_upward_message(b"message 2".to_vec()).unwrap(); - }, - || { - let v: Option>> = - storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); - assert_eq!(v, Some(vec![b"Mr F was here".to_vec()]),); - }, - ) - .add_with_post_test( - 2, - || { /* do nothing within block */ }, - || { - let v: Option>> = - storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); - assert_eq!(v, Some(vec![b"message 2".to_vec()]),); - }, - ); - } - - #[test] - fn send_upward_message_relay_bottleneck() { - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| { - sproof.host_config.max_upward_message_num_per_candidate = 2; - sproof.host_config.max_upward_queue_count = 5; - - match relay_block_num { - 1 => sproof.relay_dispatch_queue_size = Some((5, 0)), - 2 => sproof.relay_dispatch_queue_size = Some((4, 0)), - _ => unreachable!(), - } - }) - .add_with_post_test( - 1, - || { - ParachainSystem::send_upward_message(vec![0u8; 8]).unwrap(); - }, - || { - // The message won't be sent because there is already one message in queue. - let v: Option>> = - storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); - assert_eq!(v, Some(vec![]),); - }, - ) - .add_with_post_test( - 2, - || { /* do nothing within block */ }, - || { - let v: Option>> = - storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); - assert_eq!(v, Some(vec![vec![0u8; 8]]),); - }, - ); - } - - #[test] - fn send_hrmp_message_buffer_channel_close() { - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| { - // - // Base case setup - // - sproof.para_id = ParaId::from(200); - sproof.hrmp_egress_channel_index = Some(vec![ParaId::from(300), ParaId::from(400)]); - sproof.hrmp_channels.insert( - HrmpChannelId { - sender: ParaId::from(200), - recipient: ParaId::from(300), - }, - AbridgedHrmpChannel { - max_capacity: 1, - msg_count: 1, // <- 1/1 means the channel is full - max_total_size: 1024, - max_message_size: 8, - total_size: 0, - mqc_head: Default::default(), - }, - ); - sproof.hrmp_channels.insert( - HrmpChannelId { - sender: ParaId::from(200), - recipient: ParaId::from(400), - }, - AbridgedHrmpChannel { - max_capacity: 1, - msg_count: 1, - max_total_size: 1024, - max_message_size: 8, - total_size: 0, - mqc_head: Default::default(), - }, - ); - - // - // Adjustment according to block - // - match relay_block_num { - 1 => {} - 2 => {} - 3 => { - // The channel 200->400 ceases to exist at the relay chain block 3 - sproof - .hrmp_egress_channel_index - .as_mut() - .unwrap() - .retain(|n| n != &ParaId::from(400)); - sproof.hrmp_channels.remove(&HrmpChannelId { - sender: ParaId::from(200), - recipient: ParaId::from(400), - }); - - // We also free up space for a message in the 200->300 channel. - sproof - .hrmp_channels - .get_mut(&HrmpChannelId { - sender: ParaId::from(200), - recipient: ParaId::from(300), - }) - .unwrap() - .msg_count = 0; - } - _ => unreachable!(), - } - }) - .add_with_post_test( - 1, - || { - send_message(ParaId::from(300), b"1".to_vec()); - send_message(ParaId::from(400), b"2".to_vec()); - }, - || {}, - ) - .add_with_post_test( - 2, - || {}, - || { - // both channels are at capacity so we do not expect any messages. - let v: Option> = - storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); - assert_eq!(v, Some(vec![])); - }, - ) - .add_with_post_test( - 3, - || {}, - || { - let v: Option> = - storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); - assert_eq!( - v, - Some(vec![OutboundHrmpMessage { - recipient: ParaId::from(300), - data: b"1".to_vec(), - }]) - ); - }, - ); - } - - #[test] - fn message_queue_chain() { - assert_eq!(MessageQueueChain::default().head(), H256::zero()); - - // Note that the resulting hashes are the same for HRMP and DMP. That's because even though - // the types are nominally different, they have the same structure and computation of the - // new head doesn't differ. - // - // These cases are taken from https://github.com/paritytech/polkadot/pull/2351 - assert_eq!( - MessageQueueChain::default() - .extend_downward(&InboundDownwardMessage { - sent_at: 2, - msg: vec![1, 2, 3], - }) - .extend_downward(&InboundDownwardMessage { - sent_at: 3, - msg: vec![4, 5, 6], - }) - .head(), - hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), - ); - assert_eq!( - MessageQueueChain::default() - .extend_hrmp(&InboundHrmpMessage { - sent_at: 2, - data: vec![1, 2, 3], - }) - .extend_hrmp(&InboundHrmpMessage { - sent_at: 3, - data: vec![4, 5, 6], - }) - .head(), - hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), - ); - } - - #[test] - fn receive_dmp() { - lazy_static::lazy_static! { - static ref MSG: InboundDownwardMessage = InboundDownwardMessage { - sent_at: 1, - msg: b"down".to_vec(), - }; - } - - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { - 1 => { - sproof.dmq_mqc_head = - Some(MessageQueueChain::default().extend_downward(&MSG).head()); - } - _ => unreachable!(), - }) - .with_inherent_data(|_, relay_block_num, data| match relay_block_num { - 1 => { - data.downward_messages.push(MSG.clone()); - } - _ => unreachable!(), - }) - .add(1, || { - HANDLED_DMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!(&*m, &[(MSG.sent_at, MSG.msg.clone())]); - m.clear(); - }); - }); - } - - #[test] - fn receive_hrmp() { - lazy_static::lazy_static! { - static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 1, - data: b"1".to_vec(), - }; - - static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 1, - data: b"2".to_vec(), - }; - - static ref MSG_3: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 2, - data: b"3".to_vec(), - }; - - static ref MSG_4: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 2, - data: b"4".to_vec(), - }; - } - - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { - 1 => { - // 200 - doesn't exist yet - // 300 - one new message - sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); - } - 2 => { - // 200 - two new messages - // 300 - now present with one message. - sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head()); - sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = Some( - MessageQueueChain::default() - .extend_hrmp(&MSG_1) - .extend_hrmp(&MSG_2) - .extend_hrmp(&MSG_3) - .head(), - ); - } - 3 => { - // 200 - no new messages - // 300 - is gone - sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head()); - } - _ => unreachable!(), - }) - .with_inherent_data(|_, relay_block_num, data| match relay_block_num { - 1 => { - data.horizontal_messages - .insert(ParaId::from(300), vec![MSG_1.clone()]); - } - 2 => { - data.horizontal_messages.insert( - ParaId::from(300), - vec![ - // can't be sent at the block 1 actually. However, we cheat here - // because we want to test the case where there are multiple messages - // but the harness at the moment doesn't support block skipping. - MSG_2.clone(), - MSG_3.clone(), - ], - ); - data.horizontal_messages - .insert(ParaId::from(200), vec![MSG_4.clone()]); - } - 3 => {} - _ => unreachable!(), - }) - .add(1, || { - HANDLED_XCMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!(&*m, &[(ParaId::from(300), 1, b"1".to_vec())]); - m.clear(); - }); - }) - .add(2, || { - HANDLED_XCMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!( - &*m, - &[ - (ParaId::from(300), 1, b"2".to_vec()), - (ParaId::from(200), 2, b"4".to_vec()), - (ParaId::from(300), 2, b"3".to_vec()), - ] - ); - m.clear(); - }); - }) - .add(3, || {}); - } - - #[test] - fn receive_hrmp_empty_channel() { - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { - 1 => { - // no channels - } - 2 => { - // one new channel - sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = - Some(MessageQueueChain::default().head()); - } - _ => unreachable!(), - }) - .add(1, || {}) - .add(2, || {}); - } - - #[test] - fn receive_hrmp_after_pause() { - lazy_static::lazy_static! { - static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 1, - data: b"mikhailinvanovich".to_vec(), - }; - - static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 3, - data: b"1000000000".to_vec(), - }; - } - - const ALICE: ParaId = ParaId::new(300); - - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { - 1 => { - sproof.upsert_inbound_channel(ALICE).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); - } - 2 => { - // 300 - no new messages, mqc stayed the same. - sproof.upsert_inbound_channel(ALICE).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); - } - 3 => { - // 300 - new message. - sproof.upsert_inbound_channel(ALICE).mqc_head = Some( - MessageQueueChain::default() - .extend_hrmp(&MSG_1) - .extend_hrmp(&MSG_2) - .head(), - ); - } - _ => unreachable!(), - }) - .with_inherent_data(|_, relay_block_num, data| match relay_block_num { - 1 => { - data.horizontal_messages.insert(ALICE, vec![MSG_1.clone()]); - } - 2 => { - // no new messages - } - 3 => { - data.horizontal_messages.insert(ALICE, vec![MSG_2.clone()]); - } - _ => unreachable!(), - }) - .add(1, || { - HANDLED_XCMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!(&*m, &[(ALICE, 1, b"mikhailinvanovich".to_vec())]); - m.clear(); - }); - }) - .add(2, || {}) - .add(3, || { - HANDLED_XCMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!(&*m, &[(ALICE, 3, b"1000000000".to_vec())]); - m.clear(); - }); - }); - } -} diff --git a/cumulus/pallets/parachain-system/src/tests.rs b/cumulus/pallets/parachain-system/src/tests.rs new file mode 100755 index 0000000000..57c420f74c --- /dev/null +++ b/cumulus/pallets/parachain-system/src/tests.rs @@ -0,0 +1,935 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . +use super::*; + +use codec::Encode; +use cumulus_primitives_core::{ + AbridgedHrmpChannel, InboundDownwardMessage, InboundHrmpMessage, PersistedValidationData, + relay_chain::BlockNumber as RelayBlockNumber, +}; +use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder; +use frame_support::{ + assert_ok, + dispatch::UnfilteredDispatchable, + parameter_types, + storage, + traits::{OnFinalize, OnInitialize}, + weights::Weight, + inherent::{InherentData, ProvideInherent}, +}; +use frame_system::{InitKind, RawOrigin}; +use hex_literal::hex; +use relay_chain::v1::HrmpChannelId; +use sp_core::H256; +use sp_runtime::{testing::Header, traits::IdentityLookup}; +use sp_version::RuntimeVersion; +use std::cell::RefCell; + +use crate as parachain_system; + +type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic; +type Block = frame_system::mocking::MockBlock; + +frame_support::construct_runtime!( + pub enum Test where + Block = Block, + NodeBlock = Block, + UncheckedExtrinsic = UncheckedExtrinsic, + { + System: frame_system::{Pallet, Call, Config, Storage, Event}, + ParachainSystem: parachain_system::{Pallet, Call, Storage, Event}, + } +); + +parameter_types! { + pub const BlockHashCount: u64 = 250; + pub Version: RuntimeVersion = RuntimeVersion { + spec_name: sp_version::create_runtime_str!("test"), + impl_name: sp_version::create_runtime_str!("system-test"), + authoring_version: 1, + spec_version: 1, + impl_version: 1, + apis: sp_version::create_apis_vec!([]), + transaction_version: 1, + }; + pub const ParachainId: ParaId = ParaId::new(200); + pub const ReservedXcmpWeight: Weight = 0; + pub const ReservedDmpWeight: Weight = 0; +} +impl frame_system::Config for Test { + type Origin = Origin; + type Call = Call; + type Index = u64; + type BlockNumber = u64; + type Hash = H256; + type Hashing = BlakeTwo256; + type AccountId = u64; + type Lookup = IdentityLookup; + type Header = Header; + type Event = Event; + type BlockHashCount = BlockHashCount; + type BlockLength = (); + type BlockWeights = (); + type Version = Version; + type PalletInfo = PalletInfo; + type AccountData = (); + type OnNewAccount = (); + type OnKilledAccount = (); + type DbWeight = (); + type BaseCallFilter = (); + type SystemWeightInfo = (); + type SS58Prefix = (); + type OnSetCode = ParachainSetCode; +} +impl Config for Test { + type Event = Event; + type OnValidationData = (); + type SelfParaId = ParachainId; + type OutboundXcmpMessageSource = FromThreadLocal; + type DmpMessageHandler = SaveIntoThreadLocal; + type ReservedDmpWeight = ReservedDmpWeight; + type XcmpMessageHandler = SaveIntoThreadLocal; + type ReservedXcmpWeight = ReservedXcmpWeight; +} + +pub struct FromThreadLocal; +pub struct SaveIntoThreadLocal; + +std::thread_local! { + static HANDLED_DMP_MESSAGES: RefCell)>> = RefCell::new(Vec::new()); + static HANDLED_XCMP_MESSAGES: RefCell)>> = RefCell::new(Vec::new()); + static SENT_MESSAGES: RefCell)>> = RefCell::new(Vec::new()); +} + +fn send_message( + dest: ParaId, + message: Vec, +) { + SENT_MESSAGES.with(|m| m.borrow_mut().push((dest, message))); +} + +impl XcmpMessageSource for FromThreadLocal { + fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec)> { + let mut ids = std::collections::BTreeSet::::new(); + let mut taken = 0; + let mut result = Vec::new(); + SENT_MESSAGES.with(|ms| ms.borrow_mut() + .retain(|m| { + let status = as GetChannelInfo>::get_channel_status(m.0); + let ready = matches!(status, ChannelStatus::Ready(..)); + if ready && !ids.contains(&m.0) && taken < maximum_channels { + ids.insert(m.0); + taken += 1; + result.push(m.clone()); + false + } else { + true + } + }) + ); + result + } +} + +impl DmpMessageHandler for SaveIntoThreadLocal { + fn handle_dmp_messages( + iter: impl Iterator)>, + _max_weight: Weight, + ) -> Weight { + HANDLED_DMP_MESSAGES.with(|m| { + for i in iter { + m.borrow_mut().push(i); + } + 0 + }) + } +} + +impl XcmpMessageHandler for SaveIntoThreadLocal { + fn handle_xcmp_messages<'a, I: Iterator>( + iter: I, + _max_weight: Weight, + ) -> Weight { + HANDLED_XCMP_MESSAGES.with(|m| { + for (sender, sent_at, message) in iter { + m.borrow_mut().push((sender, sent_at, message.to_vec())); + } + 0 + }) + } +} + +// This function basically just builds a genesis storage key/value store according to +// our desired mockup. +fn new_test_ext() -> sp_io::TestExternalities { + HANDLED_DMP_MESSAGES.with(|m| m.borrow_mut().clear()); + HANDLED_XCMP_MESSAGES.with(|m| m.borrow_mut().clear()); + + frame_system::GenesisConfig::default() + .build_storage::() + .unwrap() + .into() +} + +struct CallInWasm(Vec); + +impl sp_core::traits::CallInWasm for CallInWasm { + fn call_in_wasm( + &self, + _wasm_code: &[u8], + _code_hash: Option>, + _method: &str, + _call_data: &[u8], + _ext: &mut dyn sp_externalities::Externalities, + _missing_host_functions: sp_core::traits::MissingHostFunctions, + ) -> Result, String> { + Ok(self.0.clone()) + } +} + +fn wasm_ext() -> sp_io::TestExternalities { + let version = RuntimeVersion { + spec_name: "test".into(), + spec_version: 2, + impl_version: 1, + ..Default::default() + }; + let call_in_wasm = CallInWasm(version.encode()); + + let mut ext = new_test_ext(); + ext.register_extension(sp_core::traits::CallInWasmExt::new(call_in_wasm)); + ext +} + +struct BlockTest { + n: ::BlockNumber, + within_block: Box, + after_block: Option>, +} + +/// BlockTests exist to test blocks with some setup: we have to assume that +/// `validate_block` will mutate and check storage in certain predictable +/// ways, for example, and we want to always ensure that tests are executed +/// in the context of some particular block number. +#[derive(Default)] +struct BlockTests { + tests: Vec, + pending_upgrade: Option, + ran: bool, + relay_sproof_builder_hook: + Option>, + persisted_validation_data_hook: + Option>, + inherent_data_hook: + Option>, +} + +impl BlockTests { + fn new() -> BlockTests { + Default::default() + } + + fn add_raw(mut self, test: BlockTest) -> Self { + self.tests.push(test); + self + } + + fn add(self, n: ::BlockNumber, within_block: F) -> Self + where + F: 'static + Fn(), + { + self.add_raw(BlockTest { + n, + within_block: Box::new(within_block), + after_block: None, + }) + } + + fn add_with_post_test( + self, + n: ::BlockNumber, + within_block: F1, + after_block: F2, + ) -> Self + where + F1: 'static + Fn(), + F2: 'static + Fn(), + { + self.add_raw(BlockTest { + n, + within_block: Box::new(within_block), + after_block: Some(Box::new(after_block)), + }) + } + + fn with_relay_sproof_builder(mut self, f: F) -> Self + where + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder), + { + self.relay_sproof_builder_hook = Some(Box::new(f)); + self + } + + #[allow(dead_code)] // might come in handy in future. If now is future and it still hasn't - feel free. + fn with_validation_data(mut self, f: F) -> Self + where + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut PersistedValidationData), + { + self.persisted_validation_data_hook = Some(Box::new(f)); + self + } + + fn with_inherent_data(mut self, f: F) -> Self + where + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData), + { + self.inherent_data_hook = Some(Box::new(f)); + self + } + + fn run(&mut self) { + self.ran = true; + wasm_ext().execute_with(|| { + for BlockTest { + n, + within_block, + after_block, + } in self.tests.iter() + { + // clear pending updates, as applicable + if let Some(upgrade_block) = self.pending_upgrade { + if n >= &upgrade_block.into() { + self.pending_upgrade = None; + } + } + + // begin initialization + System::initialize( + &n, + &Default::default(), + &Default::default(), + InitKind::Full, + ); + + // now mess with the storage the way validate_block does + let mut sproof_builder = RelayStateSproofBuilder::default(); + if let Some(ref hook) = self.relay_sproof_builder_hook { + hook(self, *n as RelayChainBlockNumber, &mut sproof_builder); + } + let (relay_parent_storage_root, relay_chain_state) = + sproof_builder.into_state_root_and_proof(); + let mut vfp = PersistedValidationData { + relay_parent_number: *n as RelayChainBlockNumber, + relay_parent_storage_root, + ..Default::default() + }; + if let Some(ref hook) = self.persisted_validation_data_hook { + hook(self, *n as RelayChainBlockNumber, &mut vfp); + } + + >::put(&vfp); + storage::unhashed::kill(NEW_VALIDATION_CODE); + + // It is insufficient to push the validation function params + // to storage; they must also be included in the inherent data. + let inherent_data = { + let mut inherent_data = InherentData::default(); + let mut system_inherent_data = ParachainInherentData { + validation_data: vfp.clone(), + relay_chain_state, + downward_messages: Default::default(), + horizontal_messages: Default::default(), + }; + if let Some(ref hook) = self.inherent_data_hook { + hook(self, *n as RelayChainBlockNumber, &mut system_inherent_data); + } + inherent_data + .put_data( + cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER, + &system_inherent_data, + ) + .expect("failed to put VFP inherent"); + inherent_data + }; + + // execute the block + ParachainSystem::on_initialize(*n); + ParachainSystem::create_inherent(&inherent_data) + .expect("got an inherent") + .dispatch_bypass_filter(RawOrigin::None.into()) + .expect("dispatch succeeded"); + within_block(); + ParachainSystem::on_finalize(*n); + + // did block execution set new validation code? + if storage::unhashed::exists(NEW_VALIDATION_CODE) { + if self.pending_upgrade.is_some() { + panic!("attempted to set validation code while upgrade was pending"); + } + } + + // clean up + System::finalize(); + if let Some(after_block) = after_block { + after_block(); + } + } + }); + } +} + +impl Drop for BlockTests { + fn drop(&mut self) { + if !self.ran { + self.run(); + } + } +} + +#[test] +#[should_panic] +fn block_tests_run_on_drop() { + BlockTests::new().add(123, || { + panic!("if this test passes, block tests run properly") + }); +} + +#[test] +fn events() { + BlockTests::new() + .with_relay_sproof_builder(|_, _, builder| { + builder.host_config.validation_upgrade_delay = 1000; + }) + .add_with_post_test( + 123, + || { + assert_ok!(System::set_code( + RawOrigin::Root.into(), + Default::default() + )); + }, + || { + let events = System::events(); + assert_eq!( + events[0].event, + Event::parachain_system(crate::Event::ValidationFunctionStored(1123).into()) + ); + }, + ) + .add_with_post_test( + 1234, + || {}, + || { + let events = System::events(); + assert_eq!( + events[0].event, + Event::parachain_system(crate::Event::ValidationFunctionApplied(1234).into()) + ); + }, + ); +} + +#[test] +fn non_overlapping() { + BlockTests::new() + .with_relay_sproof_builder(|_, _, builder| { + builder.host_config.validation_upgrade_delay = 1000; + }) + .add(123, || { + assert_ok!(System::set_code( + RawOrigin::Root.into(), + Default::default() + )); + }) + .add(234, || { + assert_eq!( + System::set_code(RawOrigin::Root.into(), Default::default()), + Err(Error::::OverlappingUpgrades.into()), + ) + }); +} + +#[test] +fn manipulates_storage() { + BlockTests::new() + .add(123, || { + assert!( + !>::exists(), + "validation function must not exist yet" + ); + assert_ok!(System::set_code( + RawOrigin::Root.into(), + Default::default() + )); + assert!( + >::exists(), + "validation function must now exist" + ); + }) + .add_with_post_test( + 1234, + || {}, + || { + assert!( + !>::exists(), + "validation function must have been unset" + ); + }, + ); +} + +#[test] +fn checks_size() { + BlockTests::new() + .with_relay_sproof_builder(|_, _, builder| { + builder.host_config.max_code_size = 8; + }) + .add(123, || { + assert_eq!( + System::set_code(RawOrigin::Root.into(), vec![0; 64]), + Err(Error::::TooBig.into()), + ); + }); +} + +#[test] +fn send_upward_message_num_per_candidate() { + BlockTests::new() + .with_relay_sproof_builder(|_, _, sproof| { + sproof.host_config.max_upward_message_num_per_candidate = 1; + sproof.relay_dispatch_queue_size = None; + }) + .add_with_post_test( + 1, + || { + ParachainSystem::send_upward_message(b"Mr F was here".to_vec()).unwrap(); + ParachainSystem::send_upward_message(b"message 2".to_vec()).unwrap(); + }, + || { + let v: Option>> = + storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); + assert_eq!(v, Some(vec![b"Mr F was here".to_vec()]),); + }, + ) + .add_with_post_test( + 2, + || { /* do nothing within block */ }, + || { + let v: Option>> = + storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); + assert_eq!(v, Some(vec![b"message 2".to_vec()]),); + }, + ); +} + +#[test] +fn send_upward_message_relay_bottleneck() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| { + sproof.host_config.max_upward_message_num_per_candidate = 2; + sproof.host_config.max_upward_queue_count = 5; + + match relay_block_num { + 1 => sproof.relay_dispatch_queue_size = Some((5, 0)), + 2 => sproof.relay_dispatch_queue_size = Some((4, 0)), + _ => unreachable!(), + } + }) + .add_with_post_test( + 1, + || { + ParachainSystem::send_upward_message(vec![0u8; 8]).unwrap(); + }, + || { + // The message won't be sent because there is already one message in queue. + let v: Option>> = + storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); + assert_eq!(v, Some(vec![]),); + }, + ) + .add_with_post_test( + 2, + || { /* do nothing within block */ }, + || { + let v: Option>> = + storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); + assert_eq!(v, Some(vec![vec![0u8; 8]]),); + }, + ); +} + +#[test] +fn send_hrmp_message_buffer_channel_close() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| { + // + // Base case setup + // + sproof.para_id = ParaId::from(200); + sproof.hrmp_egress_channel_index = Some(vec![ParaId::from(300), ParaId::from(400)]); + sproof.hrmp_channels.insert( + HrmpChannelId { + sender: ParaId::from(200), + recipient: ParaId::from(300), + }, + AbridgedHrmpChannel { + max_capacity: 1, + msg_count: 1, // <- 1/1 means the channel is full + max_total_size: 1024, + max_message_size: 8, + total_size: 0, + mqc_head: Default::default(), + }, + ); + sproof.hrmp_channels.insert( + HrmpChannelId { + sender: ParaId::from(200), + recipient: ParaId::from(400), + }, + AbridgedHrmpChannel { + max_capacity: 1, + msg_count: 1, + max_total_size: 1024, + max_message_size: 8, + total_size: 0, + mqc_head: Default::default(), + }, + ); + + // + // Adjustment according to block + // + match relay_block_num { + 1 => {} + 2 => {} + 3 => { + // The channel 200->400 ceases to exist at the relay chain block 3 + sproof + .hrmp_egress_channel_index + .as_mut() + .unwrap() + .retain(|n| n != &ParaId::from(400)); + sproof.hrmp_channels.remove(&HrmpChannelId { + sender: ParaId::from(200), + recipient: ParaId::from(400), + }); + + // We also free up space for a message in the 200->300 channel. + sproof + .hrmp_channels + .get_mut(&HrmpChannelId { + sender: ParaId::from(200), + recipient: ParaId::from(300), + }) + .unwrap() + .msg_count = 0; + } + _ => unreachable!(), + } + }) + .add_with_post_test( + 1, + || { + send_message( + ParaId::from(300), + b"1".to_vec(), + ); + send_message( + ParaId::from(400), + b"2".to_vec(), + ); + }, + || {}, + ) + .add_with_post_test( + 2, + || {}, + || { + // both channels are at capacity so we do not expect any messages. + let v: Option> = + storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); + assert_eq!(v, Some(vec![])); + }, + ) + .add_with_post_test( + 3, + || {}, + || { + let v: Option> = + storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); + assert_eq!( + v, + Some(vec![OutboundHrmpMessage { + recipient: ParaId::from(300), + data: b"1".to_vec(), + }]) + ); + }, + ); +} + +#[test] +fn message_queue_chain() { + assert_eq!(MessageQueueChain::default().head(), H256::zero()); + + // Note that the resulting hashes are the same for HRMP and DMP. That's because even though + // the types are nominally different, they have the same structure and computation of the + // new head doesn't differ. + // + // These cases are taken from https://github.com/paritytech/polkadot/pull/2351 + assert_eq!( + MessageQueueChain::default() + .extend_downward(&InboundDownwardMessage { + sent_at: 2, + msg: vec![1, 2, 3], + }) + .extend_downward(&InboundDownwardMessage { + sent_at: 3, + msg: vec![4, 5, 6], + }) + .head(), + hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), + ); + assert_eq!( + MessageQueueChain::default() + .extend_hrmp(&InboundHrmpMessage { + sent_at: 2, + data: vec![1, 2, 3], + }) + .extend_hrmp(&InboundHrmpMessage { + sent_at: 3, + data: vec![4, 5, 6], + }) + .head(), + hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), + ); +} + +#[test] +fn receive_dmp() { + lazy_static::lazy_static! { + static ref MSG: InboundDownwardMessage = InboundDownwardMessage { + sent_at: 1, + msg: b"down".to_vec(), + }; + } + + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + sproof.dmq_mqc_head = + Some(MessageQueueChain::default().extend_downward(&MSG).head()); + } + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.downward_messages.push(MSG.clone()); + } + _ => unreachable!(), + }) + .add(1, || { + HANDLED_DMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(MSG.sent_at, MSG.msg.clone())]); + m.clear(); + }); + }); +} + +#[test] +fn receive_hrmp() { + lazy_static::lazy_static! { + static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"1".to_vec(), + }; + + static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"2".to_vec(), + }; + + static ref MSG_3: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 2, + data: b"3".to_vec(), + }; + + static ref MSG_4: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 2, + data: b"4".to_vec(), + }; + } + + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + // 200 - doesn't exist yet + // 300 - one new message + sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); + } + 2 => { + // 200 - two new messages + // 300 - now present with one message. + sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head()); + sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = Some( + MessageQueueChain::default() + .extend_hrmp(&MSG_1) + .extend_hrmp(&MSG_2) + .extend_hrmp(&MSG_3) + .head(), + ); + } + 3 => { + // 200 - no new messages + // 300 - is gone + sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head()); + } + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.horizontal_messages + .insert(ParaId::from(300), vec![MSG_1.clone()]); + } + 2 => { + data.horizontal_messages.insert( + ParaId::from(300), + vec![ + // can't be sent at the block 1 actually. However, we cheat here + // because we want to test the case where there are multiple messages + // but the harness at the moment doesn't support block skipping. + MSG_2.clone(), + MSG_3.clone(), + ], + ); + data.horizontal_messages + .insert(ParaId::from(200), vec![MSG_4.clone()]); + } + 3 => {} + _ => unreachable!(), + }) + .add(1, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ParaId::from(300), 1, b"1".to_vec())]); + m.clear(); + }); + }) + .add(2, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!( + &*m, + &[ + (ParaId::from(300), 1, b"2".to_vec()), + (ParaId::from(200), 2, b"4".to_vec()), + (ParaId::from(300), 2, b"3".to_vec()), + ] + ); + m.clear(); + }); + }) + .add(3, || {}); +} + +#[test] +fn receive_hrmp_empty_channel() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + // no channels + } + 2 => { + // one new channel + sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = + Some(MessageQueueChain::default().head()); + } + _ => unreachable!(), + }) + .add(1, || {}) + .add(2, || {}); +} + +#[test] +fn receive_hrmp_after_pause() { + lazy_static::lazy_static! { + static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"mikhailinvanovich".to_vec(), + }; + + static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 3, + data: b"1000000000".to_vec(), + }; + } + + const ALICE: ParaId = ParaId::new(300); + + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + sproof.upsert_inbound_channel(ALICE).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); + } + 2 => { + // 300 - no new messages, mqc stayed the same. + sproof.upsert_inbound_channel(ALICE).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); + } + 3 => { + // 300 - new message. + sproof.upsert_inbound_channel(ALICE).mqc_head = Some( + MessageQueueChain::default() + .extend_hrmp(&MSG_1) + .extend_hrmp(&MSG_2) + .head(), + ); + } + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.horizontal_messages.insert(ALICE, vec![MSG_1.clone()]); + } + 2 => { + // no new messages + } + 3 => { + data.horizontal_messages.insert(ALICE, vec![MSG_2.clone()]); + } + _ => unreachable!(), + }) + .add(1, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ALICE, 1, b"mikhailinvanovich".to_vec())]); + m.clear(); + }); + }) + .add(2, || {}) + .add(3, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ALICE, 3, b"1000000000".to_vec())]); + m.clear(); + }); + }); +} diff --git a/cumulus/pallets/xcmp-queue/src/lib.rs b/cumulus/pallets/xcmp-queue/src/lib.rs index 82b58e79ed..bd27542c6a 100644 --- a/cumulus/pallets/xcmp-queue/src/lib.rs +++ b/cumulus/pallets/xcmp-queue/src/lib.rs @@ -25,29 +25,145 @@ #![cfg_attr(not(feature = "std"), no_std)] -use sp_std::{prelude::*, convert::TryFrom}; -use rand_chacha::{rand_core::{RngCore, SeedableRng}, ChaChaRng}; use codec::{Decode, Encode}; -use sp_runtime::{RuntimeDebug, traits::Hash}; -use frame_support::{decl_error, decl_event, decl_module, decl_storage, dispatch::Weight}; -use xcm::{ - VersionedXcm, v0::{ - Error as XcmError, ExecuteXcm, Junction, MultiLocation, SendXcm, Outcome, Xcm, - }, -}; use cumulus_primitives_core::{ - XcmpMessageHandler, ParaId, XcmpMessageSource, ChannelStatus, MessageSendError, GetChannelInfo, - relay_chain::BlockNumber as RelayBlockNumber, + relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError, + ParaId, XcmpMessageHandler, XcmpMessageSource, +}; +use frame_support::weights::Weight; +use rand_chacha::{ + rand_core::{RngCore, SeedableRng}, + ChaChaRng, +}; +use sp_runtime::{traits::Hash, RuntimeDebug}; +use sp_std::{convert::TryFrom, prelude::*}; +use xcm::{ + v0::{Error as XcmError, ExecuteXcm, Junction, MultiLocation, Outcome, SendXcm, Xcm}, + VersionedXcm, }; -pub trait Config: frame_system::Config { - type Event: From> + Into<::Event>; +pub use pallet::*; - /// Something to execute an XCM message. We need this to service the XCMoXCMP queue. - type XcmExecutor: ExecuteXcm; +#[frame_support::pallet] +pub mod pallet { + use super::*; + use frame_support::pallet_prelude::*; + use frame_system::pallet_prelude::*; - /// Information on the avaialble XCMP channels. - type ChannelInfo: GetChannelInfo; + #[pallet::pallet] + #[pallet::generate_store(pub(super) trait Store)] + pub struct Pallet(_); + + #[pallet::config] + pub trait Config: frame_system::Config { + type Event: From> + IsType<::Event>; + + /// Something to execute an XCM message. We need this to service the XCMoXCMP queue. + type XcmExecutor: ExecuteXcm; + + /// Information on the avaialble XCMP channels. + type ChannelInfo: GetChannelInfo; + } + + impl Default for QueueConfigData { + fn default() -> Self { + Self { + suspend_threshold: 2, + drop_threshold: 5, + resume_threshold: 1, + threshold_weight: 100_000, + weight_restrict_decay: 2, + } + } + } + + #[pallet::hooks] + impl Hooks> for Pallet { + fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight { + // on_idle processes additional messages with any remaining block weight. + Self::service_xcmp_queue(max_weight) + } + } + + #[pallet::call] + impl Pallet {} + + #[pallet::event] + #[pallet::generate_deposit(pub(super) fn deposit_event)] + #[pallet::metadata(Option = "Option")] + pub enum Event { + /// Some XCM was executed ok. + Success(Option), + /// Some XCM failed. + Fail(Option, XcmError), + /// Bad XCM version used. + BadVersion(Option), + /// Bad XCM format used. + BadFormat(Option), + /// An upward message was sent to the relay chain. + UpwardMessageSent(Option), + /// An HRMP message was sent to a sibling parachain. + XcmpMessageSent(Option), + } + + #[pallet::error] + pub enum Error { + /// Failed to send XCM message. + FailedToSend, + /// Bad XCM origin. + BadXcmOrigin, + /// Bad XCM data. + BadXcm, + } + + /// Status of the inbound XCMP channels. + #[pallet::storage] + pub(super) type InboundXcmpStatus = StorageValue< + _, + Vec<( + ParaId, + InboundStatus, + Vec<(RelayBlockNumber, XcmpMessageFormat)>, + )>, + ValueQuery, + >; + + /// Inbound aggregate XCMP messages. It can only be one per ParaId/block. + #[pallet::storage] + pub(super) type InboundXcmpMessages = StorageDoubleMap< + _, + Blake2_128Concat, + ParaId, + Twox64Concat, + RelayBlockNumber, + Vec, + ValueQuery, + >; + + /// The non-empty XCMP channels in order of becoming non-empty, and the index of the first + /// and last outbound message. If the two indices are equal, then it indicates an empty + /// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater + /// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in + /// case of the need to send a high-priority signal message this block. + /// The bool is true if there is a signal message waiting to be sent. + #[pallet::storage] + pub(super) type OutboundXcmpStatus = + StorageValue<_, Vec<(ParaId, OutboundStatus, bool, u16, u16)>, ValueQuery>; + + // The new way of doing it: + /// The messages outbound in a given XCMP channel. + #[pallet::storage] + pub(super) type OutboundXcmpMessages = + StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec, ValueQuery>; + + /// Any signal messages waiting to be sent. + #[pallet::storage] + pub(super) type SignalMessages = + StorageMap<_, Blake2_128Concat, ParaId, Vec, ValueQuery>; + + /// The configuration which controls the dynamics of the outbound queue. + #[pallet::storage] + pub(super) type QueueConfig = StorageValue<_, QueueConfigData, ValueQuery>; } #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug)] @@ -80,90 +196,6 @@ pub struct QueueConfigData { weight_restrict_decay: Weight, } -impl Default for QueueConfigData { - fn default() -> Self { - Self { - suspend_threshold: 2, - drop_threshold: 5, - resume_threshold: 1, - threshold_weight: 100_000, - weight_restrict_decay: 2, - } - } -} - -decl_storage! { - trait Store for Module as XcmHandler { - /// Status of the inbound XCMP channels. - InboundXcmpStatus: Vec<(ParaId, InboundStatus, Vec<(RelayBlockNumber, XcmpMessageFormat)>)>; - - /// Inbound aggregate XCMP messages. It can only be one per ParaId/block. - InboundXcmpMessages: double_map hasher(blake2_128_concat) ParaId, - hasher(twox_64_concat) RelayBlockNumber - => Vec; - - /// The non-empty XCMP channels in order of becoming non-empty, and the index of the first - /// and last outbound message. If the two indices are equal, then it indicates an empty - /// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater - /// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in - /// case of the need to send a high-priority signal message this block. - /// The bool is true if there is a signal message waiting to be sent. - OutboundXcmpStatus: Vec<(ParaId, OutboundStatus, bool, u16, u16)>; - - // The new way of doing it: - /// The messages outbound in a given XCMP channel. - OutboundXcmpMessages: double_map hasher(blake2_128_concat) ParaId, - hasher(twox_64_concat) u16 => Vec; - - /// Any signal messages waiting to be sent. - SignalMessages: map hasher(blake2_128_concat) ParaId => Vec; - - /// The configuration which controls the dynamics of the outbound queue. - QueueConfig: QueueConfigData; - } -} - -decl_event! { - pub enum Event where Hash = ::Hash { - /// Some XCM was executed ok. - Success(Option), - /// Some XCM failed. - Fail(Option, XcmError), - /// Bad XCM version used. - BadVersion(Option), - /// Bad XCM format used. - BadFormat(Option), - /// An upward message was sent to the relay chain. - UpwardMessageSent(Option), - /// An HRMP message was sent to a sibling parachain. - XcmpMessageSent(Option), - } -} - -decl_error! { - pub enum Error for Module { - /// Failed to send XCM message. - FailedToSend, - /// Bad XCM origin. - BadXcmOrigin, - /// Bad XCM data. - BadXcm, - } -} - -decl_module! { - pub struct Module for enum Call where origin: T::Origin { - type Error = Error; - - fn deposit_event() = default; - - fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight { - // on_idle processes additional messages with any remaining block weight. - Self::service_xcmp_queue(max_weight) - } - } -} - #[derive(PartialEq, Eq, Copy, Clone, Encode, Decode)] pub enum ChannelSignal { Suspend, @@ -182,7 +214,7 @@ pub enum XcmpMessageFormat { Signals, } -impl Module { +impl Pallet { /// Place a message `fragment` on the outgoing XCMP queue for `recipient`. /// /// Format is the type of aggregate message that the `fragment` may be safely encoded and @@ -213,25 +245,32 @@ impl Module { // Optimization note: `max_message_size` could potentially be stored in // `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed. - let max_message_size = T::ChannelInfo::get_channel_max(recipient) - .ok_or(MessageSendError::NoChannel)?; + let max_message_size = + T::ChannelInfo::get_channel_max(recipient).ok_or(MessageSendError::NoChannel)?; if data.len() > max_message_size { return Err(MessageSendError::TooBig); } - let mut s = OutboundXcmpStatus::get(); - let index = s.iter().position(|item| item.0 == recipient) + let mut s = >::get(); + let index = s + .iter() + .position(|item| item.0 == recipient) .unwrap_or_else(|| { s.push((recipient, OutboundStatus::Ok, false, 0, 0)); s.len() - 1 }); let have_active = s[index].4 > s[index].3; - let appended = have_active && OutboundXcmpMessages::mutate(recipient, s[index].4 - 1, |s| { - if XcmpMessageFormat::decode(&mut &s[..]) != Ok(format) { return false } - if s.len() + data.len() > max_message_size { return false } - s.extend_from_slice(&data[..]); - return true - }); + let appended = have_active + && >::mutate(recipient, s[index].4 - 1, |s| { + if XcmpMessageFormat::decode(&mut &s[..]) != Ok(format) { + return false; + } + if s.len() + data.len() > max_message_size { + return false; + } + s.extend_from_slice(&data[..]); + return true; + }); if appended { Ok((s[index].4 - s[index].3 - 1) as u32) } else { @@ -240,9 +279,9 @@ impl Module { s[index].4 += 1; let mut new_page = format.encode(); new_page.extend_from_slice(&data[..]); - OutboundXcmpMessages::insert(recipient, page_index, new_page); + >::insert(recipient, page_index, new_page); let r = (s[index].4 - s[index].3 - 1) as u32; - OutboundXcmpStatus::put(s); + >::put(s); Ok(r) } } @@ -250,26 +289,25 @@ impl Module { /// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this /// block. fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), ()> { - let mut s = OutboundXcmpStatus::get(); + let mut s = >::get(); if let Some(index) = s.iter().position(|item| item.0 == dest) { s[index].2 = true; } else { s.push((dest, OutboundStatus::Ok, true, 0, 0)); } - SignalMessages::mutate(dest, |page| if page.is_empty() { - *page = (XcmpMessageFormat::Signals, signal).encode(); - } else { - signal.using_encoded(|s| page.extend_from_slice(s)); + >::mutate(dest, |page| { + if page.is_empty() { + *page = (XcmpMessageFormat::Signals, signal).encode(); + } else { + signal.using_encoded(|s| page.extend_from_slice(s)); + } }); - OutboundXcmpStatus::put(s); + >::put(s); Ok(()) } - pub fn send_blob_message( - recipient: ParaId, - blob: Vec, - ) -> Result { + pub fn send_blob_message(recipient: ParaId, blob: Vec) -> Result { Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedEncodedBlob, blob) } @@ -284,8 +322,10 @@ impl Module { // Create a shuffled order for use to iterate through. // Not a great random seed, but good enough for our purposes. let seed = frame_system::Pallet::::parent_hash(); - let seed = <[u8; 32]>::decode(&mut sp_runtime::traits::TrailingZeroInput::new(seed.as_ref())) - .expect("input is padded with zeroes; qed"); + let seed = <[u8; 32]>::decode(&mut sp_runtime::traits::TrailingZeroInput::new( + seed.as_ref(), + )) + .expect("input is padded with zeroes; qed"); let mut rng = ChaChaRng::from_seed(seed); let mut shuffled = (0..len).collect::>(); for i in 0..len { @@ -297,7 +337,12 @@ impl Module { shuffled } - fn handle_blob_message(_sender: ParaId, _sent_at: RelayBlockNumber, _blob: Vec, _weight_limit: Weight) -> Result { + fn handle_blob_message( + _sender: ParaId, + _sent_at: RelayBlockNumber, + _blob: Vec, + _weight_limit: Weight, + ) -> Result { debug_assert!(false, "Blob messages not handled."); Err(false) } @@ -312,23 +357,19 @@ impl Module { log::debug!("Processing XCMP-XCM: {:?}", &hash); let (result, event) = match Xcm::::try_from(xcm) { Ok(xcm) => { - let location = ( - Junction::Parent, - Junction::Parachain(sender.into()), - ); - match T::XcmExecutor::execute_xcm( - location.into(), - xcm, - max_weight, - ) { - Outcome::Error(e) => (Err(e.clone()), RawEvent::Fail(Some(hash), e)), - Outcome::Complete(w) => (Ok(w), RawEvent::Success(Some(hash))), + let location = (Junction::Parent, Junction::Parachain(sender.into())); + match T::XcmExecutor::execute_xcm(location.into(), xcm, max_weight) { + Outcome::Error(e) => (Err(e.clone()), Event::Fail(Some(hash), e)), + Outcome::Complete(w) => (Ok(w), Event::Success(Some(hash))), // As far as the caller is concerned, this was dispatched without error, so // we just report the weight used. - Outcome::Incomplete(w, e) => (Ok(w), RawEvent::Fail(Some(hash), e)), + Outcome::Incomplete(w, e) => (Ok(w), Event::Fail(Some(hash), e)), } } - Err(()) => (Err(XcmError::UnhandledXcmVersion), RawEvent::BadVersion(Some(hash))), + Err(()) => ( + Err(XcmError::UnhandledXcmVersion), + Event::BadVersion(Some(hash)), + ), }; Self::deposit_event(event); result @@ -339,7 +380,7 @@ impl Module { (sent_at, format): (RelayBlockNumber, XcmpMessageFormat), max_weight: Weight, ) -> (Weight, bool) { - let data = InboundXcmpMessages::get(sender, sent_at); + let data = >::get(sender, sent_at); let mut last_remaining_fragments; let mut remaining_fragments = &data[..]; let mut weight_used = 0; @@ -397,9 +438,9 @@ impl Module { } let is_empty = remaining_fragments.is_empty(); if is_empty { - InboundXcmpMessages::remove(sender, sent_at); + >::remove(sender, sent_at); } else { - InboundXcmpMessages::insert(sender, sent_at, remaining_fragments); + >::insert(sender, sent_at, remaining_fragments); } (weight_used, is_empty) } @@ -432,9 +473,9 @@ impl Module { /// for the second &c. though empirical and or practical factors may give rise to adjusting it /// further. fn service_xcmp_queue(max_weight: Weight) -> Weight { - let mut status = InboundXcmpStatus::get(); // <- sorted. + let mut status = >::get(); // <- sorted. if status.len() == 0 { - return 0 + return 0; } let QueueConfigData { @@ -442,7 +483,7 @@ impl Module { threshold_weight, weight_restrict_decay, .. - } = QueueConfig::get(); + } = >::get(); let mut shuffled = Self::create_shuffle(status.len()); let mut weight_used = 0; @@ -457,7 +498,9 @@ impl Module { // send more, heavier messages. let mut shuffle_index = 0; - while shuffle_index < shuffled.len() && max_weight.saturating_sub(weight_used) >= threshold_weight { + while shuffle_index < shuffled.len() + && max_weight.saturating_sub(weight_used) >= threshold_weight + { let index = shuffled[shuffle_index]; let sender = status[index].0; @@ -466,7 +509,8 @@ impl Module { // first round. For the second round we unlock all weight. If we come close enough // on the first round to unlocking everything, then we do so. if shuffle_index < status.len() { - weight_available += (max_weight - weight_available) / (weight_restrict_decay + 1); + weight_available += + (max_weight - weight_available) / (weight_restrict_decay + 1); if weight_available + threshold_weight > max_weight { weight_available = max_weight; } @@ -476,16 +520,16 @@ impl Module { } let weight_processed = if status[index].2.is_empty() { - debug_assert!(false, "channel exists in status; there must be messages; qed"); + debug_assert!( + false, + "channel exists in status; there must be messages; qed" + ); 0 } else { // Process up to one block's worth for now. let weight_remaining = weight_available.saturating_sub(weight_used); - let (weight_processed, is_empty) = Self::process_xcmp_message( - sender, - status[index].2[0], - weight_remaining, - ); + let (weight_processed, is_empty) = + Self::process_xcmp_message(sender, status[index].2[0], weight_remaining); if is_empty { status[index].2.remove(0); } @@ -493,20 +537,26 @@ impl Module { }; weight_used += weight_processed; - if status[index].2.len() as u32 <= resume_threshold && status[index].1 == InboundStatus::Suspended { + if status[index].2.len() as u32 <= resume_threshold + && status[index].1 == InboundStatus::Suspended + { // Resume let r = Self::send_signal(sender, ChannelSignal::Resume); - debug_assert!(r.is_ok(), "WARNING: Failed sending resume into suspended channel"); + debug_assert!( + r.is_ok(), + "WARNING: Failed sending resume into suspended channel" + ); status[index].1 = InboundStatus::Ok; } // If there are more and we're making progress, we process them after we've given the // other channels a look in. If we've still not unlocked all weight, then we set them // up for processing a second time anyway. - if !status[index].2.is_empty() && weight_processed > 0 || weight_available != max_weight { + if !status[index].2.is_empty() && weight_processed > 0 || weight_available != max_weight + { if shuffle_index + 1 == shuffled.len() { // Only this queue left. Just run around this loop once more. - continue + continue; } shuffled.push(index); } @@ -516,12 +566,12 @@ impl Module { // Only retain the senders that have non-empty queues. status.retain(|item| !item.2.is_empty()); - InboundXcmpStatus::put(status); + >::put(status); weight_used } fn suspend_channel(target: ParaId) { - OutboundXcmpStatus::mutate(|s| { + >::mutate(|s| { if let Some(index) = s.iter().position(|item| item.0 == target) { let ok = s[index].1 == OutboundStatus::Ok; debug_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok."); @@ -533,41 +583,53 @@ impl Module { } fn resume_channel(target: ParaId) { - OutboundXcmpStatus::mutate(|s| { + >::mutate(|s| { if let Some(index) = s.iter().position(|item| item.0 == target) { let suspended = s[index].1 == OutboundStatus::Suspended; - debug_assert!(suspended, "WARNING: Attempt to resume channel that was not suspended."); + debug_assert!( + suspended, + "WARNING: Attempt to resume channel that was not suspended." + ); if s[index].3 == s[index].4 { s.remove(index); } else { s[index].1 = OutboundStatus::Ok; } } else { - debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended."); + debug_assert!( + false, + "WARNING: Attempt to resume channel that was not suspended." + ); } }); } } -impl XcmpMessageHandler for Module { - fn handle_xcmp_messages<'a, I: Iterator>( +impl XcmpMessageHandler for Pallet { + fn handle_xcmp_messages<'a, I: Iterator>( iter: I, max_weight: Weight, ) -> Weight { - let mut status = InboundXcmpStatus::get(); + let mut status = >::get(); - let QueueConfigData { suspend_threshold, drop_threshold, .. } = QueueConfig::get(); + let QueueConfigData { + suspend_threshold, + drop_threshold, + .. + } = >::get(); for (sender, sent_at, data) in iter { - // Figure out the message format. let mut data_ref = data; let format = match XcmpMessageFormat::decode(&mut data_ref) { Ok(f) => f, Err(_) => { - debug_assert!(false, "Unknown XCMP message format. Silently dropping message"); - continue - }, + debug_assert!( + false, + "Unknown XCMP message format. Silently dropping message" + ); + continue; + } }; if format == XcmpMessageFormat::Signals { while !data_ref.is_empty() { @@ -587,34 +649,39 @@ impl XcmpMessageHandler for Module { status[i].1 = InboundStatus::Suspended; let r = Self::send_signal(sender, ChannelSignal::Suspend); if r.is_err() { - log::warn!("Attempt to suspend channel failed. Messages may be dropped."); + log::warn!( + "Attempt to suspend channel failed. Messages may be dropped." + ); } } if (count as u32) < drop_threshold { status[i].2.push((sent_at, format)); } else { - debug_assert!(false, "XCMP channel queue full. Silently dropping message"); + debug_assert!( + false, + "XCMP channel queue full. Silently dropping message" + ); } - }, + } Err(_) => status.push((sender, InboundStatus::Ok, vec![(sent_at, format)])), } // Queue the payload for later execution. - InboundXcmpMessages::insert(sender, sent_at, data_ref); + >::insert(sender, sent_at, data_ref); } // Optimization note; it would make sense to execute messages immediately if // `status.is_empty()` here. } status.sort(); - InboundXcmpStatus::put(status); + >::put(status); Self::service_xcmp_queue(max_weight) } } -impl XcmpMessageSource for Module { +impl XcmpMessageSource for Pallet { fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec)> { - let mut statuses = OutboundXcmpStatus::get(); + let mut statuses = >::get(); let old_statuses_len = statuses.len(); let max_message_count = statuses.len().min(maximum_channels); let mut result = Vec::with_capacity(max_message_count); @@ -628,42 +695,42 @@ impl XcmpMessageSource for Module { break; } if outbound_status == OutboundStatus::Suspended { - continue + continue; } let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) { ChannelStatus::Closed => { // This means that there is no such channel anymore. Nothing to be done but // swallow the messages and discard the status. for i in begin..end { - OutboundXcmpMessages::remove(para_id, i); + >::remove(para_id, i); } if signalling { - SignalMessages::remove(para_id); + >::remove(para_id); } *status = (para_id, OutboundStatus::Ok, false, 0, 0); - continue + continue; } ChannelStatus::Full => continue, ChannelStatus::Ready(n, e) => (n, e), }; let page = if signalling { - let page = SignalMessages::get(para_id); + let page = >::get(para_id); if page.len() < max_size_now { - SignalMessages::remove(para_id); + >::remove(para_id); signalling = false; page } else { - continue + continue; } } else if end > begin { - let page = OutboundXcmpMessages::get(para_id, begin); + let page = >::get(para_id, begin); if page.len() < max_size_now { - OutboundXcmpMessages::remove(para_id, begin); + >::remove(para_id, begin); begin += 1; page } else { - continue + continue; } } else { continue; @@ -705,23 +772,27 @@ impl XcmpMessageSource for Module { // be no less than the pruned channels. statuses.rotate_left(result.len() - pruned); - OutboundXcmpStatus::put(statuses); + >::put(statuses); result } } /// Xcm sender for sending to a sibling parachain. -impl SendXcm for Module { +impl SendXcm for Pallet { fn send_xcm(dest: MultiLocation, msg: Xcm<()>) -> Result<(), XcmError> { match &dest { // An HRMP message for a sibling parachain. MultiLocation::X2(Junction::Parent, Junction::Parachain(id)) => { let msg = VersionedXcm::<()>::from(msg); let hash = T::Hashing::hash_of(&msg); - Self::send_fragment((*id).into(), XcmpMessageFormat::ConcatenatedVersionedXcm, msg) - .map_err(|e| XcmError::SendFailed(<&'static str>::from(e)))?; - Self::deposit_event(RawEvent::XcmpMessageSent(Some(hash))); + Self::send_fragment( + (*id).into(), + XcmpMessageFormat::ConcatenatedVersionedXcm, + msg, + ) + .map_err(|e| XcmError::SendFailed(<&'static str>::from(e)))?; + Self::deposit_event(Event::XcmpMessageSent(Some(hash))); Ok(()) } // Anything else is unhandled. This includes a message this is meant for us. diff --git a/cumulus/rococo-parachains/pallets/parachain-info/src/lib.rs b/cumulus/rococo-parachains/pallets/parachain-info/src/lib.rs index 4a189dee67..0d83b0c66d 100644 --- a/cumulus/rococo-parachains/pallets/parachain-info/src/lib.rs +++ b/cumulus/rococo-parachains/pallets/parachain-info/src/lib.rs @@ -18,25 +18,58 @@ #![cfg_attr(not(feature = "std"), no_std)] -use frame_support::{decl_module, decl_storage, traits::Get}; +pub use pallet::*; -use cumulus_primitives_core::ParaId; +#[frame_support::pallet] +pub mod pallet { + use frame_support::pallet_prelude::*; + use frame_system::pallet_prelude::*; + use cumulus_primitives_core::ParaId; -/// Configuration trait of this pallet. -pub trait Config: frame_system::Config {} + #[pallet::pallet] + #[pallet::generate_store(pub(super) trait Store)] + pub struct Pallet(_); -impl Get for Module { - fn get() -> ParaId { - Self::parachain_id() + #[pallet::config] + pub trait Config: frame_system::Config {} + + #[pallet::hooks] + impl Hooks> for Pallet {} + + #[pallet::call] + impl Pallet {} + + #[pallet::genesis_config] + pub struct GenesisConfig { + pub parachain_id: ParaId, + } + + #[cfg(feature = "std")] + impl Default for GenesisConfig { + fn default() -> Self { + Self { + parachain_id: 100.into() + } + } + } + + #[pallet::genesis_build] + impl GenesisBuild for GenesisConfig { + fn build(&self) { + >::put(&self.parachain_id); + } + } + + #[pallet::type_value] + pub(super) fn DefaultForParachainId() -> ParaId { 100.into() } + + #[pallet::storage] + #[pallet::getter(fn parachain_id)] + pub(super) type ParachainId = StorageValue<_, ParaId, ValueQuery, DefaultForParachainId>; + + impl Get for Pallet { + fn get() -> ParaId { + Self::parachain_id() + } } } - -decl_storage! { - trait Store for Module as ParachainInfo { - ParachainId get(fn parachain_id) config(): ParaId = 100.into(); - } -} - -decl_module! { - pub struct Module for enum Call where origin: T::Origin {} -} diff --git a/cumulus/rococo-parachains/runtime/src/lib.rs b/cumulus/rococo-parachains/runtime/src/lib.rs index ea36553f91..9487c7313b 100644 --- a/cumulus/rococo-parachains/runtime/src/lib.rs +++ b/cumulus/rococo-parachains/runtime/src/lib.rs @@ -238,7 +238,7 @@ parameter_types! { impl cumulus_pallet_parachain_system::Config for Runtime { type Event = Event; type OnValidationData = (); - type SelfParaId = parachain_info::Module; + type SelfParaId = parachain_info::Pallet; type OutboundXcmpMessageSource = XcmpQueue; type DmpMessageHandler = DmpQueue; type ReservedDmpWeight = ReservedDmpWeight; diff --git a/cumulus/rococo-parachains/shell-runtime/src/lib.rs b/cumulus/rococo-parachains/shell-runtime/src/lib.rs index 4d7a4a8dea..58d7a42200 100644 --- a/cumulus/rococo-parachains/shell-runtime/src/lib.rs +++ b/cumulus/rococo-parachains/shell-runtime/src/lib.rs @@ -161,7 +161,7 @@ parameter_types! { impl cumulus_pallet_parachain_system::Config for Runtime { type Event = Event; type OnValidationData = (); - type SelfParaId = parachain_info::Module; + type SelfParaId = parachain_info::Pallet; type OutboundXcmpMessageSource = (); type DmpMessageHandler = cumulus_pallet_xcm::UnlimitedDmpExecution; type ReservedDmpWeight = ReservedDmpWeight;