Upgrade pallets to FRAMEv2 (#404)

* Upgrade parachain info pallet to FRAMEv2

* Upgrade parachain system pallet to FRAMEv2

* Use Pallet<T> instead of Module<T>

* Upgrade XCMP queue pallet to FRAMEv2

* Correctly specify the metadata for events in xcmp-queue pallet

* Apply suggestions from code review

* Update pallets/parachain-system/src/tests.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Keith Yeung
2021-05-08 13:18:01 -07:00
committed by GitHub
parent 2641c5b830
commit 647a9e6df9
6 changed files with 1691 additions and 1500 deletions
+264 -193
View File
@@ -25,29 +25,145 @@
#![cfg_attr(not(feature = "std"), no_std)]
use sp_std::{prelude::*, convert::TryFrom};
use rand_chacha::{rand_core::{RngCore, SeedableRng}, ChaChaRng};
use codec::{Decode, Encode};
use sp_runtime::{RuntimeDebug, traits::Hash};
use frame_support::{decl_error, decl_event, decl_module, decl_storage, dispatch::Weight};
use xcm::{
VersionedXcm, v0::{
Error as XcmError, ExecuteXcm, Junction, MultiLocation, SendXcm, Outcome, Xcm,
},
};
use cumulus_primitives_core::{
XcmpMessageHandler, ParaId, XcmpMessageSource, ChannelStatus, MessageSendError, GetChannelInfo,
relay_chain::BlockNumber as RelayBlockNumber,
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageHandler, XcmpMessageSource,
};
use frame_support::weights::Weight;
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaChaRng,
};
use sp_runtime::{traits::Hash, RuntimeDebug};
use sp_std::{convert::TryFrom, prelude::*};
use xcm::{
v0::{Error as XcmError, ExecuteXcm, Junction, MultiLocation, Outcome, SendXcm, Xcm},
VersionedXcm,
};
pub trait Config: frame_system::Config {
type Event: From<Event<Self>> + Into<<Self as frame_system::Config>::Event>;
pub use pallet::*;
/// Something to execute an XCM message. We need this to service the XCMoXCMP queue.
type XcmExecutor: ExecuteXcm<Self::Call>;
#[frame_support::pallet]
pub mod pallet {
use super::*;
use frame_support::pallet_prelude::*;
use frame_system::pallet_prelude::*;
/// Information on the avaialble XCMP channels.
type ChannelInfo: GetChannelInfo;
#[pallet::pallet]
#[pallet::generate_store(pub(super) trait Store)]
pub struct Pallet<T>(_);
#[pallet::config]
pub trait Config: frame_system::Config {
type Event: From<Event<Self>> + IsType<<Self as frame_system::Config>::Event>;
/// Something to execute an XCM message. We need this to service the XCMoXCMP queue.
type XcmExecutor: ExecuteXcm<Self::Call>;
/// Information on the avaialble XCMP channels.
type ChannelInfo: GetChannelInfo;
}
impl Default for QueueConfigData {
fn default() -> Self {
Self {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: 100_000,
weight_restrict_decay: 2,
}
}
}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight {
// on_idle processes additional messages with any remaining block weight.
Self::service_xcmp_queue(max_weight)
}
}
#[pallet::call]
impl<T: Config> Pallet<T> {}
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
#[pallet::metadata(Option<T::Hash> = "Option<Hash>")]
pub enum Event<T: Config> {
/// Some XCM was executed ok.
Success(Option<T::Hash>),
/// Some XCM failed.
Fail(Option<T::Hash>, XcmError),
/// Bad XCM version used.
BadVersion(Option<T::Hash>),
/// Bad XCM format used.
BadFormat(Option<T::Hash>),
/// An upward message was sent to the relay chain.
UpwardMessageSent(Option<T::Hash>),
/// An HRMP message was sent to a sibling parachain.
XcmpMessageSent(Option<T::Hash>),
}
#[pallet::error]
pub enum Error<T> {
/// Failed to send XCM message.
FailedToSend,
/// Bad XCM origin.
BadXcmOrigin,
/// Bad XCM data.
BadXcm,
}
/// Status of the inbound XCMP channels.
#[pallet::storage]
pub(super) type InboundXcmpStatus<T: Config> = StorageValue<
_,
Vec<(
ParaId,
InboundStatus,
Vec<(RelayBlockNumber, XcmpMessageFormat)>,
)>,
ValueQuery,
>;
/// Inbound aggregate XCMP messages. It can only be one per ParaId/block.
#[pallet::storage]
pub(super) type InboundXcmpMessages<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
ParaId,
Twox64Concat,
RelayBlockNumber,
Vec<u8>,
ValueQuery,
>;
/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
/// and last outbound message. If the two indices are equal, then it indicates an empty
/// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater
/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
/// case of the need to send a high-priority signal message this block.
/// The bool is true if there is a signal message waiting to be sent.
#[pallet::storage]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<_, Vec<(ParaId, OutboundStatus, bool, u16, u16)>, ValueQuery>;
// The new way of doing it:
/// The messages outbound in a given XCMP channel.
#[pallet::storage]
pub(super) type OutboundXcmpMessages<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>;
/// Any signal messages waiting to be sent.
#[pallet::storage]
pub(super) type SignalMessages<T: Config> =
StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
/// The configuration which controls the dynamics of the outbound queue.
#[pallet::storage]
pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
}
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug)]
@@ -80,90 +196,6 @@ pub struct QueueConfigData {
weight_restrict_decay: Weight,
}
impl Default for QueueConfigData {
fn default() -> Self {
Self {
suspend_threshold: 2,
drop_threshold: 5,
resume_threshold: 1,
threshold_weight: 100_000,
weight_restrict_decay: 2,
}
}
}
decl_storage! {
trait Store for Module<T: Config> as XcmHandler {
/// Status of the inbound XCMP channels.
InboundXcmpStatus: Vec<(ParaId, InboundStatus, Vec<(RelayBlockNumber, XcmpMessageFormat)>)>;
/// Inbound aggregate XCMP messages. It can only be one per ParaId/block.
InboundXcmpMessages: double_map hasher(blake2_128_concat) ParaId,
hasher(twox_64_concat) RelayBlockNumber
=> Vec<u8>;
/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
/// and last outbound message. If the two indices are equal, then it indicates an empty
/// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater
/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
/// case of the need to send a high-priority signal message this block.
/// The bool is true if there is a signal message waiting to be sent.
OutboundXcmpStatus: Vec<(ParaId, OutboundStatus, bool, u16, u16)>;
// The new way of doing it:
/// The messages outbound in a given XCMP channel.
OutboundXcmpMessages: double_map hasher(blake2_128_concat) ParaId,
hasher(twox_64_concat) u16 => Vec<u8>;
/// Any signal messages waiting to be sent.
SignalMessages: map hasher(blake2_128_concat) ParaId => Vec<u8>;
/// The configuration which controls the dynamics of the outbound queue.
QueueConfig: QueueConfigData;
}
}
decl_event! {
pub enum Event<T> where Hash = <T as frame_system::Config>::Hash {
/// Some XCM was executed ok.
Success(Option<Hash>),
/// Some XCM failed.
Fail(Option<Hash>, XcmError),
/// Bad XCM version used.
BadVersion(Option<Hash>),
/// Bad XCM format used.
BadFormat(Option<Hash>),
/// An upward message was sent to the relay chain.
UpwardMessageSent(Option<Hash>),
/// An HRMP message was sent to a sibling parachain.
XcmpMessageSent(Option<Hash>),
}
}
decl_error! {
pub enum Error for Module<T: Config> {
/// Failed to send XCM message.
FailedToSend,
/// Bad XCM origin.
BadXcmOrigin,
/// Bad XCM data.
BadXcm,
}
}
decl_module! {
pub struct Module<T: Config> for enum Call where origin: T::Origin {
type Error = Error<T>;
fn deposit_event() = default;
fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight {
// on_idle processes additional messages with any remaining block weight.
Self::service_xcmp_queue(max_weight)
}
}
}
#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode)]
pub enum ChannelSignal {
Suspend,
@@ -182,7 +214,7 @@ pub enum XcmpMessageFormat {
Signals,
}
impl<T: Config> Module<T> {
impl<T: Config> Pallet<T> {
/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
///
/// Format is the type of aggregate message that the `fragment` may be safely encoded and
@@ -213,25 +245,32 @@ impl<T: Config> Module<T> {
// Optimization note: `max_message_size` could potentially be stored in
// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
let max_message_size = T::ChannelInfo::get_channel_max(recipient)
.ok_or(MessageSendError::NoChannel)?;
let max_message_size =
T::ChannelInfo::get_channel_max(recipient).ok_or(MessageSendError::NoChannel)?;
if data.len() > max_message_size {
return Err(MessageSendError::TooBig);
}
let mut s = OutboundXcmpStatus::get();
let index = s.iter().position(|item| item.0 == recipient)
let mut s = <OutboundXcmpStatus<T>>::get();
let index = s
.iter()
.position(|item| item.0 == recipient)
.unwrap_or_else(|| {
s.push((recipient, OutboundStatus::Ok, false, 0, 0));
s.len() - 1
});
let have_active = s[index].4 > s[index].3;
let appended = have_active && OutboundXcmpMessages::mutate(recipient, s[index].4 - 1, |s| {
if XcmpMessageFormat::decode(&mut &s[..]) != Ok(format) { return false }
if s.len() + data.len() > max_message_size { return false }
s.extend_from_slice(&data[..]);
return true
});
let appended = have_active
&& <OutboundXcmpMessages<T>>::mutate(recipient, s[index].4 - 1, |s| {
if XcmpMessageFormat::decode(&mut &s[..]) != Ok(format) {
return false;
}
if s.len() + data.len() > max_message_size {
return false;
}
s.extend_from_slice(&data[..]);
return true;
});
if appended {
Ok((s[index].4 - s[index].3 - 1) as u32)
} else {
@@ -240,9 +279,9 @@ impl<T: Config> Module<T> {
s[index].4 += 1;
let mut new_page = format.encode();
new_page.extend_from_slice(&data[..]);
OutboundXcmpMessages::insert(recipient, page_index, new_page);
<OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
let r = (s[index].4 - s[index].3 - 1) as u32;
OutboundXcmpStatus::put(s);
<OutboundXcmpStatus<T>>::put(s);
Ok(r)
}
}
@@ -250,26 +289,25 @@ impl<T: Config> Module<T> {
/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
/// block.
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), ()> {
let mut s = OutboundXcmpStatus::get();
let mut s = <OutboundXcmpStatus<T>>::get();
if let Some(index) = s.iter().position(|item| item.0 == dest) {
s[index].2 = true;
} else {
s.push((dest, OutboundStatus::Ok, true, 0, 0));
}
SignalMessages::mutate(dest, |page| if page.is_empty() {
*page = (XcmpMessageFormat::Signals, signal).encode();
} else {
signal.using_encoded(|s| page.extend_from_slice(s));
<SignalMessages<T>>::mutate(dest, |page| {
if page.is_empty() {
*page = (XcmpMessageFormat::Signals, signal).encode();
} else {
signal.using_encoded(|s| page.extend_from_slice(s));
}
});
OutboundXcmpStatus::put(s);
<OutboundXcmpStatus<T>>::put(s);
Ok(())
}
pub fn send_blob_message(
recipient: ParaId,
blob: Vec<u8>,
) -> Result<u32, MessageSendError> {
pub fn send_blob_message(recipient: ParaId, blob: Vec<u8>) -> Result<u32, MessageSendError> {
Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedEncodedBlob, blob)
}
@@ -284,8 +322,10 @@ impl<T: Config> Module<T> {
// Create a shuffled order for use to iterate through.
// Not a great random seed, but good enough for our purposes.
let seed = frame_system::Pallet::<T>::parent_hash();
let seed = <[u8; 32]>::decode(&mut sp_runtime::traits::TrailingZeroInput::new(seed.as_ref()))
.expect("input is padded with zeroes; qed");
let seed = <[u8; 32]>::decode(&mut sp_runtime::traits::TrailingZeroInput::new(
seed.as_ref(),
))
.expect("input is padded with zeroes; qed");
let mut rng = ChaChaRng::from_seed(seed);
let mut shuffled = (0..len).collect::<Vec<_>>();
for i in 0..len {
@@ -297,7 +337,12 @@ impl<T: Config> Module<T> {
shuffled
}
fn handle_blob_message(_sender: ParaId, _sent_at: RelayBlockNumber, _blob: Vec<u8>, _weight_limit: Weight) -> Result<Weight, bool> {
fn handle_blob_message(
_sender: ParaId,
_sent_at: RelayBlockNumber,
_blob: Vec<u8>,
_weight_limit: Weight,
) -> Result<Weight, bool> {
debug_assert!(false, "Blob messages not handled.");
Err(false)
}
@@ -312,23 +357,19 @@ impl<T: Config> Module<T> {
log::debug!("Processing XCMP-XCM: {:?}", &hash);
let (result, event) = match Xcm::<T::Call>::try_from(xcm) {
Ok(xcm) => {
let location = (
Junction::Parent,
Junction::Parachain(sender.into()),
);
match T::XcmExecutor::execute_xcm(
location.into(),
xcm,
max_weight,
) {
Outcome::Error(e) => (Err(e.clone()), RawEvent::Fail(Some(hash), e)),
Outcome::Complete(w) => (Ok(w), RawEvent::Success(Some(hash))),
let location = (Junction::Parent, Junction::Parachain(sender.into()));
match T::XcmExecutor::execute_xcm(location.into(), xcm, max_weight) {
Outcome::Error(e) => (Err(e.clone()), Event::Fail(Some(hash), e)),
Outcome::Complete(w) => (Ok(w), Event::Success(Some(hash))),
// As far as the caller is concerned, this was dispatched without error, so
// we just report the weight used.
Outcome::Incomplete(w, e) => (Ok(w), RawEvent::Fail(Some(hash), e)),
Outcome::Incomplete(w, e) => (Ok(w), Event::Fail(Some(hash), e)),
}
}
Err(()) => (Err(XcmError::UnhandledXcmVersion), RawEvent::BadVersion(Some(hash))),
Err(()) => (
Err(XcmError::UnhandledXcmVersion),
Event::BadVersion(Some(hash)),
),
};
Self::deposit_event(event);
result
@@ -339,7 +380,7 @@ impl<T: Config> Module<T> {
(sent_at, format): (RelayBlockNumber, XcmpMessageFormat),
max_weight: Weight,
) -> (Weight, bool) {
let data = InboundXcmpMessages::get(sender, sent_at);
let data = <InboundXcmpMessages<T>>::get(sender, sent_at);
let mut last_remaining_fragments;
let mut remaining_fragments = &data[..];
let mut weight_used = 0;
@@ -397,9 +438,9 @@ impl<T: Config> Module<T> {
}
let is_empty = remaining_fragments.is_empty();
if is_empty {
InboundXcmpMessages::remove(sender, sent_at);
<InboundXcmpMessages<T>>::remove(sender, sent_at);
} else {
InboundXcmpMessages::insert(sender, sent_at, remaining_fragments);
<InboundXcmpMessages<T>>::insert(sender, sent_at, remaining_fragments);
}
(weight_used, is_empty)
}
@@ -432,9 +473,9 @@ impl<T: Config> Module<T> {
/// for the second &c. though empirical and or practical factors may give rise to adjusting it
/// further.
fn service_xcmp_queue(max_weight: Weight) -> Weight {
let mut status = InboundXcmpStatus::get(); // <- sorted.
let mut status = <InboundXcmpStatus<T>>::get(); // <- sorted.
if status.len() == 0 {
return 0
return 0;
}
let QueueConfigData {
@@ -442,7 +483,7 @@ impl<T: Config> Module<T> {
threshold_weight,
weight_restrict_decay,
..
} = QueueConfig::get();
} = <QueueConfig<T>>::get();
let mut shuffled = Self::create_shuffle(status.len());
let mut weight_used = 0;
@@ -457,7 +498,9 @@ impl<T: Config> Module<T> {
// send more, heavier messages.
let mut shuffle_index = 0;
while shuffle_index < shuffled.len() && max_weight.saturating_sub(weight_used) >= threshold_weight {
while shuffle_index < shuffled.len()
&& max_weight.saturating_sub(weight_used) >= threshold_weight
{
let index = shuffled[shuffle_index];
let sender = status[index].0;
@@ -466,7 +509,8 @@ impl<T: Config> Module<T> {
// first round. For the second round we unlock all weight. If we come close enough
// on the first round to unlocking everything, then we do so.
if shuffle_index < status.len() {
weight_available += (max_weight - weight_available) / (weight_restrict_decay + 1);
weight_available +=
(max_weight - weight_available) / (weight_restrict_decay + 1);
if weight_available + threshold_weight > max_weight {
weight_available = max_weight;
}
@@ -476,16 +520,16 @@ impl<T: Config> Module<T> {
}
let weight_processed = if status[index].2.is_empty() {
debug_assert!(false, "channel exists in status; there must be messages; qed");
debug_assert!(
false,
"channel exists in status; there must be messages; qed"
);
0
} else {
// Process up to one block's worth for now.
let weight_remaining = weight_available.saturating_sub(weight_used);
let (weight_processed, is_empty) = Self::process_xcmp_message(
sender,
status[index].2[0],
weight_remaining,
);
let (weight_processed, is_empty) =
Self::process_xcmp_message(sender, status[index].2[0], weight_remaining);
if is_empty {
status[index].2.remove(0);
}
@@ -493,20 +537,26 @@ impl<T: Config> Module<T> {
};
weight_used += weight_processed;
if status[index].2.len() as u32 <= resume_threshold && status[index].1 == InboundStatus::Suspended {
if status[index].2.len() as u32 <= resume_threshold
&& status[index].1 == InboundStatus::Suspended
{
// Resume
let r = Self::send_signal(sender, ChannelSignal::Resume);
debug_assert!(r.is_ok(), "WARNING: Failed sending resume into suspended channel");
debug_assert!(
r.is_ok(),
"WARNING: Failed sending resume into suspended channel"
);
status[index].1 = InboundStatus::Ok;
}
// If there are more and we're making progress, we process them after we've given the
// other channels a look in. If we've still not unlocked all weight, then we set them
// up for processing a second time anyway.
if !status[index].2.is_empty() && weight_processed > 0 || weight_available != max_weight {
if !status[index].2.is_empty() && weight_processed > 0 || weight_available != max_weight
{
if shuffle_index + 1 == shuffled.len() {
// Only this queue left. Just run around this loop once more.
continue
continue;
}
shuffled.push(index);
}
@@ -516,12 +566,12 @@ impl<T: Config> Module<T> {
// Only retain the senders that have non-empty queues.
status.retain(|item| !item.2.is_empty());
InboundXcmpStatus::put(status);
<InboundXcmpStatus<T>>::put(status);
weight_used
}
fn suspend_channel(target: ParaId) {
OutboundXcmpStatus::mutate(|s| {
<OutboundXcmpStatus<T>>::mutate(|s| {
if let Some(index) = s.iter().position(|item| item.0 == target) {
let ok = s[index].1 == OutboundStatus::Ok;
debug_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
@@ -533,41 +583,53 @@ impl<T: Config> Module<T> {
}
fn resume_channel(target: ParaId) {
OutboundXcmpStatus::mutate(|s| {
<OutboundXcmpStatus<T>>::mutate(|s| {
if let Some(index) = s.iter().position(|item| item.0 == target) {
let suspended = s[index].1 == OutboundStatus::Suspended;
debug_assert!(suspended, "WARNING: Attempt to resume channel that was not suspended.");
debug_assert!(
suspended,
"WARNING: Attempt to resume channel that was not suspended."
);
if s[index].3 == s[index].4 {
s.remove(index);
} else {
s[index].1 = OutboundStatus::Ok;
}
} else {
debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended.");
debug_assert!(
false,
"WARNING: Attempt to resume channel that was not suspended."
);
}
});
}
}
impl<T: Config> XcmpMessageHandler for Module<T> {
fn handle_xcmp_messages<'a, I: Iterator<Item=(ParaId, RelayBlockNumber, &'a [u8])>>(
impl<T: Config> XcmpMessageHandler for Pallet<T> {
fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
iter: I,
max_weight: Weight,
) -> Weight {
let mut status = InboundXcmpStatus::get();
let mut status = <InboundXcmpStatus<T>>::get();
let QueueConfigData { suspend_threshold, drop_threshold, .. } = QueueConfig::get();
let QueueConfigData {
suspend_threshold,
drop_threshold,
..
} = <QueueConfig<T>>::get();
for (sender, sent_at, data) in iter {
// Figure out the message format.
let mut data_ref = data;
let format = match XcmpMessageFormat::decode(&mut data_ref) {
Ok(f) => f,
Err(_) => {
debug_assert!(false, "Unknown XCMP message format. Silently dropping message");
continue
},
debug_assert!(
false,
"Unknown XCMP message format. Silently dropping message"
);
continue;
}
};
if format == XcmpMessageFormat::Signals {
while !data_ref.is_empty() {
@@ -587,34 +649,39 @@ impl<T: Config> XcmpMessageHandler for Module<T> {
status[i].1 = InboundStatus::Suspended;
let r = Self::send_signal(sender, ChannelSignal::Suspend);
if r.is_err() {
log::warn!("Attempt to suspend channel failed. Messages may be dropped.");
log::warn!(
"Attempt to suspend channel failed. Messages may be dropped."
);
}
}
if (count as u32) < drop_threshold {
status[i].2.push((sent_at, format));
} else {
debug_assert!(false, "XCMP channel queue full. Silently dropping message");
debug_assert!(
false,
"XCMP channel queue full. Silently dropping message"
);
}
},
}
Err(_) => status.push((sender, InboundStatus::Ok, vec![(sent_at, format)])),
}
// Queue the payload for later execution.
InboundXcmpMessages::insert(sender, sent_at, data_ref);
<InboundXcmpMessages<T>>::insert(sender, sent_at, data_ref);
}
// Optimization note; it would make sense to execute messages immediately if
// `status.is_empty()` here.
}
status.sort();
InboundXcmpStatus::put(status);
<InboundXcmpStatus<T>>::put(status);
Self::service_xcmp_queue(max_weight)
}
}
impl<T: Config> XcmpMessageSource for Module<T> {
impl<T: Config> XcmpMessageSource for Pallet<T> {
fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
let mut statuses = OutboundXcmpStatus::get();
let mut statuses = <OutboundXcmpStatus<T>>::get();
let old_statuses_len = statuses.len();
let max_message_count = statuses.len().min(maximum_channels);
let mut result = Vec::with_capacity(max_message_count);
@@ -628,42 +695,42 @@ impl<T: Config> XcmpMessageSource for Module<T> {
break;
}
if outbound_status == OutboundStatus::Suspended {
continue
continue;
}
let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
ChannelStatus::Closed => {
// This means that there is no such channel anymore. Nothing to be done but
// swallow the messages and discard the status.
for i in begin..end {
OutboundXcmpMessages::remove(para_id, i);
<OutboundXcmpMessages<T>>::remove(para_id, i);
}
if signalling {
SignalMessages::remove(para_id);
<SignalMessages<T>>::remove(para_id);
}
*status = (para_id, OutboundStatus::Ok, false, 0, 0);
continue
continue;
}
ChannelStatus::Full => continue,
ChannelStatus::Ready(n, e) => (n, e),
};
let page = if signalling {
let page = SignalMessages::get(para_id);
let page = <SignalMessages<T>>::get(para_id);
if page.len() < max_size_now {
SignalMessages::remove(para_id);
<SignalMessages<T>>::remove(para_id);
signalling = false;
page
} else {
continue
continue;
}
} else if end > begin {
let page = OutboundXcmpMessages::get(para_id, begin);
let page = <OutboundXcmpMessages<T>>::get(para_id, begin);
if page.len() < max_size_now {
OutboundXcmpMessages::remove(para_id, begin);
<OutboundXcmpMessages<T>>::remove(para_id, begin);
begin += 1;
page
} else {
continue
continue;
}
} else {
continue;
@@ -705,23 +772,27 @@ impl<T: Config> XcmpMessageSource for Module<T> {
// be no less than the pruned channels.
statuses.rotate_left(result.len() - pruned);
OutboundXcmpStatus::put(statuses);
<OutboundXcmpStatus<T>>::put(statuses);
result
}
}
/// Xcm sender for sending to a sibling parachain.
impl<T: Config> SendXcm for Module<T> {
impl<T: Config> SendXcm for Pallet<T> {
fn send_xcm(dest: MultiLocation, msg: Xcm<()>) -> Result<(), XcmError> {
match &dest {
// An HRMP message for a sibling parachain.
MultiLocation::X2(Junction::Parent, Junction::Parachain(id)) => {
let msg = VersionedXcm::<()>::from(msg);
let hash = T::Hashing::hash_of(&msg);
Self::send_fragment((*id).into(), XcmpMessageFormat::ConcatenatedVersionedXcm, msg)
.map_err(|e| XcmError::SendFailed(<&'static str>::from(e)))?;
Self::deposit_event(RawEvent::XcmpMessageSent(Some(hash)));
Self::send_fragment(
(*id).into(),
XcmpMessageFormat::ConcatenatedVersionedXcm,
msg,
)
.map_err(|e| XcmError::SendFailed(<&'static str>::from(e)))?;
Self::deposit_event(Event::XcmpMessageSent(Some(hash)));
Ok(())
}
// Anything else is unhandled. This includes a message this is meant for us.