[FRAME] Message Queue use proper overweight limit (#1873)

Changes:
- Use a sensible limit for the overweight-cutoff of a single messages
instead of the full configured `ServiceWeight`.
- Add/Update tests

---------

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
This commit is contained in:
Oliver Tale-Yazdi
2023-10-19 12:02:13 +02:00
committed by GitHub
parent 21b32849db
commit 099ef8fe11
5 changed files with 282 additions and 22 deletions
@@ -17,6 +17,13 @@
//! Stress tests pallet-message-queue. Defines its own runtime config to use larger constants for
//! `HeapSize` and `MaxStale`.
//!
//! The tests in this file are ignored by default, since they are quite slow. You can run them
//! manually like this:
//!
//! ```sh
//! RUST_LOG=info cargo test -p pallet-message-queue --profile testnet -- --ignored
//! ```
#![cfg(test)]
@@ -96,9 +103,6 @@ impl Config for Test {
/// Simulates heavy usage by enqueueing and processing large amounts of messages.
///
/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p
/// pallet-message-queue -- --ignored`.
///
/// # Example output
///
/// ```pre
@@ -121,7 +125,7 @@ fn stress_test_enqueue_and_service() {
let max_queues = 10_000;
let max_messages_per_queue = 10_000;
let max_msg_len = MaxMessageLenOf::<Test>::get();
let mut rng = StdRng::seed_from_u64(42);
let mut rng = StdRng::seed_from_u64(43);
build_and_execute::<Test>(|| {
let mut msgs_remaining = 0;
@@ -145,9 +149,6 @@ fn stress_test_enqueue_and_service() {
/// Simulates heavy usage of the suspension logic via `Yield`.
///
/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p
/// pallet-message-queue -- --ignored`.
///
/// # Example output
///
/// ```pre
@@ -169,7 +170,7 @@ fn stress_test_queue_suspension() {
let max_messages_per_queue = 10_000;
let (max_suspend_per_block, max_resume_per_block) = (100, 50);
let max_msg_len = MaxMessageLenOf::<Test>::get();
let mut rng = StdRng::seed_from_u64(41);
let mut rng = StdRng::seed_from_u64(43);
build_and_execute::<Test>(|| {
let mut suspended = BTreeSet::<u32>::new();
+50 -4
View File
@@ -584,8 +584,9 @@ pub mod pallet {
}
/// Check all compile-time assumptions about [`crate::Config`].
#[cfg(test)]
fn integrity_test() {
assert!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
Self::do_integrity_test().expect("Pallet config is valid; qed")
}
}
@@ -759,6 +760,47 @@ impl<T: Config> Pallet<T> {
}
}
/// The maximal weight that a single message can consume.
///
/// Any message using more than this will be marked as permanently overweight and not
/// automatically re-attempted. Returns `None` if the servicing of a message cannot begin.
/// `Some(0)` means that only messages with no weight may be served.
fn max_message_weight(limit: Weight) -> Option<Weight> {
limit.checked_sub(&Self::single_msg_overhead())
}
/// The overhead of servicing a single message.
fn single_msg_overhead() -> Weight {
T::WeightInfo::bump_service_head()
.saturating_add(T::WeightInfo::service_queue_base())
.saturating_add(
T::WeightInfo::service_page_base_completion()
.max(T::WeightInfo::service_page_base_no_completion()),
)
.saturating_add(T::WeightInfo::service_page_item())
.saturating_add(T::WeightInfo::ready_ring_unknit())
}
/// Checks invariants of the pallet config.
///
/// The results of this can only be relied upon if the config values are set to constants.
#[cfg(test)]
fn do_integrity_test() -> Result<(), String> {
ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
if let Some(service) = T::ServiceWeight::get() {
if Self::max_message_weight(service).is_none() {
return Err(format!(
"ServiceWeight too low: {}. Must be at least {}",
service,
Self::single_msg_overhead(),
))
}
}
Ok(())
}
fn do_enqueue_message(
origin: &MessageOriginOf<T>,
message: BoundedSlice<u8, MaxMessageLenOf<T>>,
@@ -1360,10 +1402,14 @@ impl<T: Config> ServiceQueues for Pallet<T> {
type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);
fn service_queues(weight_limit: Weight) -> Weight {
// The maximum weight that processing a single message may take.
let overweight_limit = weight_limit;
let mut weight = WeightMeter::with_limit(weight_limit);
// Get the maximum weight that processing a single message may take:
let max_weight = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
defensive!("Not enough weight to service a single message.");
Weight::zero()
});
let mut next = match Self::bump_service_head(&mut weight) {
Some(h) => h,
None => return weight.consumed(),
@@ -1374,7 +1420,7 @@ impl<T: Config> ServiceQueues for Pallet<T> {
let mut last_no_progress = None;
loop {
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, overweight_limit);
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
next = match n {
Some(n) =>
if !progressed {
+26 -10
View File
@@ -71,7 +71,7 @@ impl frame_system::Config for Test {
parameter_types! {
pub const HeapSize: u32 = 24;
pub const MaxStale: u32 = 2;
pub const ServiceWeight: Option<Weight> = Some(Weight::from_parts(10, 10));
pub const ServiceWeight: Option<Weight> = Some(Weight::from_parts(100, 100));
}
impl Config for Test {
type RuntimeEvent = RuntimeEvent;
@@ -91,6 +91,7 @@ pub struct MockedWeightInfo;
parameter_types! {
/// Storage for `MockedWeightInfo`, do not use directly.
pub static WeightForCall: BTreeMap<String, Weight> = Default::default();
pub static DefaultWeightForCall: Weight = Weight::zero();
}
/// Set the return value for a function from the `WeightInfo` trait.
@@ -111,40 +112,55 @@ impl crate::weights::WeightInfo for MockedWeightInfo {
WeightForCall::get()
.get("execute_overweight_page_updated")
.copied()
.unwrap_or_default()
.unwrap_or(DefaultWeightForCall::get())
}
fn execute_overweight_page_removed() -> Weight {
WeightForCall::get()
.get("execute_overweight_page_removed")
.copied()
.unwrap_or_default()
.unwrap_or(DefaultWeightForCall::get())
}
fn service_page_base_completion() -> Weight {
WeightForCall::get()
.get("service_page_base_completion")
.copied()
.unwrap_or_default()
.unwrap_or(DefaultWeightForCall::get())
}
fn service_page_base_no_completion() -> Weight {
WeightForCall::get()
.get("service_page_base_no_completion")
.copied()
.unwrap_or_default()
.unwrap_or(DefaultWeightForCall::get())
}
fn service_queue_base() -> Weight {
WeightForCall::get().get("service_queue_base").copied().unwrap_or_default()
WeightForCall::get()
.get("service_queue_base")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
fn bump_service_head() -> Weight {
WeightForCall::get().get("bump_service_head").copied().unwrap_or_default()
WeightForCall::get()
.get("bump_service_head")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
fn service_page_item() -> Weight {
WeightForCall::get().get("service_page_item").copied().unwrap_or_default()
WeightForCall::get()
.get("service_page_item")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
fn ready_ring_knit() -> Weight {
WeightForCall::get().get("ready_ring_knit").copied().unwrap_or_default()
WeightForCall::get()
.get("ready_ring_knit")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
fn ready_ring_unknit() -> Weight {
WeightForCall::get().get("ready_ring_unknit").copied().unwrap_or_default()
WeightForCall::get()
.get("ready_ring_unknit")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
}
+182
View File
@@ -266,6 +266,44 @@ fn service_queues_suspension_works() {
});
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "Not enough weight to service a single message.")]
fn service_queues_low_weight_defensive() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
DefaultWeightForCall::set(21.into());
// Check that the integrity test would catch this:
assert!(MessageQueue::do_integrity_test().is_err());
MessageQueue::enqueue_message(msg("weight=0"), Here);
MessageQueue::service_queues(104.into_weight());
});
}
/// Regression test for <https://github.com/paritytech/polkadot-sdk/pull/1873>.
#[test]
fn service_queues_regression_1873() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
DefaultWeightForCall::set(20.into());
MessageQueue::enqueue_message(msg("weight=100"), Here);
assert_eq!(MessageQueue::service_queues(100.into_weight()), 100.into());
// Before the MQ this would not emit any events:
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=100"),
origin: MessageOrigin::Here,
message_index: 0,
page_index: 0,
}
.into(),
);
});
}
#[test]
fn reap_page_permanent_overweight_works() {
use MessageOrigin::*;
@@ -1150,6 +1188,116 @@ fn permanently_overweight_book_unknits_multiple() {
});
}
#[test]
fn permanently_overweight_limit_is_valid_basic() {
use MessageOrigin::*;
for w in 50..300 {
build_and_execute::<Test>(|| {
DefaultWeightForCall::set(Weight::MAX);
set_weight("bump_service_head", 10.into());
set_weight("service_queue_base", 10.into());
set_weight("service_page_base_no_completion", 10.into());
set_weight("service_page_base_completion", 0.into());
set_weight("service_page_item", 10.into());
set_weight("ready_ring_unknit", 10.into());
let m = "weight=200".to_string();
MessageQueue::enqueue_message(msg(&m), Here);
MessageQueue::service_queues(w.into());
let last_event =
frame_system::Pallet::<Test>::events().into_iter().last().expect("No event");
// The weight overhead for a single message is set to 50. The message itself needs 200.
// Every weight in range `[50, 249]` should result in a permanently overweight message:
if w < 250 {
assert_eq!(
last_event.event,
RuntimeEvent::MessageQueue(Event::OverweightEnqueued {
id: blake2_256(m.as_bytes()),
origin: Here,
message_index: 0,
page_index: 0,
})
);
} else {
// Otherwise it is processed as normal:
assert_eq!(
last_event.event,
RuntimeEvent::MessageQueue(Event::Processed {
origin: Here,
weight_used: 200.into(),
id: blake2_256(m.as_bytes()),
success: true,
})
);
}
});
}
}
#[test]
fn permanently_overweight_limit_is_valid_fuzzy() {
use MessageOrigin::*;
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
for _ in 0..10 {
// Brainlet code, but works...
let (s1, s2) = (rng.gen_range(0..=10), rng.gen_range(0..=10));
let (s3, s4) = (rng.gen_range(0..=10), rng.gen_range(0..=10));
let s5 = rng.gen_range(0..=10);
let o = s1 + s2 + s3 + s4 + s5;
for w in o..=o + 300 {
build_and_execute::<Test>(|| {
DefaultWeightForCall::set(Weight::MAX);
set_weight("bump_service_head", s1.into());
set_weight("service_queue_base", s2.into());
// Only the larger one of these two is taken:
set_weight("service_page_base_no_completion", s3.into());
set_weight("service_page_base_completion", 0.into());
set_weight("service_page_item", s4.into());
set_weight("ready_ring_unknit", s5.into());
let m = "weight=200".to_string();
MessageQueue::enqueue_message(msg(&m), Here);
MessageQueue::service_queues(w.into());
let last_event =
frame_system::Pallet::<Test>::events().into_iter().last().expect("No event");
if w < o + 200 {
assert_eq!(
last_event.event,
RuntimeEvent::MessageQueue(Event::OverweightEnqueued {
id: blake2_256(m.as_bytes()),
origin: Here,
message_index: 0,
page_index: 0,
})
);
} else {
assert_eq!(
last_event.event,
RuntimeEvent::MessageQueue(Event::Processed {
origin: Here,
weight_used: 200.into(),
id: blake2_256(m.as_bytes()),
success: true,
})
);
}
});
}
}
}
/// We don't want empty books in the ready ring, but if they somehow make their way in there, it
/// should not panic.
#[test]
@@ -1447,3 +1595,37 @@ fn service_queue_suspension_ready_ring_works() {
);
});
}
#[test]
fn integrity_test_checks_service_weight() {
build_and_execute::<Test>(|| {
assert_eq!(<Test as Config>::ServiceWeight::get(), Some(100.into()), "precond");
assert!(MessageQueue::do_integrity_test().is_ok(), "precond");
// Enough for all:
DefaultWeightForCall::set(20.into());
assert!(MessageQueue::do_integrity_test().is_ok());
// Not enough for anything:
DefaultWeightForCall::set(101.into());
assert_eq!(MessageQueue::single_msg_overhead(), 505.into());
assert!(MessageQueue::do_integrity_test().is_err());
// Not enough for a single function:
for f in [
"bump_service_head",
"service_queue_base",
"service_page_base_completion",
"service_page_base_no_completion",
"service_page_item",
"ready_ring_unknit",
] {
WeightForCall::take();
DefaultWeightForCall::set(Zero::zero());
assert!(MessageQueue::do_integrity_test().is_ok());
set_weight(f, 101.into());
assert!(MessageQueue::do_integrity_test().is_err());
}
});
}