mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-01 15:57:55 +00:00
pallet-message-queue: add queue pausing (#14318)
* pallet-message-queue: add queue pausing Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io> * Fix build Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io> * Remove check Otherwise it would not start servicing queues that started paused and became unpaused afterwards. Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io> --------- Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
This commit is contained in:
committed by
GitHub
parent
2119c80225
commit
4249643df2
@@ -27,22 +27,22 @@ use sp_core::blake2_256;
|
||||
|
||||
#[test]
|
||||
fn mocked_weight_works() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
assert!(<Test as Config>::WeightInfo::service_queue_base().is_zero());
|
||||
});
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("service_queue_base", Weight::MAX);
|
||||
assert_eq!(<Test as Config>::WeightInfo::service_queue_base(), Weight::MAX);
|
||||
});
|
||||
// The externalities reset it.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
assert!(<Test as Config>::WeightInfo::service_queue_base().is_zero());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn enqueue_within_one_page_works() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
use MessageOrigin::*;
|
||||
MessageQueue::enqueue_message(msg("a"), Here);
|
||||
MessageQueue::enqueue_message(msg("b"), Here);
|
||||
@@ -77,7 +77,7 @@ fn enqueue_within_one_page_works() {
|
||||
|
||||
#[test]
|
||||
fn queue_priority_retains() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
use MessageOrigin::*;
|
||||
assert_ring(&[]);
|
||||
MessageQueue::enqueue_message(msg("a"), Everywhere(1));
|
||||
@@ -108,7 +108,7 @@ fn queue_priority_retains() {
|
||||
|
||||
#[test]
|
||||
fn queue_priority_reset_once_serviced() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
use MessageOrigin::*;
|
||||
MessageQueue::enqueue_message(msg("a"), Everywhere(1));
|
||||
MessageQueue::enqueue_message(msg("b"), Everywhere(2));
|
||||
@@ -135,7 +135,7 @@ fn queue_priority_reset_once_serviced() {
|
||||
#[test]
|
||||
fn service_queues_basic_works() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here);
|
||||
MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
|
||||
assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]);
|
||||
@@ -146,13 +146,11 @@ fn service_queues_basic_works() {
|
||||
assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]);
|
||||
|
||||
// Service one message from `There`.
|
||||
ServiceHead::<Test>::set(There.into());
|
||||
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]);
|
||||
assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]);
|
||||
|
||||
// Service the remaining from `Here`.
|
||||
ServiceHead::<Test>::set(Here.into());
|
||||
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here), (vmsg("abc"), Here)]);
|
||||
assert_eq!(QueueChanges::take(), vec![(Here, 0, 0)]);
|
||||
@@ -167,7 +165,7 @@ fn service_queues_basic_works() {
|
||||
#[test]
|
||||
fn service_queues_failing_messages_works() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("service_page_item", 1.into_weight());
|
||||
MessageQueue::enqueue_message(msg("badformat"), Here);
|
||||
MessageQueue::enqueue_message(msg("corrupt"), Here);
|
||||
@@ -213,7 +211,7 @@ fn service_queues_failing_messages_works() {
|
||||
#[test]
|
||||
fn service_queues_suspension_works() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
MessageQueue::enqueue_messages(vec![msg("a"), msg("b"), msg("c")].into_iter(), Here);
|
||||
MessageQueue::enqueue_messages(vec![msg("x"), msg("y"), msg("z")].into_iter(), There);
|
||||
MessageQueue::enqueue_messages(
|
||||
@@ -227,8 +225,8 @@ fn service_queues_suspension_works() {
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]);
|
||||
assert_eq!(QueueChanges::take(), vec![(Here, 2, 2)]);
|
||||
|
||||
// Pause queue `Here` and `Everywhere(0)`.
|
||||
SuspendedQueues::set(vec![Here, Everywhere(0)]);
|
||||
// Make queue `Here` and `Everywhere(0)` yield.
|
||||
YieldingQueues::set(vec![Here, Everywhere(0)]);
|
||||
|
||||
// Service one message from `There`.
|
||||
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
|
||||
@@ -245,13 +243,13 @@ fn service_queues_suspension_works() {
|
||||
assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero());
|
||||
|
||||
// ... until we resume `Here`:
|
||||
SuspendedQueues::set(vec![Everywhere(0)]);
|
||||
YieldingQueues::set(vec![Everywhere(0)]);
|
||||
assert_eq!(MessageQueue::service_queues(Weight::MAX), 2.into_weight());
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("b"), Here), (vmsg("c"), Here)]);
|
||||
|
||||
// Everywhere still won't move.
|
||||
assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero());
|
||||
SuspendedQueues::take();
|
||||
YieldingQueues::take();
|
||||
// Resume `Everywhere(0)` makes it work.
|
||||
assert_eq!(MessageQueue::service_queues(Weight::MAX), 3.into_weight());
|
||||
assert_eq!(
|
||||
@@ -268,7 +266,7 @@ fn service_queues_suspension_works() {
|
||||
#[test]
|
||||
fn reap_page_permanent_overweight_works() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// Create 10 pages more than the stale limit.
|
||||
let n = (MaxStale::get() + 10) as usize;
|
||||
for _ in 0..n {
|
||||
@@ -308,7 +306,7 @@ fn reaping_overweight_fails_properly() {
|
||||
use MessageOrigin::*;
|
||||
assert_eq!(MaxStale::get(), 2, "The stale limit is two");
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// page 0
|
||||
MessageQueue::enqueue_message(msg("weight=4"), Here);
|
||||
MessageQueue::enqueue_message(msg("a"), Here);
|
||||
@@ -378,7 +376,7 @@ fn reaping_overweight_fails_properly() {
|
||||
#[test]
|
||||
fn service_queue_bails() {
|
||||
// Not enough weight for `service_queue_base`.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("service_queue_base", 2.into_weight());
|
||||
let mut meter = WeightMeter::from_limit(1.into_weight());
|
||||
|
||||
@@ -386,7 +384,7 @@ fn service_queue_bails() {
|
||||
assert!(meter.consumed.is_zero());
|
||||
});
|
||||
// Not enough weight for `ready_ring_unknit`.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("ready_ring_unknit", 2.into_weight());
|
||||
let mut meter = WeightMeter::from_limit(1.into_weight());
|
||||
|
||||
@@ -394,7 +392,7 @@ fn service_queue_bails() {
|
||||
assert!(meter.consumed.is_zero());
|
||||
});
|
||||
// Not enough weight for `service_queue_base` and `ready_ring_unknit`.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("service_queue_base", 2.into_weight());
|
||||
set_weight("ready_ring_unknit", 2.into_weight());
|
||||
|
||||
@@ -409,7 +407,7 @@ fn service_page_works() {
|
||||
use super::integration_test::Test; // Run with larger page size.
|
||||
use MessageOrigin::*;
|
||||
use PageExecutionStatus::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("service_page_base_completion", 2.into_weight());
|
||||
set_weight("service_page_item", 3.into_weight());
|
||||
|
||||
@@ -446,7 +444,7 @@ fn service_page_works() {
|
||||
#[test]
|
||||
fn service_page_bails() {
|
||||
// Not enough weight for `service_page_base_completion`.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("service_page_base_completion", 2.into_weight());
|
||||
let mut meter = WeightMeter::from_limit(1.into_weight());
|
||||
|
||||
@@ -463,7 +461,7 @@ fn service_page_bails() {
|
||||
assert!(meter.consumed.is_zero());
|
||||
});
|
||||
// Not enough weight for `service_page_base_no_completion`.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("service_page_base_no_completion", 2.into_weight());
|
||||
let mut meter = WeightMeter::from_limit(1.into_weight());
|
||||
|
||||
@@ -483,7 +481,7 @@ fn service_page_bails() {
|
||||
|
||||
#[test]
|
||||
fn service_page_item_bails() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let _guard = StorageNoopGuard::default();
|
||||
let (mut page, _) = full_page::<Test>();
|
||||
let mut weight = WeightMeter::from_limit(10.into_weight());
|
||||
@@ -510,7 +508,7 @@ fn service_page_suspension_works() {
|
||||
use MessageOrigin::*;
|
||||
use PageExecutionStatus::*;
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let (page, mut msgs) = full_page::<Test>();
|
||||
assert!(msgs >= 10, "pre-condition: need at least 10 msgs per page");
|
||||
let mut book = book_for::<Test>(&page);
|
||||
@@ -527,7 +525,7 @@ fn service_page_suspension_works() {
|
||||
msgs -= 5;
|
||||
|
||||
// Then we pause the queue.
|
||||
SuspendedQueues::set(vec![Here]);
|
||||
YieldingQueues::set(vec![Here]);
|
||||
// Noting happens...
|
||||
for _ in 0..5 {
|
||||
let (_, status) = crate::Pallet::<Test>::service_page(
|
||||
@@ -541,7 +539,7 @@ fn service_page_suspension_works() {
|
||||
}
|
||||
|
||||
// Resume and process all remaining.
|
||||
SuspendedQueues::take();
|
||||
YieldingQueues::take();
|
||||
let (_, status) = crate::Pallet::<Test>::service_page(
|
||||
&Here,
|
||||
&mut book,
|
||||
@@ -558,7 +556,7 @@ fn service_page_suspension_works() {
|
||||
#[test]
|
||||
fn bump_service_head_works() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// Create a ready ring with three queues.
|
||||
BookStateFor::<Test>::insert(Here, empty_book::<Test>());
|
||||
knit(&Here);
|
||||
@@ -581,7 +579,7 @@ fn bump_service_head_works() {
|
||||
/// `bump_service_head` does nothing when called with an insufficient weight limit.
|
||||
#[test]
|
||||
fn bump_service_head_bails() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("bump_service_head", 2.into_weight());
|
||||
setup_bump_service_head::<Test>(0.into(), 10.into());
|
||||
|
||||
@@ -594,7 +592,7 @@ fn bump_service_head_bails() {
|
||||
|
||||
#[test]
|
||||
fn bump_service_head_trivial_works() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("bump_service_head", 2.into_weight());
|
||||
let mut meter = WeightMeter::max_limit();
|
||||
|
||||
@@ -615,7 +613,7 @@ fn bump_service_head_trivial_works() {
|
||||
#[test]
|
||||
fn bump_service_head_no_head_noops() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// Create a ready ring with three queues.
|
||||
BookStateFor::<Test>::insert(Here, empty_book::<Test>());
|
||||
knit(&Here);
|
||||
@@ -634,7 +632,7 @@ fn bump_service_head_no_head_noops() {
|
||||
|
||||
#[test]
|
||||
fn service_page_item_consumes_correct_weight() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let mut page = page::<Test>(b"weight=3");
|
||||
let mut weight = WeightMeter::from_limit(10.into_weight());
|
||||
let overweight_limit = 0.into_weight();
|
||||
@@ -658,7 +656,7 @@ fn service_page_item_consumes_correct_weight() {
|
||||
/// `service_page_item` skips a permanently `Overweight` message and marks it as `unprocessed`.
|
||||
#[test]
|
||||
fn service_page_item_skips_perm_overweight_message() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let mut page = page::<Test>(b"TooMuch");
|
||||
let mut weight = WeightMeter::from_limit(2.into_weight());
|
||||
let overweight_limit = 0.into_weight();
|
||||
@@ -697,7 +695,7 @@ fn service_page_item_skips_perm_overweight_message() {
|
||||
#[test]
|
||||
fn peek_index_works() {
|
||||
use super::integration_test::Test; // Run with larger page size.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// Fill a page with messages.
|
||||
let (mut page, msgs) = full_page::<Test>();
|
||||
let msg_enc_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() + 4;
|
||||
@@ -718,7 +716,7 @@ fn peek_index_works() {
|
||||
#[test]
|
||||
fn peek_first_and_skip_first_works() {
|
||||
use super::integration_test::Test; // Run with larger page size.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// Fill a page with messages.
|
||||
let (mut page, msgs) = full_page::<Test>();
|
||||
|
||||
@@ -741,7 +739,7 @@ fn peek_first_and_skip_first_works() {
|
||||
#[test]
|
||||
fn note_processed_at_pos_works() {
|
||||
use super::integration_test::Test; // Run with larger page size.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let (mut page, msgs) = full_page::<Test>();
|
||||
|
||||
for i in 0..msgs {
|
||||
@@ -777,7 +775,7 @@ fn note_processed_at_pos_idempotent() {
|
||||
#[test]
|
||||
fn is_complete_works() {
|
||||
use super::integration_test::Test; // Run with larger page size.
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let (mut page, msgs) = full_page::<Test>();
|
||||
assert!(msgs > 3, "Boring");
|
||||
let msg_enc_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() + 4;
|
||||
@@ -933,7 +931,7 @@ fn page_from_message_max_len_works() {
|
||||
#[test]
|
||||
fn sweep_queue_works() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
build_triple_ring();
|
||||
|
||||
let book = BookStateFor::<Test>::get(Here);
|
||||
@@ -969,7 +967,7 @@ fn sweep_queue_works() {
|
||||
#[test]
|
||||
fn sweep_queue_wraps_works() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
BookStateFor::<Test>::insert(Here, empty_book::<Test>());
|
||||
knit(&Here);
|
||||
|
||||
@@ -982,14 +980,14 @@ fn sweep_queue_wraps_works() {
|
||||
#[test]
|
||||
fn sweep_queue_invalid_noops() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
assert_storage_noop!(MessageQueue::sweep_queue(Here));
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn footprint_works() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let origin = MessageOrigin::Here;
|
||||
let (page, msgs) = full_page::<Test>();
|
||||
let book = book_for::<Test>(&page);
|
||||
@@ -1007,7 +1005,7 @@ fn footprint_works() {
|
||||
/// The footprint of an invalid queue is the default footprint.
|
||||
#[test]
|
||||
fn footprint_invalid_works() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let origin = MessageOrigin::Here;
|
||||
assert_eq!(MessageQueue::footprint(origin), Default::default());
|
||||
})
|
||||
@@ -1017,7 +1015,7 @@ fn footprint_invalid_works() {
|
||||
#[test]
|
||||
fn footprint_on_swept_works() {
|
||||
use MessageOrigin::*;
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
let mut book = empty_book::<Test>();
|
||||
book.message_count = 3;
|
||||
book.size = 10;
|
||||
@@ -1033,7 +1031,7 @@ fn footprint_on_swept_works() {
|
||||
|
||||
#[test]
|
||||
fn execute_overweight_works() {
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("bump_service_head", 1.into_weight());
|
||||
set_weight("service_queue_base", 1.into_weight());
|
||||
set_weight("service_page_base_completion", 1.into_weight());
|
||||
@@ -1093,7 +1091,7 @@ fn execute_overweight_works() {
|
||||
fn permanently_overweight_book_unknits() {
|
||||
use MessageOrigin::*;
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("bump_service_head", 1.into_weight());
|
||||
set_weight("service_queue_base", 1.into_weight());
|
||||
set_weight("service_page_base_completion", 1.into_weight());
|
||||
@@ -1130,7 +1128,7 @@ fn permanently_overweight_book_unknits() {
|
||||
fn permanently_overweight_book_unknits_multiple() {
|
||||
use MessageOrigin::*;
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
set_weight("bump_service_head", 1.into_weight());
|
||||
set_weight("service_queue_base", 1.into_weight());
|
||||
set_weight("service_page_base_completion", 1.into_weight());
|
||||
@@ -1169,7 +1167,7 @@ fn permanently_overweight_book_unknits_multiple() {
|
||||
fn ready_but_empty_does_not_panic() {
|
||||
use MessageOrigin::*;
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
BookStateFor::<Test>::insert(Here, empty_book::<Test>());
|
||||
BookStateFor::<Test>::insert(There, empty_book::<Test>());
|
||||
|
||||
@@ -1189,7 +1187,7 @@ fn ready_but_empty_does_not_panic() {
|
||||
fn ready_but_perm_overweight_does_not_panic() {
|
||||
use MessageOrigin::*;
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
MessageQueue::enqueue_message(msg("weight=9"), Here);
|
||||
assert_eq!(MessageQueue::service_queues(8.into_weight()), 0.into_weight());
|
||||
assert_ring(&[]);
|
||||
@@ -1209,7 +1207,7 @@ fn ready_but_perm_overweight_does_not_panic() {
|
||||
fn ready_ring_knit_basic_works() {
|
||||
use MessageOrigin::*;
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
BookStateFor::<Test>::insert(Here, empty_book::<Test>());
|
||||
|
||||
for i in 0..10 {
|
||||
@@ -1229,12 +1227,15 @@ fn ready_ring_knit_basic_works() {
|
||||
fn ready_ring_knit_and_unknit_works() {
|
||||
use MessageOrigin::*;
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// Place three queues into the storage.
|
||||
BookStateFor::<Test>::insert(Here, empty_book::<Test>());
|
||||
BookStateFor::<Test>::insert(There, empty_book::<Test>());
|
||||
BookStateFor::<Test>::insert(Everywhere(0), empty_book::<Test>());
|
||||
|
||||
// Pausing should make no difference:
|
||||
PausedQueues::set(vec![Here, There, Everywhere(0)]);
|
||||
|
||||
// Knit them into the ready ring.
|
||||
assert_ring(&[]);
|
||||
knit(&Here);
|
||||
@@ -1260,7 +1261,7 @@ fn enqueue_message_works() {
|
||||
let max_msg_per_page = <Test as Config>::HeapSize::get() as u64 /
|
||||
(ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64 + 1);
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// Enqueue messages which should fill three pages.
|
||||
let n = max_msg_per_page * 3;
|
||||
for i in 1..=n {
|
||||
@@ -1290,7 +1291,7 @@ fn enqueue_messages_works() {
|
||||
let max_msg_per_page = <Test as Config>::HeapSize::get() as u64 /
|
||||
(ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64 + 1);
|
||||
|
||||
new_test_ext::<Test>().execute_with(|| {
|
||||
test_closure(|| {
|
||||
// Enqueue messages which should fill three pages.
|
||||
let n = max_msg_per_page * 3;
|
||||
let msgs = vec![msg("a"); n as usize];
|
||||
@@ -1315,3 +1316,144 @@ fn enqueue_messages_works() {
|
||||
assert_eq!(book.count as usize, Pages::<Test>::iter().count());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn service_queues_suspend_works() {
|
||||
use MessageOrigin::*;
|
||||
test_closure(|| {
|
||||
MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here);
|
||||
MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
|
||||
assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]);
|
||||
|
||||
// Pause `Here` - execution starts `There`.
|
||||
PausedQueues::set(vec![Here]);
|
||||
assert_eq!(
|
||||
(true, false),
|
||||
(
|
||||
<Test as Config>::QueuePausedQuery::is_paused(&Here),
|
||||
<Test as Config>::QueuePausedQuery::is_paused(&There)
|
||||
)
|
||||
);
|
||||
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]);
|
||||
assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]);
|
||||
|
||||
// Unpause `Here` - execution continues `There`.
|
||||
PausedQueues::take();
|
||||
assert_eq!(
|
||||
(false, false),
|
||||
(
|
||||
<Test as Config>::QueuePausedQuery::is_paused(&Here),
|
||||
<Test as Config>::QueuePausedQuery::is_paused(&There)
|
||||
)
|
||||
);
|
||||
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("xy"), There)]);
|
||||
assert_eq!(QueueChanges::take(), vec![(There, 1, 3)]);
|
||||
|
||||
// Now it swaps to `Here`.
|
||||
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]);
|
||||
assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]);
|
||||
|
||||
// Pause `There` - execution continues `Here`.
|
||||
PausedQueues::set(vec![There]);
|
||||
assert_eq!(
|
||||
(false, true),
|
||||
(
|
||||
<Test as Config>::QueuePausedQuery::is_paused(&Here),
|
||||
<Test as Config>::QueuePausedQuery::is_paused(&There)
|
||||
)
|
||||
);
|
||||
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here)]);
|
||||
assert_eq!(QueueChanges::take(), vec![(Here, 1, 3)]);
|
||||
|
||||
// Unpause `There` and service all remaining messages.
|
||||
PausedQueues::take();
|
||||
assert_eq!(
|
||||
(false, false),
|
||||
(
|
||||
<Test as Config>::QueuePausedQuery::is_paused(&Here),
|
||||
<Test as Config>::QueuePausedQuery::is_paused(&There)
|
||||
)
|
||||
);
|
||||
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
|
||||
assert_eq!(MessagesProcessed::take(), vec![(vmsg("abc"), Here), (vmsg("xyz"), There)]);
|
||||
assert_eq!(QueueChanges::take(), vec![(Here, 0, 0), (There, 0, 0)]);
|
||||
});
|
||||
}
|
||||
|
||||
/// Tests that manual overweight execution on a suspended queue errors with `QueueSuspended`.
|
||||
#[test]
|
||||
fn execute_overweight_respects_suspension() {
|
||||
test_closure(|| {
|
||||
let origin = MessageOrigin::Here;
|
||||
MessageQueue::enqueue_message(msg("weight=5"), origin);
|
||||
// Mark the message as permanently overweight.
|
||||
MessageQueue::service_queues(4.into_weight());
|
||||
assert_last_event::<Test>(
|
||||
Event::OverweightEnqueued {
|
||||
id: blake2_256(b"weight=5"),
|
||||
origin,
|
||||
message_index: 0,
|
||||
page_index: 0,
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
PausedQueues::set(vec![origin]);
|
||||
assert!(<Test as Config>::QueuePausedQuery::is_paused(&origin));
|
||||
|
||||
// Execution should fail.
|
||||
assert_eq!(
|
||||
<MessageQueue as ServiceQueues>::execute_overweight(Weight::MAX, (origin, 0, 0)),
|
||||
Err(ExecuteOverweightError::QueuePaused)
|
||||
);
|
||||
|
||||
PausedQueues::take();
|
||||
assert!(!<Test as Config>::QueuePausedQuery::is_paused(&origin));
|
||||
|
||||
// Execution should work again with same args.
|
||||
assert_ok!(<MessageQueue as ServiceQueues>::execute_overweight(
|
||||
Weight::MAX,
|
||||
(origin, 0, 0)
|
||||
));
|
||||
|
||||
assert_last_event::<Test>(
|
||||
Event::Processed {
|
||||
id: blake2_256(b"weight=5"),
|
||||
origin,
|
||||
weight_used: 5.into_weight(),
|
||||
success: true,
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn service_queue_suspension_ready_ring_works() {
|
||||
test_closure(|| {
|
||||
let origin = MessageOrigin::Here;
|
||||
PausedQueues::set(vec![origin]);
|
||||
MessageQueue::enqueue_message(msg("weight=5"), origin);
|
||||
|
||||
MessageQueue::service_queues(Weight::MAX);
|
||||
// It did not execute but is in the ready ring.
|
||||
assert!(System::events().is_empty(), "Paused");
|
||||
assert_ring(&[origin]);
|
||||
|
||||
// Now when we un-pause, it will execute.
|
||||
PausedQueues::take();
|
||||
MessageQueue::service_queues(Weight::MAX);
|
||||
assert_last_event::<Test>(
|
||||
Event::Processed {
|
||||
id: blake2_256(b"weight=5"),
|
||||
origin,
|
||||
weight_used: 5.into_weight(),
|
||||
success: true,
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user