General Message Queue Pallet (#12485)

* The message queue

* Make fully generic

* Refactor

* Docs

* Refactor

* Use iter not slice

* Per-origin queues

* Multi-queue processing

* Introduce MaxReady

* Remove MaxReady in favour of ready ring

* Cleanups

* ReadyRing and tests

* Stale page reaping

* from_components -> from_parts

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Move WeightCounter to sp_weights

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add MockedWeightInfo

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Deploy to kitchensink

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Use WeightCounter

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Small fixes and logging

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add service_page

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Typo

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Move service_page below service_queue

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add service_message

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Use correct weight function

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Overweight execution

* Refactor

* Missing file

* Fix WeightCounter usage in scheduler

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fix peek_index

Take into account that decoding from a mutable slice modifies it.

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add tests and bench service_page_item

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add debug_info

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add no-progress check to service_queues

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add more benches

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Bound from_message and try_append_message

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add PageReaped event

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Rename BookStateOf and BookStateFor

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Update tests and remove logging

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Remove redundant per-message origins; add footprint() and sweep_queue()

* Move testing stuff to mock.rs

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add integration test

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fix no-progress check

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fix debug_info

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fixup merge and tests

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fix footprint tracking

* Introduce

* Formatting

* OverweightEnqueued event, auto-servicing config item

* Update tests and benchmarks

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Clippy

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add tests

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Provide change handler

* Add missing BookStateFor::insert and call QueueChangeHandler

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Docs

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Update benchmarks and weights

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* More tests...

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Use weight metering functions

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* weightInfo::process_message_payload is gone

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add defensive_saturating_accrue

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Rename WeightCounter to WeightMeter

Ctr+Shift+H should do the trick.

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Test on_initialize

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add module docs

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Remove origin from MaxMessageLen

The message origin is not encoded into the heap and does
therefore not influence the max message length anymore.

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add BoundedVec::as_slice

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Test Page::{from_message, try_append_message}

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fixup docs

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Docs

* Do nothing in sweep_queue if the queue does not exist

... otherwise it inserts default values into the storage.

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Test ring (un)knitting

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Upgrade stress-test

Change the test to not assume that all queued messages will be
processed in the next block but split it over multiple.

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* More tests...

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Beauty fixes

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* clippy

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Rename BoundedVec::as_slice to as_bounded_slice

Conflicts with deref().as_slice() otherwise.

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fix imports

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Remove ReadyRing struct

Was used for testing only. Instead use 'fn assert_ring' which also
check the service head and backlinks.

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Beauty fixes

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fix stale page watermark

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Cleanup

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fix test feature and clippy

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* QueueChanged handler is called correctly

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Update benches

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Abstract testing functions

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* More tests

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Cleanup

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Clippy

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* fmt

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Simplify tests

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Make stuff compile

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Extend overweight execution benchmark

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Remove TODOs

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Test service queue with faulty MessageProcessor

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* fmt

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Update pallet ui tests to 1.65

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* More docs

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Review doc fixes

Co-authored-by: Robert Klotzner <eskimor@users.noreply.github.com>
Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Add weight_limit to extrinsic weight of execute_overweight

* Correctly return unused weight

* Return actual weight consumed in do_execute_overweight

* Review fixes

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Set version 7.0.0-dev

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Make it compile

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Switch message_size to u64

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Switch message_count to u64

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Fix benchmarks

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Make CI green

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Docs

* Update tests

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* ".git/.scripts/bench-bot.sh" pallet dev pallet_message_queue

* Dont mention README.md in the Cargo.toml

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>

* Remove reference to readme

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: parity-processbot <>
Co-authored-by: Robert Klotzner <eskimor@users.noreply.github.com>
Co-authored-by: Keith Yeung <kungfukeith11@gmail.com>
This commit is contained in:
Gavin Wood
2022-12-09 10:38:24 +00:00
committed by GitHub
parent 487ed143df
commit 6f3d1a8143
20 changed files with 3883 additions and 15 deletions
+6
View File
@@ -112,6 +112,12 @@ pub use voting::{
mod preimages;
pub use preimages::{Bounded, BoundedInline, FetchResult, Hash, QueryPreimage, StorePreimage};
mod messages;
pub use messages::{
EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError,
ServiceQueues,
};
#[cfg(feature = "try-runtime")]
mod try_runtime;
#[cfg(feature = "try-runtime")]
@@ -0,0 +1,202 @@
// This file is part of Substrate.
// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Traits for managing message queuing and handling.
use codec::{Decode, Encode, FullCodec, MaxEncodedLen};
use scale_info::TypeInfo;
use sp_core::{ConstU32, Get, TypedGet};
use sp_runtime::{traits::Convert, BoundedSlice, RuntimeDebug};
use sp_std::{fmt::Debug, marker::PhantomData, prelude::*};
use sp_weights::Weight;
/// Errors that can happen when attempting to process a message with
/// [`ProcessMessage::process_message()`].
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug)]
pub enum ProcessMessageError {
/// The message data format is unknown (e.g. unrecognised header)
BadFormat,
/// The message data is bad (e.g. decoding returns an error).
Corrupt,
/// The message format is unsupported (e.g. old XCM version).
Unsupported,
/// Message processing was not attempted because it was not certain that the weight limit
/// would be respected. The parameter gives the maximum weight which the message could take
/// to process.
Overweight(Weight),
}
/// Can process messages from a specific origin.
pub trait ProcessMessage {
/// The transport from where a message originates.
type Origin: FullCodec + MaxEncodedLen + Clone + Eq + PartialEq + TypeInfo + Debug;
/// Process the given message, using no more than `weight_limit` in weight to do so.
fn process_message(
message: &[u8],
origin: Self::Origin,
weight_limit: Weight,
) -> Result<(bool, Weight), ProcessMessageError>;
}
/// Errors that can happen when attempting to execute an overweight message with
/// [`ServiceQueues::execute_overweight()`].
#[derive(Eq, PartialEq, RuntimeDebug)]
pub enum ExecuteOverweightError {
/// The referenced message was not found.
NotFound,
/// The available weight was insufficient to execute the message.
InsufficientWeight,
}
/// Can service queues and execute overweight messages.
pub trait ServiceQueues {
/// Addresses a specific overweight message.
type OverweightMessageAddress;
/// Service all message queues in some fair manner.
///
/// - `weight_limit`: The maximum amount of dynamic weight that this call can use.
///
/// Returns the dynamic weight used by this call; is never greater than `weight_limit`.
fn service_queues(weight_limit: Weight) -> Weight;
/// Executes a message that could not be executed by [`Self::service_queues()`] because it was
/// temporarily overweight.
fn execute_overweight(
_weight_limit: Weight,
_address: Self::OverweightMessageAddress,
) -> Result<Weight, ExecuteOverweightError> {
Err(ExecuteOverweightError::NotFound)
}
}
/// The resource footprint of a queue.
#[derive(Default, Copy, Clone, Eq, PartialEq, RuntimeDebug)]
pub struct Footprint {
pub count: u64,
pub size: u64,
}
/// Can enqueue messages for multiple origins.
pub trait EnqueueMessage<Origin: MaxEncodedLen> {
/// The maximal length any enqueued message may have.
type MaxMessageLen: Get<u32>;
/// Enqueue a single `message` from a specific `origin`.
fn enqueue_message(message: BoundedSlice<u8, Self::MaxMessageLen>, origin: Origin);
/// Enqueue multiple `messages` from a specific `origin`.
fn enqueue_messages<'a>(
messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
origin: Origin,
);
/// Any remaining unprocessed messages should happen only lazily, not proactively.
fn sweep_queue(origin: Origin);
/// Return the state footprint of the given queue.
fn footprint(origin: Origin) -> Footprint;
}
impl<Origin: MaxEncodedLen> EnqueueMessage<Origin> for () {
type MaxMessageLen = ConstU32<0>;
fn enqueue_message(_: BoundedSlice<u8, Self::MaxMessageLen>, _: Origin) {}
fn enqueue_messages<'a>(
_: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
_: Origin,
) {
}
fn sweep_queue(_: Origin) {}
fn footprint(_: Origin) -> Footprint {
Footprint::default()
}
}
/// Transform the origin of an [`EnqueueMessage`] via `C::convert`.
pub struct TransformOrigin<E, O, N, C>(PhantomData<(E, O, N, C)>);
impl<E: EnqueueMessage<O>, O: MaxEncodedLen, N: MaxEncodedLen, C: Convert<N, O>> EnqueueMessage<N>
for TransformOrigin<E, O, N, C>
{
type MaxMessageLen = E::MaxMessageLen;
fn enqueue_message(message: BoundedSlice<u8, Self::MaxMessageLen>, origin: N) {
E::enqueue_message(message, C::convert(origin));
}
fn enqueue_messages<'a>(
messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
origin: N,
) {
E::enqueue_messages(messages, C::convert(origin));
}
fn sweep_queue(origin: N) {
E::sweep_queue(C::convert(origin));
}
fn footprint(origin: N) -> Footprint {
E::footprint(C::convert(origin))
}
}
/// Handles incoming messages for a single origin.
pub trait HandleMessage {
/// The maximal length any enqueued message may have.
type MaxMessageLen: Get<u32>;
/// Enqueue a single `message` with an implied origin.
fn handle_message(message: BoundedSlice<u8, Self::MaxMessageLen>);
/// Enqueue multiple `messages` from an implied origin.
fn handle_messages<'a>(
messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
);
/// Any remaining unprocessed messages should happen only lazily, not proactively.
fn sweep_queue();
/// Return the state footprint of the queue.
fn footprint() -> Footprint;
}
/// Adapter type to transform an [`EnqueueMessage`] with an origin into a [`HandleMessage`] impl.
pub struct EnqueueWithOrigin<E, O>(PhantomData<(E, O)>);
impl<E: EnqueueMessage<O::Type>, O: TypedGet> HandleMessage for EnqueueWithOrigin<E, O>
where
O::Type: MaxEncodedLen,
{
type MaxMessageLen = E::MaxMessageLen;
fn handle_message(message: BoundedSlice<u8, Self::MaxMessageLen>) {
E::enqueue_message(message, O::get());
}
fn handle_messages<'a>(
messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
) {
E::enqueue_messages(messages, O::get());
}
fn sweep_queue() {
E::sweep_queue(O::get());
}
fn footprint() -> Footprint {
E::footprint(O::get())
}
}