Yieldable queues for pallet MessageQueue (#13424)

* Add Yield message processing error

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add NoopServiceQueues

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Implement temporary error aka Yield

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Make NoopMessageProcessor generic

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Mock pausable message processor

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Test paused queues

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Integration test paused queues

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Use WeightMeter instead of weight return

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* fix

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Make compile

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add tests

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* ".git/.scripts/commands/bench/bench.sh" pallet dev pallet_message_queue

* Fix test

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

---------

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: command-bot <>
This commit is contained in:
Oliver Tale-Yazdi
2023-02-25 17:13:20 +01:00
committed by GitHub
parent 84638524e7
commit 16773d3696
9 changed files with 479 additions and 173 deletions
+40 -15
View File
@@ -533,6 +533,11 @@ pub mod pallet {
Queued,
/// There is temporarily not enough weight to continue servicing messages.
InsufficientWeight,
/// This message is temporarily unprocessable.
///
/// Such errors are expected, but not guaranteed, to resolve themselves eventually through
/// retrying.
TemporarilyUnprocessable,
}
/// The index of the first and last (non-empty) pages.
@@ -588,6 +593,9 @@ pub mod pallet {
/// Execute an overweight message.
///
/// Temporary processing errors will be propagated whereas permanent errors are treated
/// as success condition.
///
/// - `origin`: Must be `Signed`.
/// - `message_origin`: The origin from which the message to be executed arrived.
/// - `page`: The page in the queue in which the message to be executed is sitting.
@@ -621,6 +629,10 @@ pub mod pallet {
enum PageExecutionStatus {
/// The execution bailed because there was not enough weight remaining.
Bailed,
/// The page did not make any progress on its execution.
///
/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
NoProgress,
/// No more messages could be loaded. This does _not_ imply `page.is_complete()`.
///
/// The reasons for this status are:
@@ -634,6 +646,10 @@ enum PageExecutionStatus {
enum ItemExecutionStatus {
/// The execution bailed because there was not enough weight remaining.
Bailed,
/// The item did not make any progress on its execution.
///
/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
NoProgress,
/// The item was not found.
NoItem,
/// Whether the execution of an item resulted in it being processed.
@@ -651,8 +667,8 @@ enum MessageExecutionStatus {
Overweight,
/// The message was processed successfully.
Processed,
/// The message was processed and resulted in a permanent error.
Unprocessable,
/// The message was processed and resulted in a, possibly permanent, error.
Unprocessable { permanent: bool },
}
impl<T: Config> Pallet<T> {
@@ -814,7 +830,8 @@ impl<T: Config> Pallet<T> {
// additional overweight event being deposited.
) {
Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
Unprocessable | Processed => {
Unprocessable { permanent: false } => Err(Error::<T>::TemporarilyUnprocessable),
Unprocessable { permanent: true } | Processed => {
page.note_processed_at_pos(pos);
book_state.message_count.saturating_dec();
book_state.size.saturating_reduce(payload_len);
@@ -921,6 +938,7 @@ impl<T: Config> Pallet<T> {
weight: &mut WeightMeter,
overweight_limit: Weight,
) -> (bool, Option<MessageOriginOf<T>>) {
use PageExecutionStatus::*;
if !weight.check_accrue(
T::WeightInfo::service_queue_base().saturating_add(T::WeightInfo::ready_ring_unknit()),
) {
@@ -936,9 +954,9 @@ impl<T: Config> Pallet<T> {
total_processed.saturating_accrue(processed);
match status {
// Store the page progress and do not go to the next one.
PageExecutionStatus::Bailed => break,
Bailed | NoProgress => break,
// Go to the next page if this one is at the end.
PageExecutionStatus::NoMore => (),
NoMore => (),
};
book_state.begin.saturating_inc();
}
@@ -1003,6 +1021,7 @@ impl<T: Config> Pallet<T> {
) {
Bailed => break PageExecutionStatus::Bailed,
NoItem => break PageExecutionStatus::NoMore,
NoProgress => break PageExecutionStatus::NoProgress,
// Keep going as long as we make progress...
Executed(true) => total_processed.saturating_inc(),
Executed(false) => (),
@@ -1053,7 +1072,8 @@ impl<T: Config> Pallet<T> {
overweight_limit,
) {
InsufficientWeight => return ItemExecutionStatus::Bailed,
Processed | Unprocessable => true,
Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
Processed | Unprocessable { permanent: true } => true,
Overweight => false,
};
@@ -1125,12 +1145,14 @@ impl<T: Config> Pallet<T> {
page_index: PageIndex,
message_index: T::Size,
message: &[u8],
weight: &mut WeightMeter,
meter: &mut WeightMeter,
overweight_limit: Weight,
) -> MessageExecutionStatus {
let hash = T::Hashing::hash(message);
use ProcessMessageError::Overweight;
match T::MessageProcessor::process_message(message, origin.clone(), weight.remaining()) {
use ProcessMessageError::*;
let prev_consumed = meter.consumed;
match T::MessageProcessor::process_message(message, origin.clone(), meter) {
Err(Overweight(w)) if w.any_gt(overweight_limit) => {
// Permanently overweight.
Self::deposit_event(Event::<T>::OverweightEnqueued {
@@ -1146,16 +1168,19 @@ impl<T: Config> Pallet<T> {
// queue.
MessageExecutionStatus::InsufficientWeight
},
Err(error) => {
Err(Yield) => {
// Processing should be reattempted later.
MessageExecutionStatus::Unprocessable { permanent: false }
},
Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
// Permanent error - drop
Self::deposit_event(Event::<T>::ProcessingFailed { hash, origin, error });
MessageExecutionStatus::Unprocessable
MessageExecutionStatus::Unprocessable { permanent: true }
},
Ok((success, weight_used)) => {
Ok(success) => {
// Success
weight.defensive_saturating_accrue(weight_used);
let event = Event::<T>::Processed { hash, origin, weight_used, success };
Self::deposit_event(event);
let weight_used = meter.consumed.saturating_sub(prev_consumed);
Self::deposit_event(Event::<T>::Processed { hash, origin, weight_used, success });
MessageExecutionStatus::Processed
},
}