mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 10:11:03 +00:00
[Runtime] Bound XCMP queue (#2302)
Remove `without_storage_info` from the XCMP queue pallet. Part of https://github.com/paritytech/polkadot-sdk/issues/323 Changes: - Limit the number of channels that can be suspended at the same time. - Limit the number of channels that can have messages or signals pending at the same time. A No-OP migration is put in place to ensure that all `BoundedVec`s still decode and not truncate after upgrade. The storage version is thereby bumped to 4 to have our tooling remind us to deploy that migration. --------- Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io> Co-authored-by: Francisco Aguirre <franciscoaguirreperez@gmail.com>
This commit is contained in:
committed by
GitHub
parent
a664908304
commit
b8f55d1b76
@@ -51,7 +51,7 @@ pub mod weights;
|
||||
pub use weights::WeightInfo;
|
||||
|
||||
use bounded_collections::BoundedBTreeSet;
|
||||
use codec::{Decode, DecodeLimit, Encode};
|
||||
use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
|
||||
use cumulus_primitives_core::{
|
||||
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
|
||||
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
|
||||
@@ -105,7 +105,6 @@ pub mod pallet {
|
||||
|
||||
#[pallet::pallet]
|
||||
#[pallet::storage_version(migration::STORAGE_VERSION)]
|
||||
#[pallet::without_storage_info]
|
||||
pub struct Pallet<T>(_);
|
||||
|
||||
#[pallet::config]
|
||||
@@ -132,6 +131,25 @@ pub mod pallet {
|
||||
#[pallet::constant]
|
||||
type MaxInboundSuspended: Get<u32>;
|
||||
|
||||
/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
|
||||
///
|
||||
/// If this is reached, then no further messages can be sent to channels that do not yet
|
||||
/// have a message queued. This should be set to the expected maximum of outbound channels
|
||||
/// which is determined by [`Self::ChannelInfo`]. It is important to set this correctly,
|
||||
/// since otherwise the congestion control protocol will not work as intended and messages
|
||||
/// may be dropped. This value increases the PoV and should therefore not be picked too
|
||||
/// high.
|
||||
#[pallet::constant]
|
||||
type MaxActiveOutboundChannels: Get<u32>;
|
||||
|
||||
/// The maximal page size for HRMP message pages.
|
||||
///
|
||||
/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
|
||||
/// benchmarking. The limit for the size of a message is slightly below this, since some
|
||||
/// overhead is incurred for encoding the format.
|
||||
#[pallet::constant]
|
||||
type MaxPageSize: Get<u32>;
|
||||
|
||||
/// The origin that is allowed to resume or suspend the XCMP queue.
|
||||
type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
|
||||
|
||||
@@ -276,6 +294,10 @@ pub mod pallet {
|
||||
AlreadySuspended,
|
||||
/// The execution is already resumed.
|
||||
AlreadyResumed,
|
||||
/// There are too many active outbound channels.
|
||||
TooManyOutboundChannels,
|
||||
/// The message is too big.
|
||||
TooBig,
|
||||
}
|
||||
|
||||
/// The suspended inbound XCMP channels. All others are not suspended.
|
||||
@@ -297,19 +319,28 @@ pub mod pallet {
|
||||
/// case of the need to send a high-priority signal message this block.
|
||||
/// The bool is true if there is a signal message waiting to be sent.
|
||||
#[pallet::storage]
|
||||
pub(super) type OutboundXcmpStatus<T: Config> =
|
||||
StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>;
|
||||
pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
|
||||
_,
|
||||
BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
|
||||
ValueQuery,
|
||||
>;
|
||||
|
||||
// The new way of doing it:
|
||||
/// The messages outbound in a given XCMP channel.
|
||||
#[pallet::storage]
|
||||
pub(super) type OutboundXcmpMessages<T: Config> =
|
||||
StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>;
|
||||
pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
|
||||
_,
|
||||
Blake2_128Concat,
|
||||
ParaId,
|
||||
Twox64Concat,
|
||||
u16,
|
||||
BoundedVec<u8, T::MaxPageSize>,
|
||||
ValueQuery,
|
||||
>;
|
||||
|
||||
/// Any signal messages waiting to be sent.
|
||||
#[pallet::storage]
|
||||
pub(super) type SignalMessages<T: Config> =
|
||||
StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
|
||||
StorageMap<_, Blake2_128Concat, ParaId, BoundedVec<u8, T::MaxPageSize>, ValueQuery>;
|
||||
|
||||
/// The configuration which controls the dynamics of the outbound queue.
|
||||
#[pallet::storage]
|
||||
@@ -331,15 +362,14 @@ pub mod pallet {
|
||||
StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
|
||||
pub enum OutboundState {
|
||||
Ok,
|
||||
Suspended,
|
||||
}
|
||||
|
||||
/// Struct containing detailed information about the outbound channel.
|
||||
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)]
|
||||
#[cfg_attr(feature = "std", derive(Debug))]
|
||||
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
|
||||
pub struct OutboundChannelDetails {
|
||||
/// The `ParaId` of the parachain that this channel is connected with.
|
||||
recipient: ParaId,
|
||||
@@ -375,7 +405,7 @@ impl OutboundChannelDetails {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
|
||||
pub struct QueueConfigData {
|
||||
/// The number of pages which must be in the queue for the other side to be told to suspend
|
||||
/// their sending.
|
||||
@@ -478,7 +508,9 @@ impl<T: Config> Pallet<T> {
|
||||
{
|
||||
details
|
||||
} else {
|
||||
all_channels.push(OutboundChannelDetails::new(recipient));
|
||||
all_channels
|
||||
.try_push(OutboundChannelDetails::new(recipient))
|
||||
.map_err(|_| MessageSendError::TooManyChannels)?;
|
||||
all_channels
|
||||
.last_mut()
|
||||
.expect("can't be empty; a new element was just pushed; qed")
|
||||
@@ -503,7 +535,7 @@ impl<T: Config> Pallet<T> {
|
||||
if page.len() + encoded_fragment.len() > max_message_size {
|
||||
return None
|
||||
}
|
||||
page.extend_from_slice(&encoded_fragment[..]);
|
||||
page.try_extend(encoded_fragment.iter().cloned()).ok()?;
|
||||
Some(page.len())
|
||||
},
|
||||
)
|
||||
@@ -521,7 +553,9 @@ impl<T: Config> Pallet<T> {
|
||||
new_page.extend_from_slice(&encoded_fragment[..]);
|
||||
let last_page_size = new_page.len();
|
||||
let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
|
||||
<OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
|
||||
let bounded_page =
|
||||
BoundedVec::try_from(new_page).map_err(|_| MessageSendError::TooBig)?;
|
||||
<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
|
||||
<OutboundXcmpStatus<T>>::put(all_channels);
|
||||
(number_of_pages, last_page_size)
|
||||
};
|
||||
@@ -543,17 +577,21 @@ impl<T: Config> Pallet<T> {
|
||||
|
||||
/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
|
||||
/// block.
|
||||
fn send_signal(dest: ParaId, signal: ChannelSignal) {
|
||||
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
|
||||
let mut s = <OutboundXcmpStatus<T>>::get();
|
||||
if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
|
||||
details.signals_exist = true;
|
||||
} else {
|
||||
s.push(OutboundChannelDetails::new(dest).with_signals());
|
||||
s.try_push(OutboundChannelDetails::new(dest).with_signals())
|
||||
.map_err(|_| Error::<T>::TooManyOutboundChannels)?;
|
||||
}
|
||||
<SignalMessages<T>>::mutate(dest, |page| {
|
||||
*page = (XcmpMessageFormat::Signals, signal).encode();
|
||||
});
|
||||
|
||||
let page = BoundedVec::try_from((XcmpMessageFormat::Signals, signal).encode())
|
||||
.map_err(|_| Error::<T>::TooBig)?;
|
||||
|
||||
<SignalMessages<T>>::insert(dest, page);
|
||||
<OutboundXcmpStatus<T>>::put(s);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn suspend_channel(target: ParaId) {
|
||||
@@ -563,7 +601,13 @@ impl<T: Config> Pallet<T> {
|
||||
defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
|
||||
details.state = OutboundState::Suspended;
|
||||
} else {
|
||||
s.push(OutboundChannelDetails::new(target).with_suspended_state());
|
||||
if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
|
||||
// Nothing that we can do here. The outbound channel does not exist either, so
|
||||
// there should be no message going out as well. The next time that the channel
|
||||
// can be created it will again get the suspension from the remote side. It can
|
||||
// therefore result in a few lost messages, but should eventually self-repair.
|
||||
defensive!("Cannot pause channel; too many outbound channels");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -664,13 +708,17 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
|
||||
let suspended = suspended_channels.contains(¶);
|
||||
|
||||
if suspended && fp.pages <= resume_threshold {
|
||||
Self::send_signal(para, ChannelSignal::Resume);
|
||||
// If the resuming fails then it is not critical. We will retry in the future.
|
||||
let _ = Self::send_signal(para, ChannelSignal::Resume);
|
||||
|
||||
suspended_channels.remove(¶);
|
||||
<InboundXcmpSuspended<T>>::put(suspended_channels);
|
||||
} else if !suspended && fp.pages >= suspend_threshold {
|
||||
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
|
||||
Self::send_signal(para, ChannelSignal::Suspend);
|
||||
if let Err(_) = Self::send_signal(para, ChannelSignal::Suspend) {
|
||||
// It will retry if `drop_threshold` is not reached, but it could be too late.
|
||||
defensive!("Could not send suspension signal; future messages may be dropped.");
|
||||
}
|
||||
|
||||
if let Err(err) = suspended_channels.try_insert(para) {
|
||||
log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
|
||||
@@ -842,7 +890,7 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
|
||||
// since it's so unlikely then for now we just drop it.
|
||||
defensive!("WARNING: oversize message in queue - dropping");
|
||||
} else {
|
||||
result.push((para_id, page));
|
||||
result.push((para_id, page.into_inner()));
|
||||
}
|
||||
|
||||
let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
|
||||
@@ -890,7 +938,14 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
|
||||
let pruned = old_statuses_len - statuses.len();
|
||||
// removing an item from status implies a message being sent, so the result messages must
|
||||
// be no less than the pruned channels.
|
||||
statuses.rotate_left(result.len().saturating_sub(pruned));
|
||||
|
||||
// TODO <https://github.com/paritytech/parity-common/pull/800>
|
||||
{
|
||||
let mut statuses_inner = statuses.into_inner();
|
||||
statuses_inner.rotate_left(result.len().saturating_sub(pruned));
|
||||
statuses = BoundedVec::try_from(statuses_inner)
|
||||
.expect("Rotating does not change the length; it still fits; qed");
|
||||
}
|
||||
|
||||
<OutboundXcmpStatus<T>>::put(statuses);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user