dmp-queue: Store messages if already processed more than the maximum (#2343)

* dmp-queue: Store messages if already processed more than the maximum

* Put new event at the end
This commit is contained in:
Bastian Köcher
2023-03-17 14:45:01 +01:00
committed by EgorPopelyaev
parent 5e23bb7579
commit 71b6d4a0fe
+81 -39
View File
@@ -189,6 +189,8 @@ pub mod pallet {
},
/// Downward message from the overweight queue was executed.
OverweightServiced { overweight_index: OverweightIndex, weight_used: Weight },
/// The maximum number of downward messages was.
MaxMessagesExhausted { message_id: MessageId },
}
impl<T: Config> Pallet<T> {
@@ -306,46 +308,53 @@ pub mod pallet {
};
for (i, (sent_at, data)) in iter.enumerate() {
if messages_processed >= MAX_MESSAGES_PER_BLOCK {
break
}
if maybe_enqueue_page.is_none() {
// We're not currently enqueuing - try to execute inline.
let remaining_weight = limit.saturating_sub(used);
messages_processed += 1;
match Self::try_service_message(remaining_weight, sent_at, &data[..]) {
Ok(consumed) => used += consumed,
Err((message_id, required_weight)) =>
// Too much weight required right now.
{
let is_under_limit = Overweight::<T>::count() < MAX_OVERWEIGHT_MESSAGES;
used.saturating_accrue(T::DbWeight::get().reads(1));
if required_weight.any_gt(config.max_individual) && is_under_limit {
// overweight - add to overweight queue and continue with
// message execution.
let overweight_index = page_index.overweight_count;
Overweight::<T>::insert(overweight_index, (sent_at, data));
Self::deposit_event(Event::OverweightEnqueued {
message_id,
overweight_index,
required_weight,
});
page_index.overweight_count += 1;
// Not needed for control flow, but only to ensure that the compiler
// understands that we won't attempt to re-use `data` later.
continue
} else {
// not overweight. stop executing inline and enqueue normally
// from here on.
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
Self::deposit_event(Event::WeightExhausted {
message_id,
remaining_weight,
required_weight,
});
}
},
if messages_processed >= MAX_MESSAGES_PER_BLOCK {
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
Self::deposit_event(Event::MaxMessagesExhausted {
message_id: sp_io::hashing::blake2_256(&data),
});
} else {
// We're not currently enqueuing - try to execute inline.
let remaining_weight = limit.saturating_sub(used);
messages_processed += 1;
match Self::try_service_message(remaining_weight, sent_at, &data[..]) {
Ok(consumed) => used += consumed,
Err((message_id, required_weight)) =>
// Too much weight required right now.
{
let is_under_limit =
Overweight::<T>::count() < MAX_OVERWEIGHT_MESSAGES;
used.saturating_accrue(T::DbWeight::get().reads(1));
if required_weight.any_gt(config.max_individual) && is_under_limit {
// overweight - add to overweight queue and continue with
// message execution.
let overweight_index = page_index.overweight_count;
Overweight::<T>::insert(overweight_index, (sent_at, data));
Self::deposit_event(Event::OverweightEnqueued {
message_id,
overweight_index,
required_weight,
});
page_index.overweight_count += 1;
// Not needed for control flow, but only to ensure that the compiler
// understands that we won't attempt to re-use `data` later.
continue
} else {
// not overweight. stop executing inline and enqueue normally
// from here on.
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
Self::deposit_event(Event::WeightExhausted {
message_id,
remaining_weight,
required_weight,
});
}
},
}
}
}
// Cannot be an `else` here since the `maybe_enqueue_page` may have changed.
@@ -888,4 +897,37 @@ mod tests {
assert_eq!(pages_queued(), 1);
});
}
#[test]
fn handle_max_messages_per_block() {
new_test_ext().execute_with(|| {
enqueue(&vec![msg(1000), msg(1001)]);
enqueue(&vec![msg(1002), msg(1003)]);
enqueue(&vec![msg(1004), msg(1005)]);
let incoming = (0..MAX_MESSAGES_PER_BLOCK)
.into_iter()
.map(|i| msg(1006 + i as u64))
.collect::<Vec<_>>();
handle_messages(&incoming, Weight::from_parts(25000, 25000));
assert_eq!(
take_trace(),
(0..MAX_MESSAGES_PER_BLOCK)
.into_iter()
.map(|i| msg_complete(1000 + i as u64))
.collect::<Vec<_>>(),
);
assert_eq!(pages_queued(), 1);
handle_messages(&[], Weight::from_parts(25000, 25000));
assert_eq!(
take_trace(),
(MAX_MESSAGES_PER_BLOCK..MAX_MESSAGES_PER_BLOCK + 6)
.into_iter()
.map(|i| msg_complete(1000 + i as u64))
.collect::<Vec<_>>(),
);
});
}
}