mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 15:51:12 +00:00
[FRAME] Use 'ready' pages in XCMP suspend logic (#2393)
Changes: - `QueueFootprint` gets a new field; `ready_pages` that contains the non-overweight and not yet processed pages. - `XCMP` queue pallet is change to use the `ready_pages` instead of `pages` to calculate the channel suspension thresholds. This should give the XCMP queue pallet a more correct view of when to suspend channels. --------- Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
This commit is contained in:
committed by
GitHub
parent
c367ac2488
commit
329c077236
@@ -600,7 +600,7 @@ impl<T: Config> Pallet<T> {
|
||||
let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
|
||||
let fp = T::XcmpQueue::footprint(sender);
|
||||
// Assume that it will not fit into the current page:
|
||||
let new_pages = fp.pages.saturating_add(1);
|
||||
let new_pages = fp.ready_pages.saturating_add(1);
|
||||
if new_pages > drop_threshold {
|
||||
// This should not happen since the channel should have been suspended in
|
||||
// [`on_queue_changed`].
|
||||
@@ -663,12 +663,12 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
|
||||
let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
|
||||
let suspended = suspended_channels.contains(¶);
|
||||
|
||||
if suspended && fp.pages <= resume_threshold {
|
||||
if suspended && fp.ready_pages <= resume_threshold {
|
||||
Self::send_signal(para, ChannelSignal::Resume);
|
||||
|
||||
suspended_channels.remove(¶);
|
||||
<InboundXcmpSuspended<T>>::put(suspended_channels);
|
||||
} else if !suspended && fp.pages >= suspend_threshold {
|
||||
} else if !suspended && fp.ready_pages >= suspend_threshold {
|
||||
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
|
||||
Self::send_signal(para, ChannelSignal::Suspend);
|
||||
|
||||
|
||||
@@ -247,6 +247,7 @@ impl<T: OnQueueChanged<ParaId>> EnqueueMessage<ParaId> for EnqueueToLocalStorage
|
||||
}
|
||||
}
|
||||
footprint.pages = footprint.storage.size as u32 / 16; // Number does not matter
|
||||
footprint.ready_pages = footprint.pages;
|
||||
footprint
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
title: "[XCMP] Use the number of 'ready' pages in XCMP suspend logic"
|
||||
|
||||
doc:
|
||||
- audience: Runtime Dev
|
||||
description: |
|
||||
Semantics of the suspension logic in the XCMP queue pallet change from using the number of
|
||||
total pages to the number of 'ready' pages. The number of ready pages is now also exposed by
|
||||
the `MessageQueue` pallet to downstream via the queue `footprint`.
|
||||
|
||||
crates:
|
||||
- name: cumulus-pallet-xcmp-queue
|
||||
bump: patch
|
||||
- name: pallet-message-queue
|
||||
bump: patch
|
||||
- name: frame-support
|
||||
bump: major
|
||||
@@ -330,6 +330,11 @@ fn process_some_messages(num_msgs: u32) {
|
||||
ServiceWeight::set(Some(weight));
|
||||
let consumed = next_block();
|
||||
|
||||
for origin in BookStateFor::<Test>::iter_keys() {
|
||||
let fp = MessageQueue::footprint(origin);
|
||||
assert_eq!(fp.pages, fp.ready_pages);
|
||||
}
|
||||
|
||||
assert_eq!(consumed, weight, "\n{}", MessageQueue::debug_info());
|
||||
assert_eq!(NumMessagesProcessed::take(), num_msgs as usize);
|
||||
}
|
||||
|
||||
@@ -208,8 +208,9 @@ use frame_support::{
|
||||
defensive,
|
||||
pallet_prelude::*,
|
||||
traits::{
|
||||
Defensive, DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint,
|
||||
ProcessMessage, ProcessMessageError, QueueFootprint, QueuePausedQuery, ServiceQueues,
|
||||
Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
|
||||
ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
|
||||
QueuePausedQuery, ServiceQueues,
|
||||
},
|
||||
BoundedSlice, CloneNoBound, DefaultNoBound,
|
||||
};
|
||||
@@ -442,6 +443,7 @@ impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
|
||||
fn from(book: BookState<MessageOrigin>) -> Self {
|
||||
QueueFootprint {
|
||||
pages: book.count,
|
||||
ready_pages: book.end.defensive_saturating_sub(book.begin),
|
||||
storage: Footprint { count: book.message_count, size: book.size },
|
||||
}
|
||||
}
|
||||
@@ -1281,6 +1283,9 @@ impl<T: Config> Pallet<T> {
|
||||
ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
|
||||
ensure!(book.size < 1 << 30, "Likely overflow or corruption");
|
||||
ensure!(book.count < 1 << 30, "Likely overflow or corruption");
|
||||
|
||||
let fp: QueueFootprint = book.into();
|
||||
ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
|
||||
}
|
||||
|
||||
//loop around this origin
|
||||
|
||||
@@ -355,8 +355,8 @@ pub fn num_overweight_enqueued_events() -> u32 {
|
||||
.count() as u32
|
||||
}
|
||||
|
||||
pub fn fp(pages: u32, count: u64, size: u64) -> QueueFootprint {
|
||||
QueueFootprint { storage: Footprint { count, size }, pages }
|
||||
pub fn fp(pages: u32, ready_pages: u32, count: u64, size: u64) -> QueueFootprint {
|
||||
QueueFootprint { storage: Footprint { count, size }, pages, ready_pages }
|
||||
}
|
||||
|
||||
/// A random seed that can be overwritten with `MQ_SEED`.
|
||||
|
||||
@@ -1064,13 +1064,13 @@ fn footprint_num_pages_works() {
|
||||
MessageQueue::enqueue_message(msg("weight=2"), Here);
|
||||
MessageQueue::enqueue_message(msg("weight=3"), Here);
|
||||
|
||||
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 16));
|
||||
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 2, 16));
|
||||
|
||||
// Mark the messages as overweight.
|
||||
assert_eq!(MessageQueue::service_queues(1.into_weight()), 0.into_weight());
|
||||
assert_eq!(System::events().len(), 2);
|
||||
// Overweight does not change the footprint.
|
||||
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 16));
|
||||
// `ready_pages` decreases but `page` count does not.
|
||||
assert_eq!(MessageQueue::footprint(Here), fp(2, 0, 2, 16));
|
||||
|
||||
// Now execute the second message.
|
||||
assert_eq!(
|
||||
@@ -1078,7 +1078,7 @@ fn footprint_num_pages_works() {
|
||||
.unwrap(),
|
||||
3.into_weight()
|
||||
);
|
||||
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 8));
|
||||
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 8));
|
||||
// And the first one:
|
||||
assert_eq!(
|
||||
<MessageQueue as ServiceQueues>::execute_overweight(2.into_weight(), (Here, 0, 0))
|
||||
@@ -1086,6 +1086,11 @@ fn footprint_num_pages_works() {
|
||||
2.into_weight()
|
||||
);
|
||||
assert_eq!(MessageQueue::footprint(Here), Default::default());
|
||||
assert_eq!(MessageQueue::footprint(Here), fp(0, 0, 0, 0));
|
||||
|
||||
// `ready_pages` and normal `pages` increases again:
|
||||
MessageQueue::enqueue_message(msg("weight=3"), Here);
|
||||
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 1, 8));
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -123,6 +123,8 @@ impl<OverweightAddr> ServiceQueues for NoopServiceQueues<OverweightAddr> {
|
||||
pub struct QueueFootprint {
|
||||
/// The number of pages in the queue (including overweight pages).
|
||||
pub pages: u32,
|
||||
/// The number of pages that are ready (not yet processed and also not overweight).
|
||||
pub ready_pages: u32,
|
||||
/// The storage footprint of the queue (including overweight messages).
|
||||
pub storage: Footprint,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user