XCM revamp & Ping pallet (#391)

* Add spambot

* Fixes

* Add some extra functions to spambot, bump version

* Lock..

* Aggregate HRMP (XCMP/HMP) messages. Payloads for spambot.

* Fix tests, bump Polkadot.

* Fix HMP tests

* Rename Hrmp -> Xcmp for handler/sender

* Use master branch

* Test Xcm message passing & rename away from HMP

* Docs

* Introduce fee payment mechanics into XCM.

* Rename spambot -> ping

* Lock

* XCMP message dispatch system reimagining

- Moved most of the logic into xcm-handler pallet
- Altered the outgoing XCMP API from push to pull
- Changed underlying outgoing queue data structures to avoid multi-page read/writes
- Introduced queuing for incoming messages
- Introduced signal messages as a flow-control sub-stream
- Introduced flow-control with basic threshold back-pressure
- Introduced overall weight limitation on messages executed
- Additonal alterations to XCM APIs for the new system

* Should process any remaining XCM messages when we're not doing anything else.

* Update API usage and preparation for the big build.

* Some build fixes

* Build fixes

* xcm-handler builds

* Fix warnings

* Docs

* Parachains system builds

* Parachain runtime building

* Fix build

* Introduce transfer_asset specialisation.

* Fixes

* Two-stage upgrade for parachains.

* Fixes

* Fixes

* Updates for message sending.

* Repotting/renaming. Add primitives/utility.

* Remove real-overseer and bump refs

* Configure & document Rococo XCM runtime.

* Add shell runtime, some companion changes for #8589

* Bumps & fixes

* Fix test

* Build fix

* Update pallets/xcmp-queue/src/lib.rs

Co-authored-by: Amar Singh <asinghchrony@protonmail.com>

* Make tests compile

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* remove unused

* remove unused event stuff

* Adds proper validation-worker to make integration tests work

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* import saturating

* remove panic test

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Amar Singh <asinghchrony@protonmail.com>
Co-authored-by: Shawn Tabrizi <shawntabrizi@gmail.com>
Co-authored-by: Bastian Köcher <info@kchr.de>
This commit is contained in:
Gavin Wood
2021-04-14 09:36:59 +02:00
committed by GitHub
parent 2516e06927
commit 86f9a99184
34 changed files with 3168 additions and 1525 deletions
@@ -12,6 +12,7 @@ cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inh
# Polkadot dependencies
polkadot-parachain = { git = "https://github.com/paritytech/polkadot", default-features = false, features = [ "wasm-api" ], branch = "master" }
xcm = { git = "https://github.com/paritytech/polkadot", default-features = false, branch = "master" }
# Substrate dependencies
frame-support = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
@@ -72,4 +73,5 @@ std = [
"cumulus-primitives-core/std",
"cumulus-primitives-parachain-inherent/std",
"environmental/std",
"xcm/std"
]
+258 -350
View File
@@ -27,27 +27,28 @@
//!
//! Users must ensure that they register this pallet as an inherent provider.
use cumulus_primitives_core::{
relay_chain,
well_known_keys::{self, NEW_VALIDATION_CODE},
AbridgedHostConfiguration, DownwardMessageHandler, HrmpMessageHandler, HrmpMessageSender,
InboundDownwardMessage, InboundHrmpMessage, OnValidationData, OutboundHrmpMessage, ParaId,
PersistedValidationData, UpwardMessage, UpwardMessageSender,
};
use cumulus_primitives_parachain_inherent::ParachainInherentData;
use sp_std::{prelude::*, cmp, collections::btree_map::BTreeMap};
use sp_runtime::traits::{BlakeTwo256, Hash};
use sp_inherents::{InherentData, InherentIdentifier, ProvideInherent};
use frame_support::{
decl_error, decl_event, decl_module, decl_storage,
dispatch::DispatchResult,
dispatch::{DispatchResult, DispatchError, DispatchResultWithPostInfo},
ensure, storage,
traits::Get,
weights::{DispatchClass, Weight},
weights::{DispatchClass, Weight, PostDispatchInfo, Pays},
};
use frame_system::{ensure_none, ensure_root};
use polkadot_parachain::primitives::RelayChainBlockNumber;
use cumulus_primitives_core::{
relay_chain,
well_known_keys::{self, NEW_VALIDATION_CODE},
AbridgedHostConfiguration, DownwardMessageHandler, XcmpMessageHandler,
InboundDownwardMessage, InboundHrmpMessage, OnValidationData, OutboundHrmpMessage, ParaId,
PersistedValidationData, UpwardMessage, UpwardMessageSender, MessageSendError,
XcmpMessageSource, ChannelStatus, GetChannelInfo,
};
use cumulus_primitives_parachain_inherent::ParachainInherentData;
use relay_state_snapshot::MessagingStateSnapshot;
use sp_inherents::{InherentData, InherentIdentifier, ProvideInherent};
use sp_runtime::traits::{BlakeTwo256, Hash};
use sp_std::{cmp, collections::btree_map::BTreeMap, vec::Vec};
mod relay_state_snapshot;
#[macro_use]
@@ -56,7 +57,7 @@ pub mod validate_block;
/// The pallet's configuration trait.
pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> {
/// The overarching event type.
type Event: From<Event> + Into<<Self as frame_system::Config>::Event>;
type Event: From<Event<Self>> + Into<<Self as frame_system::Config>::Event>;
/// Something which can be notified when the validation data is set.
type OnValidationData: OnValidationData;
@@ -67,11 +68,18 @@ pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> {
/// The downward message handlers that will be informed when a message is received.
type DownwardMessageHandlers: DownwardMessageHandler;
/// The place where outbound XCMP messages come from. This is queried in `finalize_block`.
type OutboundXcmpMessageSource: XcmpMessageSource;
/// The HRMP message handlers that will be informed when a message is received.
///
/// The messages are dispatched in the order they were relayed by the relay chain. If multiple
/// messages were relayed at one block, these will be dispatched in ascending order of the sender's para ID.
type HrmpMessageHandlers: HrmpMessageHandler;
/// messages were relayed at one block, these will be dispatched in ascending order of the
/// sender's para ID.
type XcmpMessageHandler: XcmpMessageHandler;
/// The weight we reserve at the beginning of the block for processing XCMP messages.
type ReservedXcmpWeight: Get<Weight>;
}
// This pallet's storage items.
@@ -126,14 +134,16 @@ decl_storage! {
PendingUpwardMessages: Vec<UpwardMessage>;
/// Essentially `OutboundHrmpMessage`s grouped by the recipients.
OutboundHrmpMessages: map hasher(twox_64_concat) ParaId => Vec<Vec<u8>>;
/// HRMP channels with the given recipients are awaiting to be processed. If a `ParaId` is
/// present in this vector then `OutboundHrmpMessages` for it should be not empty.
NonEmptyHrmpChannels: Vec<ParaId>;
/// The number of HRMP messages we observed in `on_initialize` and thus used that number for
/// announcing the weight of `on_initialize` and `on_finialize`.
/// announcing the weight of `on_initialize` and `on_finalize`.
AnnouncedHrmpMessagesPerCandidate: u32;
/// The weight we reserve at the beginning of the block for processing XCMP messages. This
/// overrides the amount set in the Config trait.
ReservedXcmpWeightOverride: Option<Weight>;
/// The next authorized upgrade, if there is one.
AuthorizedUpgrade: Option<T::Hash>;
}
}
@@ -171,7 +181,8 @@ decl_module! {
/// As a side effect, this function upgrades the current validation function
/// if the appropriate time has come.
#[weight = (0, DispatchClass::Mandatory)]
pub fn set_validation_data(origin, data: ParachainInherentData) -> DispatchResult {
// TODO: This weight should be corrected.
pub fn set_validation_data(origin, data: ParachainInherentData) -> DispatchResultWithPostInfo {
ensure_none(origin)?;
assert!(
!ValidationData::exists(),
@@ -196,7 +207,7 @@ decl_module! {
let validation_function = PendingValidationFunction::take();
LastUpgrade::put(&apply_block);
Self::put_parachain_code(&validation_function);
Self::deposit_event(Event::ValidationFunctionApplied(vfp.relay_parent_number));
Self::deposit_event(RawEvent::ValidationFunctionApplied(vfp.relay_parent_number));
}
}
@@ -217,16 +228,18 @@ decl_module! {
<T::OnValidationData as OnValidationData>::on_validation_data(&vfp);
Self::process_inbound_downward_messages(
// TODO: This is more than zero, but will need benchmarking to figure out what.
let mut total_weight = 0;
total_weight += Self::process_inbound_downward_messages(
relevant_messaging_state.dmq_mqc_head,
downward_messages,
)?;
Self::process_inbound_horizontal_messages(
total_weight += Self::process_inbound_horizontal_messages(
&relevant_messaging_state.ingress_channels,
horizontal_messages,
)?;
Ok(())
Ok(PostDispatchInfo { actual_weight: Some(total_weight), pays_fee: Pays::No })
}
#[weight = (1_000, DispatchClass::Operational)]
@@ -235,19 +248,43 @@ decl_module! {
let _ = Self::send_upward_message(message);
}
#[weight = (1_000, DispatchClass::Operational)]
fn sudo_send_hrmp_message(origin, message: OutboundHrmpMessage) {
#[weight = (1_000_000, DispatchClass::Operational)]
fn authorize_upgrade(origin, code_hash: T::Hash) {
ensure_root(origin)?;
let _ = Self::send_hrmp_message(message);
AuthorizedUpgrade::<T>::put(&code_hash);
Self::deposit_event(RawEvent::UpgradeAuthorized(code_hash));
}
#[weight = 1_000_000]
fn enact_authorized_upgrade(origin, code: Vec<u8>) {
// No ensure origin on purpose. We validate by checking the code vs hash in storage.
let required_hash = AuthorizedUpgrade::<T>::get()
.ok_or(Error::<T>::NothingAuthorized)?;
let actual_hash = T::Hashing::hash(&code[..]);
ensure!(actual_hash == required_hash, Error::<T>::Unauthorized);
Self::set_code_impl(code)?;
AuthorizedUpgrade::<T>::kill();
}
fn on_finalize() {
DidSetValidationCode::take();
DidSetValidationCode::kill();
let host_config = Self::host_configuration()
.expect("host configuration is promised to set until `on_finalize`; qed");
let relevant_messaging_state = Self::relevant_messaging_state()
.expect("relevant messaging state is promised to be set until `on_finalize`; qed");
let host_config = match Self::host_configuration() {
Some(ok) => ok,
None => {
debug_assert!(false, "host configuration is promised to set until `on_finalize`; qed");
return
}
};
let relevant_messaging_state = match Self::relevant_messaging_state() {
Some(ok) => ok,
None => {
debug_assert!(false, "relevant messaging state is promised to be set until `on_finalize`; qed");
return
}
};
<Self as Store>::PendingUpwardMessages::mutate(|up| {
let (count, size) = relevant_messaging_state.relay_dispatch_queue_size;
@@ -292,109 +329,19 @@ decl_module! {
// - the capacity and total size of the channel is limited,
// - the maximum size of a message is limited (and can potentially be changed),
let mut non_empty_hrmp_channels = NonEmptyHrmpChannels::get();
// The number of messages we can send is limited by all of:
// - the number of non empty channels
// - the maximum number of messages per candidate according to the fresh config
// - the maximum number of messages per candidate according to the stale config
let outbound_hrmp_num =
non_empty_hrmp_channels.len()
.min(host_config.hrmp_max_message_num_per_candidate as usize)
.min(AnnouncedHrmpMessagesPerCandidate::take() as usize);
let maximum_channels = host_config.hrmp_max_message_num_per_candidate
.min(AnnouncedHrmpMessagesPerCandidate::take()) as usize;
let mut outbound_hrmp_messages = Vec::with_capacity(outbound_hrmp_num);
let mut prune_empty = Vec::with_capacity(outbound_hrmp_num);
for &recipient in non_empty_hrmp_channels.iter() {
if outbound_hrmp_messages.len() == outbound_hrmp_num {
// We have picked the required number of messages for the batch, no reason to
// iterate further.
//
// We check this condition in the beginning of the loop so that we don't include
// a message where the limit is 0.
break;
}
let idx = match relevant_messaging_state
.egress_channels
.binary_search_by_key(&recipient, |(recipient, _)| *recipient)
{
Ok(m) => m,
Err(_) => {
// TODO: #274 This means that there is no such channel anymore. Means that we should
// return back the messages from this channel.
//
// Until then pretend it became empty
prune_empty.push(recipient);
continue;
}
};
let channel_meta = &relevant_messaging_state.egress_channels[idx].1;
if channel_meta.msg_count + 1 > channel_meta.max_capacity {
// The channel is at its capacity. Skip it for now.
continue;
}
let mut pending = <Self as Store>::OutboundHrmpMessages::get(&recipient);
// This panics if `v` is empty. However, we are iterating only once over non-empty
// channels, therefore it cannot panic.
let message_payload = pending.remove(0);
let became_empty = pending.is_empty();
if channel_meta.total_size + message_payload.len() as u32 > channel_meta.max_total_size {
// Sending this message will make the channel total size overflow. Skip it for now.
continue;
}
// If we reached here, then the channel has capacity to receive this message. However,
// it doesn't mean that we are sending it just yet.
if became_empty {
OutboundHrmpMessages::remove(&recipient);
prune_empty.push(recipient);
} else {
OutboundHrmpMessages::insert(&recipient, pending);
}
if message_payload.len() as u32 > channel_meta.max_message_size {
// Apparently, the max message size was decreased since the message while the
// message was buffered. While it's possible to make another iteration to fetch
// the next message, we just keep going here to not complicate the logic too much.
//
// TODO: #274 Return back this message to sender.
continue;
}
outbound_hrmp_messages.push(OutboundHrmpMessage {
recipient,
data: message_payload,
});
}
// Sort the outbound messages by asceding recipient para id to satisfy the acceptance
// criteria requirement.
outbound_hrmp_messages.sort_by_key(|m| m.recipient);
// Prune hrmp channels that became empty. Additionally, because it may so happen that we
// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
// change the order. Otherwise, the next `on_finalize` we will again give attention
// only to those channels that happen to be in the beginning, until they are emptied.
// This leads to "starvation" of the channels near to the end.
//
// To mitigate this we shift all processed elements towards the end of the vector using
// `rotate_left`. To get intution how it works see the examples in its rustdoc.
non_empty_hrmp_channels.retain(|x| !prune_empty.contains(x));
// `prune_empty.len()` is greater or equal to `outbound_hrmp_num` because the loop above
// can only do `outbound_hrmp_num` iterations and `prune_empty` is appended to only inside
// the loop body.
non_empty_hrmp_channels.rotate_left(outbound_hrmp_num - prune_empty.len());
<Self as Store>::NonEmptyHrmpChannels::put(non_empty_hrmp_channels);
storage::unhashed::put(
well_known_keys::HRMP_OUTBOUND_MESSAGES,
&outbound_hrmp_messages,
let outbound_messages = T::OutboundXcmpMessageSource::take_outbound_messages(
maximum_channels,
);
// Note conversion to the OutboundHrmpMessage isn't needed since the data that
// `take_outbound_messages` returns encodes equivalently.
// If the following code breaks, then we'll need to revisit that assumption.
let _ = OutboundHrmpMessage { recipient: ParaId::from(0), data: vec![] };
storage::unhashed::put(well_known_keys::HRMP_OUTBOUND_MESSAGES, &outbound_messages);
}
fn on_initialize(n: T::BlockNumber) -> Weight {
@@ -449,6 +396,55 @@ decl_module! {
}
}
impl<T: Config> GetChannelInfo for Module<T> {
fn get_channel_status(id: ParaId) -> ChannelStatus {
// Note, that we are using `relevant_messaging_state` which may be from the previous
// block, in case this is called from `on_initialize`, i.e. before the inherent with fresh
// data is submitted.
//
// That shouldn't be a problem though because this is anticipated and already can happen.
// This is because sending implies that a message is buffered until there is space to send
// a message in the candidate. After a while waiting in a buffer, it may be discovered that
// the channel to which a message were addressed is now closed. Another possibility, is that
// the maximum message size was decreased so that a message in the buffer doesn't fit. Should
// any of that happen the sender should be notified about the message was discarded.
//
// Here it a similar case, with the difference that the realization that the channel is closed
// came the same block.
let channels = match Self::relevant_messaging_state() {
None => {
log::warn!("calling `get_channel_status` with no RelevantMessagingState?!");
return ChannelStatus::Closed
},
Some(d) => d.egress_channels,
};
// ^^^ NOTE: This storage field should carry over from the previous block. So if it's None
// then it must be that this is an edge-case where a message is attempted to be
// sent at the first block. It should be safe to assume that there are no channels
// opened at all so early. At least, relying on this assumption seems to be a better
// tradeoff, compared to introducing an error variant that the clients should be
// prepared to handle.
let index = match channels.binary_search_by_key(&id, |item| item.0) {
Err(_) => return ChannelStatus::Closed,
Ok(i) => i,
};
let meta = &channels[index].1;
if meta.msg_count + 1 > meta.max_capacity {
// The channel is at its capacity. Skip it for now.
return ChannelStatus::Full;
}
let max_size_now = meta.max_total_size - meta.total_size;
let max_size_ever = meta.max_message_size;
ChannelStatus::Ready(max_size_now as usize, max_size_ever as usize)
}
fn get_channel_max(id: ParaId) -> Option<usize> {
let channels = Self::relevant_messaging_state()?.egress_channels;
let index = channels.binary_search_by_key(&id, |item| item.0).ok()?;
Some(channels[index].1.max_message_size as usize)
}
}
impl<T: Config> Module<T> {
/// Validate the given [`PersistedValidationData`] against the
/// [`ValidationParams`](polkadot_parachain::primitives::ValidationParams).
@@ -481,13 +477,17 @@ impl<T: Config> Module<T> {
fn process_inbound_downward_messages(
expected_dmq_mqc_head: relay_chain::Hash,
downward_messages: Vec<InboundDownwardMessage>,
) -> DispatchResult {
) -> Result<Weight, DispatchError> {
let dm_count = downward_messages.len() as u32;
let mut weight_used = 0;
// Reference fu to avoid the `move` capture.
let weight_used_mut_ref = &mut weight_used;
let result_mqc_head = LastDmqMqcHead::mutate(move |mqc| {
for downward_message in downward_messages {
mqc.extend_downward(&downward_message);
T::DownwardMessageHandlers::handle_downward_message(downward_message);
*weight_used_mut_ref += T::DownwardMessageHandlers::handle_downward_message(downward_message);
}
mqc.0
});
@@ -503,7 +503,7 @@ impl<T: Config> Module<T> {
// PVF's `validate_block` wrapper and collation pipeline.
storage::unhashed::put(well_known_keys::PROCESSED_DOWNWARD_MESSAGES, &dm_count);
Ok(())
Ok(weight_used)
}
/// Process all inbound horizontal messages relayed by the collator.
@@ -513,7 +513,7 @@ impl<T: Config> Module<T> {
fn process_inbound_horizontal_messages(
ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
horizontal_messages: BTreeMap<ParaId, Vec<InboundHrmpMessage>>,
) -> DispatchResult {
) -> Result<Weight, DispatchError> {
// First, check that all submitted messages are sent from channels that exist. The channel
// exists if its MQC head is present in `vfp.hrmp_mqc_heads`.
for sender in horizontal_messages.keys() {
@@ -552,21 +552,26 @@ impl<T: Config> Module<T> {
let mut running_mqc_heads = BTreeMap::new();
let mut hrmp_watermark = None;
for (sender, horizontal_message) in horizontal_messages {
if hrmp_watermark
.map(|w| w < horizontal_message.sent_at)
.unwrap_or(true)
{
hrmp_watermark = Some(horizontal_message.sent_at);
{
for (sender, ref horizontal_message) in &horizontal_messages {
if hrmp_watermark
.map(|w| w < horizontal_message.sent_at)
.unwrap_or(true)
{
hrmp_watermark = Some(horizontal_message.sent_at);
}
running_mqc_heads
.entry(sender)
.or_insert_with(|| last_mqc_heads.get(&sender).cloned().unwrap_or_default())
.extend_hrmp(horizontal_message);
}
running_mqc_heads
.entry(sender)
.or_insert_with(|| last_mqc_heads.get(&sender).cloned().unwrap_or_default())
.extend_hrmp(&horizontal_message);
T::HrmpMessageHandlers::handle_hrmp_message(sender, horizontal_message);
}
let message_iter = horizontal_messages.iter()
.map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..]));
let max_weight = ReservedXcmpWeightOverride::get().unwrap_or_else(T::ReservedXcmpWeight::get);
let weight_used = T::XcmpMessageHandler::handle_xcmp_messages(message_iter, max_weight);
// Check that the MQC heads for each channel provided by the relay chain match the MQC heads
// we have after processing all incoming messages.
@@ -577,7 +582,7 @@ impl<T: Config> Module<T> {
// would corrupt the message queue chain.
for &(ref sender, ref channel) in ingress_channels {
let cur_head = running_mqc_heads
.entry(*sender)
.entry(sender)
.or_insert_with(|| last_mqc_heads.get(&sender).cloned().unwrap_or_default())
.head();
let target_head = channel.mqc_head.unwrap_or_default();
@@ -592,7 +597,7 @@ impl<T: Config> Module<T> {
storage::unhashed::put(well_known_keys::HRMP_WATERMARK, &hrmp_watermark);
}
Ok(())
Ok(weight_used)
}
/// Put a new validation function into a particular location where polkadot
@@ -663,7 +668,7 @@ impl<T: Config> Module<T> {
Self::notify_polkadot_of_pending_upgrade(&validation_function);
PendingRelayChainBlockNumber::put(apply_block);
PendingValidationFunction::put(validation_function);
Self::deposit_event(Event::ValidationFunctionStored(apply_block));
Self::deposit_event(RawEvent::ValidationFunctionStored(apply_block));
Ok(())
}
@@ -714,24 +719,8 @@ impl MessageQueueChain {
}
}
/// An error that can be raised upon sending an upward message.
#[derive(Debug, PartialEq)]
pub enum SendUpErr {
/// The message sent is too big.
TooBig,
}
/// An error that can be raised upon sending a horizontal message.
#[derive(Debug, PartialEq)]
pub enum SendHorizontalErr {
/// The message sent is too big.
TooBig,
/// There is no channel to the specified destination.
NoChannel,
}
impl<T: Config> Module<T> {
pub fn send_upward_message(message: UpwardMessage) -> Result<(), SendUpErr> {
pub fn send_upward_message(message: UpwardMessage) -> Result<u32, MessageSendError> {
// Check if the message fits into the relay-chain constraints.
//
// Note, that we are using `host_configuration` here which may be from the previous
@@ -747,7 +736,7 @@ impl<T: Config> Module<T> {
match Self::host_configuration() {
Some(cfg) => {
if message.len() > cfg.max_upward_message_size as usize {
return Err(SendUpErr::TooBig);
return Err(MessageSendError::TooBig);
}
}
None => {
@@ -763,71 +752,13 @@ impl<T: Config> Module<T> {
}
};
<Self as Store>::PendingUpwardMessages::append(message);
Ok(())
}
pub fn send_hrmp_message(message: OutboundHrmpMessage) -> Result<(), SendHorizontalErr> {
let OutboundHrmpMessage { recipient, data } = message;
// First, check if the message is addressed into an opened channel.
//
// Note, that we are using `relevant_messaging_state` which may be from the previous
// block, in case this is called from `on_initialize`, i.e. before the inherent with fresh
// data is submitted.
//
// That shouldn't be a problem though because this is anticipated and already can happen.
// This is because sending implies that a message is buffered until there is space to send
// a message in the candidate. After a while waiting in a buffer, it may be discovered that
// the channel to which a message were addressed is now closed. Another possibility, is that
// the maximum message size was decreased so that a message in the bufer doesn't fit. Should
// any of that happen the sender should be notified about the message was discarded.
//
// Here it a similar case, with the difference that the realization that the channel is closed
// came the same block.
let relevant_messaging_state = match Self::relevant_messaging_state() {
Some(s) => s,
None => {
// This storage field should carry over from the previous block. So if it's None
// then it must be that this is an edge-case where a message is attempted to be
// sent at the first block. It should be safe to assume that there are no channels
// opened at all so early. At least, relying on this assumption seems to be a better
// tradeoff, compared to introducing an error variant that the clients should be
// prepared to handle.
return Err(SendHorizontalErr::NoChannel);
}
};
let channel_meta = match relevant_messaging_state
.egress_channels
.binary_search_by_key(&recipient, |(recipient, _)| *recipient)
{
Ok(idx) => &relevant_messaging_state.egress_channels[idx].1,
Err(_) => return Err(SendHorizontalErr::NoChannel),
};
if data.len() as u32 > channel_meta.max_message_size {
return Err(SendHorizontalErr::TooBig);
}
// And then at last update the storage.
<Self as Store>::OutboundHrmpMessages::append(&recipient, data);
<Self as Store>::NonEmptyHrmpChannels::mutate(|v| {
if !v.contains(&recipient) {
v.push(recipient);
}
});
Ok(())
Ok(0)
}
}
impl<T: Config> UpwardMessageSender for Module<T> {
fn send_upward_message(message: UpwardMessage) -> Result<(), ()> {
Self::send_upward_message(message).map_err(|_| ())
}
}
impl<T: Config> HrmpMessageSender for Module<T> {
fn send_hrmp_message(message: OutboundHrmpMessage) -> Result<(), ()> {
Self::send_hrmp_message(message).map_err(|_| ())
fn send_upward_message(message: UpwardMessage) -> Result<u32, MessageSendError> {
Self::send_upward_message(message)
}
}
@@ -846,14 +777,20 @@ impl<T: Config> ProvideInherent for Module<T> {
Some(Call::set_validation_data(data))
}
fn is_inherent(call: &Self::Call) -> bool {
matches!(call, Call::set_validation_data(_))
}
}
decl_event! {
pub enum Event {
pub enum Event<T> where Hash = <T as frame_system::Config>::Hash {
// The validation function has been scheduled to apply as of the contained relay chain block number.
ValidationFunctionStored(RelayChainBlockNumber),
// The validation function was applied as of the contained relay chain block number.
ValidationFunctionApplied(RelayChainBlockNumber),
// An upgrade has been authorized.
UpgradeAuthorized(Hash),
}
}
@@ -888,6 +825,10 @@ decl_error! {
HrmpMqcMismatch,
/// No validation function upgrade is currently scheduled.
NotScheduled,
/// No code upgrade has been authorized.
NothingAuthorized,
/// The given code upgrade has not been authorized.
Unauthorized,
}
}
@@ -899,6 +840,7 @@ mod tests {
use codec::Encode;
use cumulus_primitives_core::{
AbridgedHrmpChannel, InboundDownwardMessage, InboundHrmpMessage, PersistedValidationData,
relay_chain::BlockNumber as RelayBlockNumber,
};
use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
use frame_support::{
@@ -927,7 +869,7 @@ mod tests {
UncheckedExtrinsic = UncheckedExtrinsic,
{
System: frame_system::{Pallet, Call, Config, Storage, Event<T>},
ParachainSystem: parachain_system::{Pallet, Call, Storage, Event},
ParachainSystem: parachain_system::{Pallet, Call, Storage, Event<T>},
}
);
@@ -943,6 +885,7 @@ mod tests {
transaction_version: 1,
};
pub const ParachainId: ParaId = ParaId::new(200);
pub const ReservedXcmpWeight: Weight = 0;
}
impl frame_system::Config for Test {
type Origin = Origin;
@@ -974,28 +917,69 @@ mod tests {
type OnValidationData = ();
type SelfParaId = ParachainId;
type DownwardMessageHandlers = SaveIntoThreadLocal;
type HrmpMessageHandlers = SaveIntoThreadLocal;
type XcmpMessageHandler = SaveIntoThreadLocal;
type OutboundXcmpMessageSource = FromThreadLocal;
type ReservedXcmpWeight = ReservedXcmpWeight;
}
pub struct FromThreadLocal;
pub struct SaveIntoThreadLocal;
std::thread_local! {
static HANDLED_DOWNWARD_MESSAGES: RefCell<Vec<InboundDownwardMessage>> = RefCell::new(Vec::new());
static HANDLED_HRMP_MESSAGES: RefCell<Vec<(ParaId, InboundHrmpMessage)>> = RefCell::new(Vec::new());
static HANDLED_XCMP_MESSAGES: RefCell<Vec<(ParaId, relay_chain::BlockNumber, Vec<u8>)>> = RefCell::new(Vec::new());
static SENT_MESSAGES: RefCell<Vec<(ParaId, Vec<u8>)>> = RefCell::new(Vec::new());
}
impl DownwardMessageHandler for SaveIntoThreadLocal {
fn handle_downward_message(msg: InboundDownwardMessage) {
HANDLED_DOWNWARD_MESSAGES.with(|m| {
m.borrow_mut().push(msg);
});
fn send_message(
dest: ParaId,
message: Vec<u8>,
) {
SENT_MESSAGES.with(|m| m.borrow_mut().push((dest, message)));
}
impl XcmpMessageSource for FromThreadLocal {
fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
let mut ids = std::collections::BTreeSet::<ParaId>::new();
let mut taken = 0;
let mut result = Vec::new();
SENT_MESSAGES.with(|ms| ms.borrow_mut()
.retain(|m| {
let status = <Module::<Test> as GetChannelInfo>::get_channel_status(m.0);
let ready = matches!(status, ChannelStatus::Ready(..));
if ready && !ids.contains(&m.0) && taken < maximum_channels {
ids.insert(m.0);
taken += 1;
result.push(m.clone());
false
} else {
true
}
})
);
result
}
}
impl HrmpMessageHandler for SaveIntoThreadLocal {
fn handle_hrmp_message(sender: ParaId, msg: InboundHrmpMessage) {
HANDLED_HRMP_MESSAGES.with(|m| {
m.borrow_mut().push((sender, msg));
impl DownwardMessageHandler for SaveIntoThreadLocal {
fn handle_downward_message(msg: InboundDownwardMessage) -> Weight {
HANDLED_DOWNWARD_MESSAGES.with(|m| {
m.borrow_mut().push(msg);
});
0
}
}
impl XcmpMessageHandler for SaveIntoThreadLocal {
fn handle_xcmp_messages<'a, I: Iterator<Item=(ParaId, RelayBlockNumber, &'a [u8])>>(
iter: I,
_max_weight: Weight,
) -> Weight {
HANDLED_XCMP_MESSAGES.with(|m| {
for (sender, sent_at, message) in iter {
m.borrow_mut().push((sender, sent_at, message.to_vec()));
}
0
})
}
}
@@ -1004,7 +988,7 @@ mod tests {
// our desired mockup.
fn new_test_ext() -> sp_io::TestExternalities {
HANDLED_DOWNWARD_MESSAGES.with(|m| m.borrow_mut().clear());
HANDLED_HRMP_MESSAGES.with(|m| m.borrow_mut().clear());
HANDLED_XCMP_MESSAGES.with(|m| m.borrow_mut().clear());
frame_system::GenesisConfig::default()
.build_storage::<Test>()
@@ -1253,7 +1237,7 @@ mod tests {
let events = System::events();
assert_eq!(
events[0].event,
Event::parachain_system(crate::Event::ValidationFunctionStored(1123))
Event::parachain_system(crate::RawEvent::ValidationFunctionStored(1123).into())
);
},
)
@@ -1264,7 +1248,7 @@ mod tests {
let events = System::events();
assert_eq!(
events[0].event,
Event::parachain_system(crate::Event::ValidationFunctionApplied(1234))
Event::parachain_system(crate::RawEvent::ValidationFunctionApplied(1234).into())
);
},
);
@@ -1399,80 +1383,6 @@ mod tests {
);
}
#[test]
fn send_hrmp_preliminary_no_channel() {
BlockTests::new()
.with_relay_sproof_builder(|_, _, sproof| {
sproof.para_id = ParaId::from(200);
// no channels established
sproof.hrmp_egress_channel_index = Some(vec![]);
})
.add(1, || {})
.add(2, || {
assert!(ParachainSystem::send_hrmp_message(OutboundHrmpMessage {
recipient: ParaId::from(300),
data: b"derp".to_vec(),
})
.is_err());
});
}
#[test]
fn send_hrmp_message_simple() {
BlockTests::new()
.with_relay_sproof_builder(|_, _, sproof| {
sproof.para_id = ParaId::from(200);
sproof.hrmp_egress_channel_index = Some(vec![ParaId::from(300)]);
sproof.hrmp_channels.insert(
HrmpChannelId {
sender: ParaId::from(200),
recipient: ParaId::from(300),
},
AbridgedHrmpChannel {
max_capacity: 1,
max_total_size: 1024,
max_message_size: 8,
msg_count: 0,
total_size: 0,
mqc_head: Default::default(),
},
);
})
.add_with_post_test(
1,
|| {
ParachainSystem::send_hrmp_message(OutboundHrmpMessage {
recipient: ParaId::from(300),
data: b"derp".to_vec(),
})
.unwrap()
},
|| {
// there are no outbound messages since the special logic for handling the
// first block kicks in.
let v: Option<Vec<OutboundHrmpMessage>> =
storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES);
assert_eq!(v, Some(vec![]));
},
)
.add_with_post_test(
2,
|| {},
|| {
let v: Option<Vec<OutboundHrmpMessage>> =
storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES);
assert_eq!(
v,
Some(vec![OutboundHrmpMessage {
recipient: ParaId::from(300),
data: b"derp".to_vec(),
}])
);
},
);
}
#[test]
fn send_hrmp_message_buffer_channel_close() {
BlockTests::new()
@@ -1512,7 +1422,7 @@ mod tests {
);
//
// Adjustement according to block
// Adjustment according to block
//
match relay_block_num {
1 => {}
@@ -1545,16 +1455,14 @@ mod tests {
.add_with_post_test(
1,
|| {
ParachainSystem::send_hrmp_message(OutboundHrmpMessage {
recipient: ParaId::from(300),
data: b"1".to_vec(),
})
.unwrap();
ParachainSystem::send_hrmp_message(OutboundHrmpMessage {
recipient: ParaId::from(400),
data: b"2".to_vec(),
})
.unwrap()
send_message(
ParaId::from(300),
b"1".to_vec(),
);
send_message(
ParaId::from(400),
b"2".to_vec(),
);
},
|| {},
)
@@ -1659,22 +1567,22 @@ mod tests {
lazy_static::lazy_static! {
static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage {
sent_at: 1,
data: b"aquadisco".to_vec(),
data: b"1".to_vec(),
};
static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage {
sent_at: 1,
data: b"mudroom".to_vec(),
data: b"2".to_vec(),
};
static ref MSG_3: InboundHrmpMessage = InboundHrmpMessage {
sent_at: 2,
data: b"eggpeeling".to_vec(),
data: b"3".to_vec(),
};
static ref MSG_4: InboundHrmpMessage = InboundHrmpMessage {
sent_at: 2,
data: b"casino".to_vec(),
data: b"4".to_vec(),
};
}
@@ -1730,21 +1638,21 @@ mod tests {
_ => unreachable!(),
})
.add(1, || {
HANDLED_HRMP_MESSAGES.with(|m| {
HANDLED_XCMP_MESSAGES.with(|m| {
let mut m = m.borrow_mut();
assert_eq!(&*m, &[(ParaId::from(300), MSG_1.clone())]);
assert_eq!(&*m, &[(ParaId::from(300), 1, b"1".to_vec())]);
m.clear();
});
})
.add(2, || {
HANDLED_HRMP_MESSAGES.with(|m| {
HANDLED_XCMP_MESSAGES.with(|m| {
let mut m = m.borrow_mut();
assert_eq!(
&*m,
&[
(ParaId::from(300), MSG_2.clone()),
(ParaId::from(200), MSG_4.clone()),
(ParaId::from(300), MSG_3.clone()),
(ParaId::from(300), 1, b"2".to_vec()),
(ParaId::from(200), 2, b"4".to_vec()),
(ParaId::from(300), 2, b"3".to_vec()),
]
);
m.clear();
@@ -1822,17 +1730,17 @@ mod tests {
_ => unreachable!(),
})
.add(1, || {
HANDLED_HRMP_MESSAGES.with(|m| {
HANDLED_XCMP_MESSAGES.with(|m| {
let mut m = m.borrow_mut();
assert_eq!(&*m, &[(ALICE, MSG_1.clone())]);
assert_eq!(&*m, &[(ALICE, 1, b"mikhailinvanovich".to_vec())]);
m.clear();
});
})
.add(2, || {})
.add(3, || {
HANDLED_HRMP_MESSAGES.with(|m| {
HANDLED_XCMP_MESSAGES.with(|m| {
let mut m = m.borrow_mut();
assert_eq!(&*m, &[(ALICE, MSG_2.clone())]);
assert_eq!(&*m, &[(ALICE, 3, b"1000000000".to_vec())]);
m.clear();
});
});
-232
View File
@@ -1,232 +0,0 @@
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
//! A pallet which implements the message handling APIs for handling incoming XCM:
//! * `DownwardMessageHandler`
//! * `HrmpMessageHandler`
//!
//! Also provides an implementation of `SendXcm` to handle outgoing XCM.
#![cfg_attr(not(feature = "std"), no_std)]
use codec::{Decode, Encode};
use cumulus_primitives_core::{
DownwardMessageHandler, HrmpMessageHandler, HrmpMessageSender, InboundDownwardMessage,
InboundHrmpMessage, OutboundHrmpMessage, ParaId, UpwardMessageSender,
};
use frame_support::{decl_error, decl_event, decl_module, dispatch::DispatchResult, sp_runtime::traits::Hash, traits::EnsureOrigin};
use sp_std::convert::{TryFrom, TryInto};
use xcm::{
v0::{Error as XcmError, ExecuteXcm, Junction, MultiLocation, SendXcm, Xcm},
VersionedXcm,
};
use xcm_executor::traits::LocationConversion;
pub trait Config: frame_system::Config {
type Event: From<Event<Self>> + Into<<Self as frame_system::Config>::Event>;
/// Something to execute an XCM message.
type XcmExecutor: ExecuteXcm;
/// Something to send an upward message.
type UpwardMessageSender: UpwardMessageSender;
/// Something to send an HRMP message.
type HrmpMessageSender: HrmpMessageSender;
/// Required origin for sending XCM messages. Typically Root or parachain
/// council majority.
type SendXcmOrigin: EnsureOrigin<Self::Origin>;
/// Utility for converting from the signed origin (of type `Self::AccountId`) into a sensible
/// `MultiLocation` ready for passing to the XCM interpreter.
type AccountIdConverter: LocationConversion<Self::AccountId>;
}
decl_event! {
pub enum Event<T> where Hash = <T as frame_system::Config>::Hash {
/// Some XCM was executed ok.
Success(Hash),
/// Some XCM failed.
Fail(Hash, XcmError),
/// Bad XCM version used.
BadVersion(Hash),
/// Bad XCM format used.
BadFormat(Hash),
/// An upward message was sent to the relay chain.
UpwardMessageSent(Hash),
/// An HRMP message was sent to a sibling parachain.
HrmpMessageSent(Hash),
}
}
decl_error! {
pub enum Error for Module<T: Config> {
/// Failed to send XCM message.
FailedToSend,
/// Bad XCM origin.
BadXcmOrigin,
}
}
decl_module! {
pub struct Module<T: Config> for enum Call where origin: T::Origin {
type Error = Error<T>;
fn deposit_event() = default;
#[weight = 1_000]
fn send_xcm(origin, dest: MultiLocation, message: Xcm) {
T::SendXcmOrigin::ensure_origin(origin)?;
<Self as SendXcm>::send_xcm(dest, message).map_err(|_| Error::<T>::FailedToSend)?;
}
#[weight = 1_000]
fn send_upward_xcm(origin, message: VersionedXcm) {
T::SendXcmOrigin::ensure_origin(origin)?;
let data = message.encode();
T::UpwardMessageSender::send_upward_message(data).map_err(|_| Error::<T>::FailedToSend)?;
}
#[weight = 1_000]
fn send_hrmp_xcm(origin, recipient: ParaId, message: VersionedXcm) {
T::SendXcmOrigin::ensure_origin(origin)?;
let data = message.encode();
let outbound_message = OutboundHrmpMessage {
recipient,
data,
};
T::HrmpMessageSender::send_hrmp_message(outbound_message).map_err(|_| Error::<T>::FailedToSend)?;
}
}
}
impl<T: Config> Module<T> {
/// Execute an XCM message locally. Returns `DispatchError` if failed.
pub fn execute_xcm(origin: T::AccountId, xcm: Xcm) -> DispatchResult {
let xcm_origin = T::AccountIdConverter::try_into_location(origin)
.map_err(|_| Error::<T>::BadXcmOrigin)?;
let hash = T::Hashing::hash(&xcm.encode());
let event = match T::XcmExecutor::execute_xcm(xcm_origin, xcm) {
Ok(_) => Event::<T>::Success(hash),
Err(e) => Event::<T>::Fail(hash, e),
};
Self::deposit_event(event);
Ok(())
}
}
impl<T: Config> DownwardMessageHandler for Module<T> {
fn handle_downward_message(msg: InboundDownwardMessage) {
let hash = msg.using_encoded(T::Hashing::hash);
log::debug!("Processing Downward XCM: {:?}", &hash);
let event = match VersionedXcm::decode(&mut &msg.msg[..]).map(Xcm::try_from) {
Ok(Ok(xcm)) => {
match T::XcmExecutor::execute_xcm(Junction::Parent.into(), xcm) {
Ok(..) => RawEvent::Success(hash),
Err(e) => RawEvent::Fail(hash, e),
}
}
Ok(Err(..)) => RawEvent::BadVersion(hash),
Err(..) => RawEvent::BadFormat(hash),
};
Self::deposit_event(event);
}
}
impl<T: Config> HrmpMessageHandler for Module<T> {
fn handle_hrmp_message(sender: ParaId, msg: InboundHrmpMessage) {
let hash = msg.using_encoded(T::Hashing::hash);
log::debug!("Processing HRMP XCM: {:?}", &hash);
let event = match VersionedXcm::decode(&mut &msg.data[..]).map(Xcm::try_from) {
Ok(Ok(xcm)) => {
let location = (
Junction::Parent,
Junction::Parachain { id: sender.into() },
);
match T::XcmExecutor::execute_xcm(location.into(), xcm) {
Ok(..) => RawEvent::Success(hash),
Err(e) => RawEvent::Fail(hash, e),
}
}
Ok(Err(..)) => RawEvent::BadVersion(hash),
Err(..) => RawEvent::BadFormat(hash),
};
Self::deposit_event(event);
}
}
impl<T: Config> SendXcm for Module<T> {
fn send_xcm(dest: MultiLocation, msg: Xcm) -> Result<(), XcmError> {
let msg: VersionedXcm = msg.into();
match dest.first() {
// A message for us. Execute directly.
None => {
let msg = msg.try_into().map_err(|_| XcmError::UnhandledXcmVersion)?;
let res = T::XcmExecutor::execute_xcm(MultiLocation::Null, msg);
res
}
// An upward message for the relay chain.
Some(Junction::Parent) if dest.len() == 1 => {
let data = msg.encode();
let hash = T::Hashing::hash(&data);
T::UpwardMessageSender::send_upward_message(data)
.map_err(|_| XcmError::CannotReachDestination)?;
Self::deposit_event(RawEvent::UpwardMessageSent(hash));
Ok(())
}
// An HRMP message for a sibling parachain.
Some(Junction::Parent) if dest.len() == 2 => {
if let Some(Junction::Parachain { id }) = dest.at(1) {
let data = msg.encode();
let hash = T::Hashing::hash(&data);
let message = OutboundHrmpMessage {
recipient: (*id).into(),
data,
};
T::HrmpMessageSender::send_hrmp_message(message)
.map_err(|_| XcmError::CannotReachDestination)?;
Self::deposit_event(RawEvent::HrmpMessageSent(hash));
Ok(())
} else {
Err(XcmError::UnhandledXcmMessage)
}
}
_ => {
/* TODO: Handle other cases, like downward message */
Err(XcmError::UnhandledXcmMessage)
}
}
}
}
/// Origin for the parachains module.
#[derive(PartialEq, Eq, Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug))]
pub enum Origin {
/// It comes from the (parent) relay chain.
Relay,
/// It comes from a (sibling) parachain.
SiblingParachain(ParaId),
}
impl From<ParaId> for Origin {
fn from(id: ParaId) -> Origin {
Origin::SiblingParachain(id)
}
}
impl From<u32> for Origin {
fn from(id: u32) -> Origin {
Origin::SiblingParachain(id.into())
}
}
+30
View File
@@ -0,0 +1,30 @@
[package]
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
name = "cumulus-pallet-xcm"
version = "0.1.0"
[dependencies]
codec = { package = "parity-scale-codec", version = "2.0.0", default-features = false, features = ["derive"] }
serde = { version = "1.0.101", optional = true, features = ["derive"] }
sp-std = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
frame-support = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
frame-system = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
xcm = { git = "https://github.com/paritytech/polkadot", default-features = false, branch = "master" }
cumulus-primitives-core = { path = "../../primitives/core", default-features = false }
[features]
default = ["std"]
std = [
"codec/std",
"serde",
"cumulus-primitives-core/std",
"sp-std/std",
"sp-runtime/std",
"frame-support/std",
"frame-system/std",
]
+92
View File
@@ -0,0 +1,92 @@
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
//! Pallet for stuff specific to parachains' usage of XCM. Right now that's just the origin
//! used by parachains when receiving `Transact` messages from other parachains or the Relay chain
//! which must be natively represented.
#![cfg_attr(not(feature = "std"), no_std)]
use cumulus_primitives_core::ParaId;
use codec::{Encode, Decode};
use sp_runtime::traits::BadOrigin;
pub use pallet::*;
#[frame_support::pallet]
pub mod pallet {
use frame_support::pallet_prelude::*;
use frame_system::pallet_prelude::*;
#[pallet::pallet]
#[pallet::generate_store(pub(super) trait Store)]
pub struct Pallet<T>(_);
/// The module configuration trait.
#[pallet::config]
pub trait Config: frame_system::Config {}
#[pallet::error]
pub enum Error<T> {}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {}
#[pallet::call]
impl<T: Config> Pallet<T> {}
}
/// Origin for the parachains module.
#[derive(PartialEq, Eq, Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug))]
pub enum Origin {
/// It comes from the (parent) relay chain.
Relay,
/// It comes from a (sibling) parachain.
SiblingParachain(ParaId),
}
impl From<ParaId> for Origin {
fn from(id: ParaId) -> Origin {
Origin::SiblingParachain(id)
}
}
impl From<u32> for Origin {
fn from(id: u32) -> Origin {
Origin::SiblingParachain(id.into())
}
}
/// Ensure that the origin `o` represents a sibling parachain.
/// Returns `Ok` with the parachain ID of the sibling or an `Err` otherwise.
pub fn ensure_sibling_para<OuterOrigin>(o: OuterOrigin) -> Result<ParaId, BadOrigin>
where OuterOrigin: Into<Result<Origin, OuterOrigin>>
{
match o.into() {
Ok(Origin::SiblingParachain(id)) => Ok(id),
_ => Err(BadOrigin),
}
}
/// Ensure that the origin `o` represents is the relay chain.
/// Returns `Ok` if it does or an `Err` otherwise.
pub fn ensure_relay<OuterOrigin>(o: OuterOrigin) -> Result<(), BadOrigin>
where OuterOrigin: Into<Result<Origin, OuterOrigin>>
{
match o.into() {
Ok(Origin::Relay) => Ok(()),
_ => Err(BadOrigin),
}
}
@@ -1,5 +1,5 @@
[package]
name = "cumulus-pallet-xcm-handler"
name = "cumulus-pallet-xcmp-queue"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
@@ -8,9 +8,12 @@ edition = "2018"
# Other dependencies
codec = { package = "parity-scale-codec", version = "2.0.0", features = [ "derive" ], default-features = false }
log = { version = "0.4.14", default-features = false }
rand = { version = "0.8.3", default-features = false }
rand_chacha = { version = "0.3.0", default-features = false }
# Substrate Dependencies
sp-std = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
frame-support = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
frame-system = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
@@ -27,6 +30,7 @@ std = [
"codec/std",
"log/std",
"sp-std/std",
"sp-runtime/std",
"frame-support/std",
"frame-system/std",
"cumulus-primitives-core/std",
+706
View File
@@ -0,0 +1,706 @@
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
//! * `XcmpMessageHandler`
//! * `XcmpMessageSource`
//!
//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
//! `XcmExecutor` for dispatching incoming XCM messages.
#![cfg_attr(not(feature = "std"), no_std)]
use sp_std::{prelude::*, convert::TryFrom};
use rand_chacha::{rand_core::{RngCore, SeedableRng}, ChaChaRng};
use codec::{Decode, Encode};
use sp_runtime::{RuntimeDebug, traits::Hash};
use frame_support::{decl_error, decl_event, decl_module, decl_storage, dispatch::Weight};
use xcm::{
VersionedXcm, v0::{
Error as XcmError, ExecuteXcm, Junction, MultiLocation, SendXcm, Outcome, Xcm,
},
};
use cumulus_primitives_core::{
XcmpMessageHandler, ParaId, XcmpMessageSource, ChannelStatus, MessageSendError, GetChannelInfo,
relay_chain::BlockNumber as RelayBlockNumber,
};
pub trait Config: frame_system::Config {
type Event: From<Event<Self>> + Into<<Self as frame_system::Config>::Event>;
/// Something to execute an XCM message. We need this to service the XCMoXCMP queue.
type XcmExecutor: ExecuteXcm<Self::Call>;
/// Information on the avaialble XCMP channels.
type ChannelInfo: GetChannelInfo;
}
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug)]
pub enum InboundStatus {
Ok,
Suspended,
}
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug)]
pub enum OutboundStatus {
Ok,
Suspended,
}
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug)]
pub struct QueueConfigData {
/// The number of pages of messages which must be in the queue for the other side to be told to
/// suspend their sending.
suspend_threshold: u32,
/// The number of pages of messages which must be in the queue after which we drop any further
/// messages from the channel.
drop_threshold: u32,
/// The number of pages of messages which the queue must be reduced to before it signals that
/// message sending may recommence after it has been suspended.
resume_threshold: u32,
// The amount of remaining weight under which we stop processing messages.
threshold_weight: Weight,
/// The speed to which the available weight approaches the maximum weight. A lower number
/// results in a faster progression. A value of 1 makes the entire weight available initially.
weight_restrict_decay: Weight,
}
impl Default for QueueConfigData {
fn default() -> Self {
Self {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: 100_000,
weight_restrict_decay: 2,
}
}
}
decl_storage! {
trait Store for Module<T: Config> as XcmHandler {
/// Status of the inbound XCMP channels.
InboundXcmpStatus: Vec<(ParaId, InboundStatus, Vec<(RelayBlockNumber, XcmpMessageFormat)>)>;
/// Inbound aggregate XCMP messages. It can only be one per ParaId/block.
InboundXcmpMessages: double_map hasher(blake2_128_concat) ParaId,
hasher(twox_64_concat) RelayBlockNumber
=> Vec<u8>;
/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
/// and last outbound message. If the two indices are equal, then it indicates an empty
/// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater
/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
/// case of the need to send a high-priority signal message this block.
/// The bool is true if there is a signal message waiting to be sent.
OutboundXcmpStatus: Vec<(ParaId, OutboundStatus, bool, u16, u16)>;
// The new way of doing it:
/// The messages outbound in a given XCMP channel.
OutboundXcmpMessages: double_map hasher(blake2_128_concat) ParaId,
hasher(twox_64_concat) u16 => Vec<u8>;
/// Any signal messages waiting to be sent.
SignalMessages: map hasher(blake2_128_concat) ParaId => Vec<u8>;
/// The configuration which controls the dynamics of the outbound queue.
QueueConfig: QueueConfigData;
}
}
decl_event! {
pub enum Event<T> where Hash = <T as frame_system::Config>::Hash {
/// Some XCM was executed ok.
Success(Option<Hash>),
/// Some XCM failed.
Fail(Option<Hash>, XcmError),
/// Bad XCM version used.
BadVersion(Option<Hash>),
/// Bad XCM format used.
BadFormat(Option<Hash>),
/// An upward message was sent to the relay chain.
UpwardMessageSent(Option<Hash>),
/// An HRMP message was sent to a sibling parachain.
XcmpMessageSent(Option<Hash>),
}
}
decl_error! {
pub enum Error for Module<T: Config> {
/// Failed to send XCM message.
FailedToSend,
/// Bad XCM origin.
BadXcmOrigin,
/// Bad XCM data.
BadXcm,
}
}
decl_module! {
pub struct Module<T: Config> for enum Call where origin: T::Origin {
type Error = Error<T>;
fn deposit_event() = default;
fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight {
// on_idle processes additional messages with any remaining block weight.
Self::service_xcmp_queue(max_weight)
}
}
}
#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode)]
pub enum ChannelSignal {
Suspend,
Resume,
}
/// The aggregate XCMP message format.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode)]
pub enum XcmpMessageFormat {
/// Encoded `VersionedXcm` messages, all concatenated.
ConcatenatedVersionedXcm,
/// Encoded `Vec<u8>` messages, all concatenated.
ConcatenatedEncodedBlob,
/// One or more channel control signals; these should be interpreted immediately upon receipt
/// from the relay-chain.
Signals,
}
impl<T: Config> Module<T> {
/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
///
/// Format is the type of aggregate message that the `fragment` may be safely encoded and
/// appended onto. Whether earlier unused space is used for the fragment at the risk of sending
/// it out of order is determined with `qos`. NOTE: For any two messages to be guaranteed to be
/// dispatched in order, then both must be sent with `ServiceQuality::Ordered`.
///
/// ## Background
///
/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
///
/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
/// message FRAGMENTs.
///
/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded Xcm, we can't be
/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
/// we can concatenate them into a single aggregate blob without needing to be concerned
/// about encoding fragment boundaries.
fn send_fragment<Fragment: Encode>(
recipient: ParaId,
format: XcmpMessageFormat,
fragment: Fragment,
) -> Result<u32, MessageSendError> {
let data = fragment.encode();
// Optimization note: `max_message_size` could potentially be stored in
// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
let max_message_size = T::ChannelInfo::get_channel_max(recipient)
.ok_or(MessageSendError::NoChannel)?;
if data.len() > max_message_size {
return Err(MessageSendError::TooBig);
}
let mut s = OutboundXcmpStatus::get();
let index = s.iter().position(|item| item.0 == recipient)
.unwrap_or_else(|| {
s.push((recipient, OutboundStatus::Ok, false, 0, 0));
s.len() - 1
});
let have_active = s[index].4 > s[index].3;
let appended = have_active && OutboundXcmpMessages::mutate(recipient, s[index].4 - 1, |s| {
if XcmpMessageFormat::decode(&mut &s[..]) != Ok(format) { return false }
if s.len() + data.len() > max_message_size { return false }
s.extend_from_slice(&data[..]);
return true
});
if appended {
Ok((s[index].4 - s[index].3 - 1) as u32)
} else {
// Need to add a new page.
let page_index = s[index].4;
s[index].4 += 1;
let mut new_page = format.encode();
new_page.extend_from_slice(&data[..]);
OutboundXcmpMessages::insert(recipient, page_index, new_page);
let r = (s[index].4 - s[index].3 - 1) as u32;
OutboundXcmpStatus::put(s);
Ok(r)
}
}
/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
/// block.
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), ()> {
let mut s = OutboundXcmpStatus::get();
if let Some(index) = s.iter().position(|item| item.0 == dest) {
s[index].2 = true;
} else {
s.push((dest, OutboundStatus::Ok, true, 0, 0));
}
SignalMessages::mutate(dest, |page| if page.is_empty() {
*page = (XcmpMessageFormat::Signals, signal).encode();
} else {
signal.using_encoded(|s| page.extend_from_slice(s));
});
OutboundXcmpStatus::put(s);
Ok(())
}
pub fn send_blob_message(
recipient: ParaId,
blob: Vec<u8>,
) -> Result<u32, MessageSendError> {
Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedEncodedBlob, blob)
}
pub fn send_xcm_message(
recipient: ParaId,
xcm: VersionedXcm<()>,
) -> Result<u32, MessageSendError> {
Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm)
}
fn create_shuffle(len: usize) -> Vec<usize> {
// Create a shuffled order for use to iterate through.
// Not a great random seed, but good enough for our purposes.
let seed = frame_system::Pallet::<T>::parent_hash();
let seed = <[u8; 32]>::decode(&mut sp_runtime::traits::TrailingZeroInput::new(seed.as_ref()))
.expect("input is padded with zeroes; qed");
let mut rng = ChaChaRng::from_seed(seed);
let mut shuffled = (0..len).collect::<Vec<_>>();
for i in 0..len {
let j = (rng.next_u32() as usize) % len;
let a = shuffled[i];
shuffled[i] = shuffled[j];
shuffled[j] = a;
}
shuffled
}
fn handle_blob_message(_sender: ParaId, _sent_at: RelayBlockNumber, _blob: Vec<u8>, _weight_limit: Weight) -> Result<Weight, bool> {
debug_assert!(false, "Blob messages not handled.");
Err(false)
}
fn handle_xcm_message(
sender: ParaId,
_sent_at: RelayBlockNumber,
xcm: VersionedXcm<T::Call>,
max_weight: Weight,
) -> Result<Weight, XcmError> {
let hash = Encode::using_encoded(&xcm, T::Hashing::hash);
log::debug!("Processing XCMP-XCM: {:?}", &hash);
let (result, event) = match Xcm::<T::Call>::try_from(xcm) {
Ok(xcm) => {
let location = (
Junction::Parent,
Junction::Parachain { id: sender.into() },
);
match T::XcmExecutor::execute_xcm(
location.into(),
xcm,
max_weight,
) {
Outcome::Error(e) => (Err(e.clone()), RawEvent::Fail(Some(hash), e)),
Outcome::Complete(w) => (Ok(w), RawEvent::Success(Some(hash))),
// As far as the caller is concerned, this was dispatched without error, so
// we just report the weight used.
Outcome::Incomplete(w, e) => (Ok(w), RawEvent::Fail(Some(hash), e)),
}
}
Err(()) => (Err(XcmError::UnhandledXcmVersion), RawEvent::BadVersion(Some(hash))),
};
Self::deposit_event(event);
result
}
fn process_xcmp_message(
sender: ParaId,
(sent_at, format): (RelayBlockNumber, XcmpMessageFormat),
max_weight: Weight,
) -> (Weight, bool) {
let data = InboundXcmpMessages::get(sender, sent_at);
let mut last_remaining_fragments;
let mut remaining_fragments = &data[..];
let mut weight_used = 0;
match format {
XcmpMessageFormat::ConcatenatedVersionedXcm => {
while !remaining_fragments.is_empty() {
last_remaining_fragments = remaining_fragments;
if let Ok(xcm) = VersionedXcm::<T::Call>::decode(&mut remaining_fragments) {
let weight = max_weight - weight_used;
match Self::handle_xcm_message(sender, sent_at, xcm, weight) {
Ok(used) => weight_used = weight_used.saturating_add(used),
Err(XcmError::TooMuchWeightRequired) => {
// That message didn't get processed this time because of being
// too heavy. We leave it around for next time and bail.
remaining_fragments = last_remaining_fragments;
break;
}
Err(_) => {
// Message looks invalid; don't attempt to retry
}
}
} else {
debug_assert!(false, "Invalid incoming XCMP message data");
remaining_fragments = &b""[..];
}
}
}
XcmpMessageFormat::ConcatenatedEncodedBlob => {
while !remaining_fragments.is_empty() {
last_remaining_fragments = remaining_fragments;
if let Ok(blob) = <Vec<u8>>::decode(&mut remaining_fragments) {
let weight = max_weight - weight_used;
match Self::handle_blob_message(sender, sent_at, blob, weight) {
Ok(used) => weight_used = weight_used.saturating_add(used),
Err(true) => {
// That message didn't get processed this time because of being
// too heavy. We leave it around for next time and bail.
remaining_fragments = last_remaining_fragments;
break;
}
Err(false) => {
// Message invalid; don't attempt to retry
}
}
} else {
debug_assert!(false, "Invalid incoming blob message data");
remaining_fragments = &b""[..];
}
}
}
XcmpMessageFormat::Signals => {
debug_assert!(false, "All signals are handled immediately; qed");
remaining_fragments = &b""[..];
}
}
let is_empty = remaining_fragments.is_empty();
if is_empty {
InboundXcmpMessages::remove(sender, sent_at);
} else {
InboundXcmpMessages::insert(sender, sent_at, remaining_fragments);
}
(weight_used, is_empty)
}
/// Service the incoming XCMP message queue attempting to execute up to `max_weight` execution
/// weight of messages.
fn service_xcmp_queue(max_weight: Weight) -> Weight {
let mut status = InboundXcmpStatus::get(); // <- sorted.
if status.len() == 0 {
return 0
}
let QueueConfigData {
resume_threshold,
threshold_weight,
weight_restrict_decay,
..
} = QueueConfig::get();
let mut shuffled = Self::create_shuffle(status.len());
let mut weight_used = 0;
let mut weight_available = 0;
// We don't want the possibility of a chain sending a series of really heavy messages and
// tying up the block's execution time from other chains. Therefore we execute any remaining
// messages in a random order.
// Order within a single channel will always be preserved, however this does mean that
// relative order between channels may not. The result is that chains which tend to send
// fewer, lighter messages will generally have a lower latency than chains which tend to
// send more, heavier messages.
let mut shuffle_index = 0;
while shuffle_index < shuffled.len() && max_weight.saturating_sub(weight_used) < threshold_weight {
let index = shuffled[shuffle_index];
let sender = status[index].0;
if weight_available != max_weight {
// Get incrementally closer to freeing up max_weight for message execution over the
// first round. For the second round we unlock all weight. If we come close enough
// on the first round to unlocking everything, then we do so.
if shuffle_index < status.len() {
weight_available += (max_weight - weight_available) / weight_restrict_decay;
if weight_available + threshold_weight > max_weight {
weight_available = max_weight;
}
} else {
weight_available = max_weight;
}
}
let weight_processed = if status[index].2.is_empty() {
debug_assert!(false, "channel exists in status; there must be messages; qed");
0
} else {
// Process up to one block's worth for now.
let weight_remaining = weight_available.saturating_sub(weight_used);
let (weight_processed, is_empty) = Self::process_xcmp_message(
sender,
status[index].2[0],
weight_remaining,
);
if is_empty {
status[index].2.remove(0);
}
weight_processed
};
weight_used += weight_processed;
if status[index].2.len() as u32 <= resume_threshold && status[index].1 == InboundStatus::Suspended {
// Resume
let r = Self::send_signal(sender, ChannelSignal::Resume);
debug_assert!(r.is_ok(), "WARNING: Failed sending resume into suspended channel");
status[index].1 = InboundStatus::Ok;
}
// If there are more and we're making progress, we process them after we've given the
// other channels a look in. If we've still not unlocked all weight, then we set them
// up for processing a second time anyway.
if !status[index].2.is_empty() && weight_processed > 0 || weight_available != max_weight {
if shuffle_index + 1 == shuffled.len() {
// Only this queue left. Just run around this loop once more.
continue
}
shuffled.push(index);
}
shuffle_index += 1;
}
// Only retain the senders that have non-empty queues.
status.retain(|item| !item.2.is_empty());
InboundXcmpStatus::put(status);
weight_used
}
fn suspend_channel(target: ParaId) {
OutboundXcmpStatus::mutate(|s| {
if let Some(index) = s.iter().position(|item| item.0 == target) {
let ok = s[index].1 == OutboundStatus::Ok;
debug_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
s[index].1 = OutboundStatus::Suspended;
} else {
s.push((target, OutboundStatus::Suspended, false, 0, 0));
}
});
}
fn resume_channel(target: ParaId) {
OutboundXcmpStatus::mutate(|s| {
if let Some(index) = s.iter().position(|item| item.0 == target) {
let suspended = s[index].1 == OutboundStatus::Suspended;
debug_assert!(suspended, "WARNING: Attempt to resume channel that was not suspended.");
if s[index].3 == s[index].4 {
s.remove(index);
} else {
s[index].1 = OutboundStatus::Ok;
}
} else {
debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended.");
}
});
}
}
impl<T: Config> XcmpMessageHandler for Module<T> {
fn handle_xcmp_messages<'a, I: Iterator<Item=(ParaId, RelayBlockNumber, &'a [u8])>>(
iter: I,
max_weight: Weight,
) -> Weight {
let mut status = InboundXcmpStatus::get();
let QueueConfigData { suspend_threshold, drop_threshold, .. } = QueueConfig::get();
for (sender, sent_at, data) in iter {
// Figure out the message format.
let mut data_ref = data;
let format = match XcmpMessageFormat::decode(&mut data_ref) {
Ok(f) => f,
Err(_) => {
debug_assert!(false, "Unknown XCMP message format. Silently dropping message");
continue
},
};
if format == XcmpMessageFormat::Signals {
while !data_ref.is_empty() {
use ChannelSignal::*;
match ChannelSignal::decode(&mut data_ref) {
Ok(Suspend) => Self::suspend_channel(sender),
Ok(Resume) => Self::resume_channel(sender),
Err(_) => break,
}
}
} else {
// Record the fact we received it.
match status.binary_search_by_key(&sender, |item| item.0) {
Ok(i) => {
let count = status[i].2.len();
if count as u32 >= suspend_threshold && status[i].1 == InboundStatus::Ok {
status[i].1 = InboundStatus::Suspended;
let r = Self::send_signal(sender, ChannelSignal::Suspend);
if r.is_err() {
log::warn!("Attempt to suspend channel failed. Messages may be dropped.");
}
}
if (count as u32) < drop_threshold {
status[i].2.push((sent_at, format));
} else {
debug_assert!(false, "XCMP channel queue full. Silently dropping message");
}
},
Err(_) => status.push((sender, InboundStatus::Ok, vec![(sent_at, format)])),
}
// Queue the payload for later execution.
InboundXcmpMessages::insert(sender, sent_at, data_ref);
}
// Optimization note; it would make sense to execute messages immediately if
// `status.is_empty()` here.
}
status.sort();
InboundXcmpStatus::put(status);
Self::service_xcmp_queue(max_weight)
}
}
impl<T: Config> XcmpMessageSource for Module<T> {
fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
let mut statuses = OutboundXcmpStatus::get();
let old_statuses_len = statuses.len();
let max_message_count = statuses.len().min(maximum_channels);
let mut result = Vec::with_capacity(max_message_count);
for status in statuses.iter_mut() {
let (para_id, outbound_status, mut signalling, mut begin, mut end) = *status;
if result.len() == max_message_count {
// We check this condition in the beginning of the loop so that we don't include
// a message where the limit is 0.
break;
}
if outbound_status == OutboundStatus::Suspended {
continue
}
let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
ChannelStatus::Closed => {
// This means that there is no such channel anymore. Nothing to be done but
// swallow the messages and discard the status.
for i in begin..end {
OutboundXcmpMessages::remove(para_id, i);
}
if signalling {
SignalMessages::remove(para_id);
}
*status = (para_id, OutboundStatus::Ok, false, 0, 0);
continue
}
ChannelStatus::Full => continue,
ChannelStatus::Ready(n, e) => (n, e),
};
let page = if signalling {
let page = SignalMessages::get(para_id);
if page.len() < max_size_now {
SignalMessages::remove(para_id);
signalling = false;
page
} else {
continue
}
} else if end > begin {
let page = OutboundXcmpMessages::get(para_id, begin);
if page.len() < max_size_now {
OutboundXcmpMessages::remove(para_id, begin);
begin += 1;
page
} else {
continue
}
} else {
continue;
};
if begin == end {
begin = 0;
end = 0;
}
if page.len() > max_size_ever {
// TODO: #274 This means that the channel's max message size has changed since
// the message was sent. We should parse it and split into smaller mesasges but
// since it's so unlikely then for now we just drop it.
log::warn!("WARNING: oversize message in queue. silently dropping.");
} else {
result.push((para_id, page));
}
*status = (para_id, outbound_status, signalling, begin, end);
}
// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
// criteria requirement.
result.sort_by_key(|m| m.0);
// Prune hrmp channels that became empty. Additionally, because it may so happen that we
// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
// change the order. Otherwise, the next `on_finalize` we will again give attention
// only to those channels that happen to be in the beginning, until they are emptied.
// This leads to "starvation" of the channels near to the end.
//
// To mitigate this we shift all processed elements towards the end of the vector using
// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
statuses.retain(|x| x.1 == OutboundStatus::Suspended || x.2 || x.3 < x.4);
// old_status_len must be >= status.len() since we never add anything to status.
let pruned = old_statuses_len - statuses.len();
// removing an item from status implies a message being sent, so the result messages must
// be no less than the pruned channels.
statuses.rotate_left(result.len() - pruned);
OutboundXcmpStatus::put(statuses);
result
}
}
/// Xcm sender for sending to a sibling parachain.
impl<T: Config> SendXcm for Module<T> {
fn send_xcm(dest: MultiLocation, msg: Xcm<()>) -> Result<(), XcmError> {
match &dest {
// An HRMP message for a sibling parachain.
MultiLocation::X2(Junction::Parent, Junction::Parachain { id }) => {
let msg = VersionedXcm::<()>::from(msg);
let hash = T::Hashing::hash_of(&msg);
Self::send_fragment((*id).into(), XcmpMessageFormat::ConcatenatedVersionedXcm, msg)
.map_err(|e| XcmError::SendFailed(<&'static str>::from(e)))?;
Self::deposit_event(RawEvent::XcmpMessageSent(Some(hash)));
Ok(())
}
// Anything else is unhandled. This includes a message this is meant for us.
_ => Err(XcmError::CannotReachDestination(dest, msg)),
}
}
}