process enqueued messages on idle (#3844)

This will make it possible to use remaining weight on idle for
processing enqueued messages.
More context here https://github.com/paritytech/polkadot-sdk/issues/3709

---------

Co-authored-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
Ermal Kaleci
2024-03-27 15:51:45 +01:00
committed by GitHub
parent 417c54c61c
commit 8342947b8e
30 changed files with 114 additions and 2 deletions
@@ -69,6 +69,7 @@ impl pallet_message_queue::Config for Test {
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
type IdleMaxServiceWeight = ();
type QueuePausedQuery = ();
}
@@ -148,6 +148,7 @@ impl pallet_message_queue::Config for Test {
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
type IdleMaxServiceWeight = ();
type QueuePausedQuery = ();
}
@@ -125,6 +125,7 @@ impl pallet_message_queue::Config for Test {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MaxWeight;
type IdleMaxServiceWeight = ();
type WeightInfo = ();
}
@@ -660,6 +660,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl parachain_info::Config for Runtime {}
@@ -641,6 +641,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl cumulus_pallet_aura_ext::Config for Runtime {}
@@ -387,6 +387,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl cumulus_pallet_aura_ext::Config for Runtime {}
@@ -348,6 +348,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl cumulus_pallet_aura_ext::Config for Runtime {}
@@ -423,6 +423,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl cumulus_pallet_aura_ext::Config for Runtime {}
@@ -318,6 +318,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl cumulus_pallet_aura_ext::Config for Runtime {}
@@ -301,6 +301,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl parachain_info::Config for Runtime {}
@@ -301,6 +301,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl parachain_info::Config for Runtime {}
@@ -212,6 +212,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl parachain_info::Config for Runtime {}
@@ -282,6 +282,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
type WeightInfo = weights::pallet_message_queue::WeightInfo<Runtime>;
}
@@ -282,6 +282,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
type WeightInfo = weights::pallet_message_queue::WeightInfo<Runtime>;
}
@@ -232,6 +232,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl cumulus_pallet_aura_ext::Config for Runtime {}
@@ -540,6 +540,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
}
impl cumulus_pallet_aura_ext::Config for Runtime {}
@@ -320,6 +320,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = ();
}
impl cumulus_pallet_aura_ext::Config for Runtime {}
+1
View File
@@ -365,6 +365,7 @@ impl pallet_message_queue::Config for Test {
type HeapSize = ConstU32<65536>;
type MaxStale = ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = ();
}
parameter_types! {
+1
View File
@@ -987,6 +987,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = MessageQueueHeapSize;
type MaxStale = MessageQueueMaxStale;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
#[cfg(not(feature = "runtime-benchmarks"))]
type MessageProcessor = MessageProcessor;
#[cfg(feature = "runtime-benchmarks")]
+1
View File
@@ -1190,6 +1190,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = MessageQueueHeapSize;
type MaxStale = MessageQueueMaxStale;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = MessageQueueServiceWeight;
#[cfg(not(feature = "runtime-benchmarks"))]
type MessageProcessor = MessageProcessor;
#[cfg(feature = "runtime-benchmarks")]
@@ -275,6 +275,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = MessageQueueHeapSize;
type MaxStale = MessageQueueMaxStale;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = ();
type MessageProcessor = MessageProcessor;
type QueueChangeHandler = ();
type QueuePausedQuery = ();
@@ -232,6 +232,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = MessageQueueHeapSize;
type MaxStale = MessageQueueMaxStale;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = ();
#[cfg(not(feature = "runtime-benchmarks"))]
type MessageProcessor = MessageProcessor;
#[cfg(feature = "runtime-benchmarks")]
+25
View File
@@ -0,0 +1,25 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
title: Add the ability for MessageQueue to process enqueued messages on idle
doc:
- audience: Runtime Dev
description: |
Add the option to use remaining weight on idle for processing enqueued messages.
This will increase the chances of the messages enqueued during inherent extrinsics to be processed in the same block.
New config types is added on the message-queue `Config` trait:
- `IdleMaxServiceWeight`
example:
```rust
parameter_types! {
// The maximum weight to be used from remaining weight for processing enqueued messages on idle
pub const IdleMaxServiceWeight: Weight = Some(Weight);
}
type IdleMaxServiceWeight = IdleMaxServiceWeight; // or `()` to not use this feature
```
crates:
- name: pallet-message-queue
+1
View File
@@ -1303,6 +1303,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = ConstU32<{ 64 * 1024 }>;
type MaxStale = ConstU32<128>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = ();
}
parameter_types! {
@@ -225,6 +225,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = MessageQueueHeapSize;
type MaxStale = MessageQueueMaxStale;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = ();
type MessageProcessor = MessageProcessor;
type QueueChangeHandler = ();
type WeightInfo = ();
@@ -73,6 +73,7 @@ impl Config for Test {
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
type IdleMaxServiceWeight = ();
}
/// Simulates heavy usage by enqueueing and processing large amounts of messages.
+20 -2
View File
@@ -525,12 +525,21 @@ pub mod pallet {
type MaxStale: Get<u32>;
/// The amount of weight (if any) which should be provided to the message queue for
/// servicing enqueued items.
/// servicing enqueued items `on_initialize`.
///
/// This may be legitimately `None` in the case that you will call
/// `ServiceQueues::service_queues` manually.
/// `ServiceQueues::service_queues` manually or set [`Self::IdleMaxServiceWeight`] to have
/// it run in `on_idle`.
#[pallet::constant]
type ServiceWeight: Get<Option<Weight>>;
/// The maximum amount of weight (if any) to be used from remaining weight `on_idle` which
/// should be provided to the message queue for servicing enqueued items `on_idle`.
/// Useful for parachains to process messages at the same block they are received.
///
/// If `None`, it will not call `ServiceQueues::service_queues` in `on_idle`.
#[pallet::constant]
type IdleMaxServiceWeight: Get<Option<Weight>>;
}
#[pallet::event]
@@ -643,6 +652,15 @@ pub mod pallet {
}
}
fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
// Make use of the remaining weight to process enqueued messages.
Self::service_queues(weight_limit.min(remaining_weight))
} else {
Weight::zero()
}
}
#[cfg(feature = "try-runtime")]
fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
Self::do_try_state()
@@ -56,6 +56,7 @@ impl Config for Test {
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
type IdleMaxServiceWeight = ServiceWeight;
}
/// Mocked `WeightInfo` impl with allows to set the weight per call.
@@ -1838,3 +1838,45 @@ fn with_service_mutex_works() {
with_service_mutex(|| called = 3).unwrap();
assert_eq!(called, 3);
}
#[test]
fn process_enqueued_on_idle() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
// Some messages enqueued on previous block.
MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here);
assert_eq!(BookStateFor::<Test>::iter().count(), 1);
// Process enqueued messages from previous block.
Pallet::<Test>::on_initialize(1);
assert_eq!(
MessagesProcessed::take(),
vec![(b"a".to_vec(), Here), (b"ab".to_vec(), Here), (b"abc".to_vec(), Here),]
);
MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
assert_eq!(BookStateFor::<Test>::iter().count(), 2);
// Enough weight to process on idle.
Pallet::<Test>::on_idle(1, Weight::from_parts(100, 100));
assert_eq!(
MessagesProcessed::take(),
vec![(b"x".to_vec(), There), (b"xy".to_vec(), There), (b"xyz".to_vec(), There)]
);
})
}
#[test]
fn process_enqueued_on_idle_requires_enough_weight() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
Pallet::<Test>::on_initialize(1);
MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
assert_eq!(BookStateFor::<Test>::iter().count(), 1);
// Not enough weight to process on idle.
Pallet::<Test>::on_idle(1, Weight::from_parts(0, 0));
assert_eq!(MessagesProcessed::take(), vec![]);
})
}
+1
View File
@@ -414,6 +414,7 @@ impl pallet_message_queue::Config for Runtime {
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MessageQueueServiceWeight;
type IdleMaxServiceWeight = ();
}
impl cumulus_pallet_aura_ext::Config for Runtime {}