Use Message Queue as DMP and XCMP dispatch queue (#1246)

(imported from https://github.com/paritytech/cumulus/pull/2157)

## Changes

This MR refactores the XCMP, Parachains System and DMP pallets to use
the [MessageQueue](https://github.com/paritytech/substrate/pull/12485)
for delayed execution of incoming messages. The DMP pallet is entirely
replaced by the MQ and thereby removed. This allows for PoV-bounded
execution and resolves a number of issues that stem from the current
work-around.

All System Parachains adopt this change.  
The most important changes are in `primitives/core/src/lib.rs`,
`parachains/common/src/process_xcm_message.rs`,
`pallets/parachain-system/src/lib.rs`, `pallets/xcmp-queue/src/lib.rs`
and the runtime configs.

### DMP Queue Pallet

The pallet got removed and its logic refactored into parachain-system.
Overweight message management can be done directly through the MQ
pallet.

Final undeployment migrations are provided by
`cumulus_pallet_dmp_queue::UndeployDmpQueue` and `DeleteDmpQueue` that
can be configured with an aux config trait like:

```rust
parameter_types! {
	pub const DmpQueuePalletName: &'static str = \"DmpQueue\" < CHANGE ME;
	pub const RelayOrigin: AggregateMessageOrigin = AggregateMessageOrigin::Parent;
}

impl cumulus_pallet_dmp_queue::MigrationConfig for Runtime {
	type PalletName = DmpQueuePalletName;
	type DmpHandler = frame_support::traits::EnqueueWithOrigin<MessageQueue, RelayOrigin>;
	type DbWeight = <Runtime as frame_system::Config>::DbWeight;
}

// And adding them to your Migrations tuple:
pub type Migrations = (
	...
	cumulus_pallet_dmp_queue::UndeployDmpQueue<Runtime>,
	cumulus_pallet_dmp_queue::DeleteDmpQueue<Runtime>,
);
```

### XCMP Queue pallet

Removed all dispatch queue functionality. Incoming XCMP messages are now
either: Immediately handled if they are Signals, enqueued into the MQ
pallet otherwise.

New config items for the XCMP queue pallet:
```rust
/// The actual queue implementation that retains the messages for later processing.
type XcmpQueue: EnqueueMessage<ParaId>;

/// How a XCM over HRMP from a sibling parachain should be processed.
type XcmpProcessor: ProcessMessage<Origin = ParaId>;

/// The maximal number of suspended XCMP channels at the same time.
#[pallet::constant]
type MaxInboundSuspended: Get<u32>;
```

How to configure those:

```rust
// Use the MessageQueue pallet to store messages for later processing. The `TransformOrigin` is needed since
// the MQ pallet itself operators on `AggregateMessageOrigin` but we want to enqueue `ParaId`s.
type XcmpQueue = TransformOrigin<MessageQueue, AggregateMessageOrigin, ParaId, ParaIdToSibling>;

// Process XCMP messages from siblings. This is type-safe to only accept `ParaId`s. They will be dispatched
// with origin `Junction::Sibling(…)`.
type XcmpProcessor = ProcessFromSibling<
	ProcessXcmMessage<
		AggregateMessageOrigin,
		xcm_executor::XcmExecutor<xcm_config::XcmConfig>,
		RuntimeCall,
	>,
>;

// Not really important what to choose here. Just something larger than the maximal number of channels.
type MaxInboundSuspended = sp_core::ConstU32<1_000>;
```

The `InboundXcmpStatus` storage item was replaced by
`InboundXcmpSuspended` since it now only tracks inbound queue suspension
and no message indices anymore.

Now only sends the most recent channel `Signals`, as all prio ones are
out-dated anyway.

### Parachain System pallet

For `DMP` messages instead of forwarding them to the `DMP` pallet, it
now pushes them to the configured `DmpQueue`. The message processing
which was triggered in `set_validation_data` is now being done by the MQ
pallet `on_initialize`.

XCMP messages are still handed off to the `XcmpMessageHandler`
(XCMP-Queue pallet) - no change here.

New config items for the parachain system pallet:
```rust
/// Queues inbound downward messages for delayed processing. 
///
/// Analogous to the `XcmpQueue` of the XCMP queue pallet.
type DmpQueue: EnqueueMessage<AggregateMessageOrigin>;
``` 

How to configure:
```rust
/// Use the MQ pallet to store DMP messages for delayed processing.
type DmpQueue = MessageQueue;
``` 

## Message Flow

The flow of messages on the parachain side. Messages come in from the
left via the `Validation Data` and finally end up at the `Xcm Executor`
on the right.

![Untitled
(1)](https://github.com/paritytech/cumulus/assets/10380170/6cf8b377-88c9-4aed-96df-baace266e04d)

## Further changes

- Bumped the default suspension, drop and resume thresholds in
`QueueConfigData::default()`.
- `XcmpQueue::{suspend_xcm_execution, resume_xcm_execution}` errors when
they would be a noop.
- Properly validate the `QueueConfigData` before setting it.
- Marked weight files as auto-generated so they wont auto-expand in the
MR files view.
- Move the `hypothetical` asserts to `frame_support` under the name
`experimental_hypothetically`

Questions:
- [ ] What about the ugly `#[cfg(feature = \"runtime-benchmarks\")]` in
the runtimes? Not sure how to best fix. Just having them like this makes
tests fail that rely on the real message processor when the feature is
enabled.
- [ ] Need a good weight for `MessageQueueServiceWeight`. The scheduler
already takes 80% so I put it to 10% but that is quite low.

TODO:
- [x] Remove c&p code after
https://github.com/paritytech/polkadot/pull/6271
- [x] Use `HandleMessage` once it is public in Substrate
- [x] fix `runtime-benchmarks` feature
https://github.com/paritytech/polkadot/pull/6966
- [x] Benchmarks
- [x] Tests
- [ ] Migrate `InboundXcmpStatus` to `InboundXcmpSuspended`
- [x] Possibly cleanup Migrations (DMP+XCMP)
- [x] optional: create `TransformProcessMessageOrigin` in Substrate and
replace `ProcessFromSibling`
- [ ] Rerun weights on ref HW

---------

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: Liam Aharon <liam.aharon@hotmail.com>
Co-authored-by: joe petrowski <25483142+joepetrowski@users.noreply.github.com>
Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>
Co-authored-by: command-bot <>
This commit is contained in:
Oliver Tale-Yazdi
2023-11-02 15:31:38 +01:00
committed by GitHub
parent 7df0417bcd
commit e1c033ebe1
277 changed files with 11604 additions and 4733 deletions
+108 -21
View File
@@ -24,19 +24,23 @@ pub use std::{
};
// Substrate
pub use cumulus_primitives_core::AggregateMessageOrigin as CumulusAggregateMessageOrigin;
pub use frame_support::{
assert_ok,
sp_runtime::{traits::Header as HeaderT, DispatchResult},
traits::{
EnqueueMessage, Get, Hooks, OriginTrait, ProcessMessage, ProcessMessageError, ServiceQueues,
EnqueueMessage, ExecuteOverweightError, Get, Hooks, OnInitialize, OriginTrait,
ProcessMessage, ProcessMessageError, ServiceQueues,
},
weights::{Weight, WeightMeter},
};
pub use frame_system::{Config as SystemConfig, Pallet as SystemPallet};
pub use pallet_balances::AccountData;
pub use pallet_message_queue;
pub use sp_arithmetic::traits::Bounded;
pub use sp_core::{blake2_256, parameter_types, sr25519, storage::Storage, Pair};
pub use sp_io::TestExternalities;
pub use sp_runtime::BoundedSlice;
pub use sp_std::{cell::RefCell, collections::vec_deque::VecDeque, fmt::Debug};
pub use sp_tracing;
@@ -227,8 +231,8 @@ pub trait Chain: TestExt + NetworkComponent {
}
pub trait RelayChain: Chain {
type MessageProcessor: ProcessMessage;
type SovereignAccountOf: ConvertLocation<AccountIdOf<Self::Runtime>>;
type MessageProcessor: ProcessMessage<Origin = ParaId> + ServiceQueues;
fn child_location_of(id: ParaId) -> MultiLocation {
(Ancestor(0), ParachainJunction(id.into())).into()
@@ -245,10 +249,10 @@ pub trait RelayChain: Chain {
pub trait Parachain: Chain {
type XcmpMessageHandler: XcmpMessageHandler;
type DmpMessageHandler: DmpMessageHandler;
type LocationToAccountId: ConvertLocation<AccountIdOf<Self::Runtime>>;
type ParachainInfo: Get<ParaId>;
type ParachainSystem;
type MessageProcessor: ProcessMessage<Origin = CumulusAggregateMessageOrigin> + ServiceQueues;
fn init();
@@ -346,7 +350,6 @@ macro_rules! decl_test_relay_chains {
core = {
MessageProcessor: $mp:path,
SovereignAccountOf: $sovereign_acc_of:path,
},
pallets = {
$($pallet_name:ident: $pallet_path:path,)*
@@ -569,9 +572,9 @@ macro_rules! decl_test_parachains {
runtime = $runtime:ident,
core = {
XcmpMessageHandler: $xcmp_message_handler:path,
DmpMessageHandler: $dmp_message_handler:path,
LocationToAccountId: $location_to_account:path,
ParachainInfo: $parachain_info:path,
MessageProcessor: $message_processor:path,
},
pallets = {
$($pallet_name:ident: $pallet_path:path,)*
@@ -606,10 +609,10 @@ macro_rules! decl_test_parachains {
impl $crate::Parachain for $name {
type XcmpMessageHandler = $xcmp_message_handler;
type DmpMessageHandler = $dmp_message_handler;
type LocationToAccountId = $location_to_account;
type ParachainSystem = $crate::ParachainSystemPallet<<Self as $crate::Chain>::Runtime>;
type ParachainInfo = $parachain_info;
type MessageProcessor = $message_processor;
// We run an empty block during initialisation to open HRMP channels
// and have them ready for the next block
@@ -968,7 +971,7 @@ macro_rules! decl_test_networks {
}
fn process_downward_messages() {
use $crate::{DmpMessageHandler, Bounded, Parachain, RelayChainBlockNumber, TestExt};
use $crate::{DmpMessageHandler, Bounded, Parachain, RelayChainBlockNumber, TestExt, Encode};
while let Some((to_para_id, messages))
= $crate::DOWNWARD_MESSAGES.with(|b| b.borrow_mut().get_mut(Self::name()).unwrap().pop_front()) {
@@ -983,16 +986,25 @@ macro_rules! decl_test_networks {
msg_dedup.dedup();
let msgs = msg_dedup.clone().into_iter().filter(|m| {
!$crate::DMP_DONE.with(|b| b.borrow_mut().get_mut(Self::name()).unwrap_or(&mut $crate::VecDeque::new()).contains(&(to_para_id, m.0, m.1.clone())))
!$crate::DMP_DONE.with(|b| b.borrow().get(stringify!($name))
.unwrap_or(&mut $crate::VecDeque::new())
.contains(&(to_para_id, m.0, m.1.clone()))
)
}).collect::<Vec<(RelayChainBlockNumber, Vec<u8>)>>();
if msgs.len() != 0 {
use $crate::{ProcessMessage, CumulusAggregateMessageOrigin, BoundedSlice, WeightMeter, TestExt};
for (block, msg) in msgs.clone().into_iter() {
let mut weight_meter = WeightMeter::new();
<$parachain>::ext_wrapper(|| {
<$parachain as Parachain>::DmpMessageHandler::handle_dmp_messages(msgs.clone().into_iter(), $crate::Weight::max_value());
let _ = <$parachain as Parachain>::MessageProcessor::process_message(
&msg[..],
$crate::CumulusAggregateMessageOrigin::Parent,
&mut weight_meter,
&mut msg.using_encoded(sp_core::blake2_256),
);
});
$crate::log::debug!(target: concat!("dmp::", stringify!($name)) , "DMP messages processed {:?} to para_id {:?}", msgs.clone(), &to_para_id);
for m in msgs {
$crate::DMP_DONE.with(|b| b.borrow_mut().get_mut(Self::name()).unwrap().push_back((to_para_id, m.0, m.1)));
}
$crate::DMP_DONE.with(|b| b.borrow_mut().get_mut(Self::name()).unwrap().push_back((to_para_id, block, msg)));
}
}
)*
@@ -1000,7 +1012,7 @@ macro_rules! decl_test_networks {
}
fn process_horizontal_messages() {
use $crate::{XcmpMessageHandler, Bounded, Parachain, TestExt};
use $crate::{XcmpMessageHandler, ServiceQueues, Bounded};
while let Some((to_para_id, messages))
= $crate::HORIZONTAL_MESSAGES.with(|b| b.borrow_mut().get_mut(Self::name()).unwrap().pop_front()) {
@@ -1010,7 +1022,9 @@ macro_rules! decl_test_networks {
if $crate::PARA_IDS.with(|b| b.borrow_mut().get_mut(Self::name()).unwrap().contains(&to_para_id)) && para_id == to_para_id {
<$parachain>::ext_wrapper(|| {
<$parachain as Parachain>::XcmpMessageHandler::handle_xcmp_messages(iter.clone(), $crate::Weight::max_value());
<$parachain as Parachain>::XcmpMessageHandler::handle_xcmp_messages(iter.clone(), $crate::Weight::MAX);
// Nudge the MQ pallet to process immediately instead of in the next block.
let _ = <$parachain as Parachain>::MessageProcessor::service_queues($crate::Weight::MAX);
});
$crate::log::debug!(target: concat!("hrmp::", stringify!($name)) , "HRMP messages processed {:?} to para_id {:?}", &messages, &to_para_id);
}
@@ -1019,10 +1033,10 @@ macro_rules! decl_test_networks {
}
fn process_upward_messages() {
use $crate::{Encode, ProcessMessage, TestExt};
use $crate::{Encode, ProcessMessage, TestExt, WeightMeter};
while let Some((from_para_id, msg)) = $crate::UPWARD_MESSAGES.with(|b| b.borrow_mut().get_mut(Self::name()).unwrap().pop_front()) {
let mut weight_meter = $crate::WeightMeter::new();
let mut weight_meter = WeightMeter::new();
<$relay_chain>::ext_wrapper(|| {
let _ = <$relay_chain as $crate::RelayChain>::MessageProcessor::process_message(
&msg[..],
@@ -1199,6 +1213,7 @@ macro_rules! assert_expected_events {
let mut event_message: Vec<String> = Vec::new();
for (index, event) in events.iter().enumerate() {
$crate::log::debug!(target: concat!("events::", stringify!($chain)), "{:?}", event);
// Have to reset the variable to override a previous partial match
meet_conditions = true;
match event {
@@ -1244,7 +1259,7 @@ macro_rules! assert_expected_events {
)
);
} else if !event_received {
message.push(format!("\n\n{}::\x1b[31m{}\x1b[0m was never received", stringify!($chain), stringify!($event_pat)));
message.push(format!("\n\n{}::\x1b[31m{}\x1b[0m was never received. All events:\n{:#?}", stringify!($chain), stringify!($event_pat), <$chain>::events()));
} else {
// If we find a perfect match we remove the event to avoid being potentially assessed multiple times
events.remove(index_match);
@@ -1282,10 +1297,60 @@ macro_rules! decl_test_sender_receiver_accounts_parameter_types {
};
}
pub struct DefaultMessageProcessor<T>(PhantomData<T>);
impl<T> ProcessMessage for DefaultMessageProcessor<T>
pub struct DefaultParaMessageProcessor<T>(PhantomData<T>);
// Process HRMP messages from sibling paraids
impl<T> ProcessMessage for DefaultParaMessageProcessor<T>
where
T: Chain + RelayChain,
T: Parachain,
T::Runtime: MessageQueueConfig,
<<T::Runtime as MessageQueueConfig>::MessageProcessor as ProcessMessage>::Origin:
PartialEq<CumulusAggregateMessageOrigin>,
MessageQueuePallet<T::Runtime>: EnqueueMessage<CumulusAggregateMessageOrigin> + ServiceQueues,
{
type Origin = CumulusAggregateMessageOrigin;
fn process_message(
msg: &[u8],
orig: Self::Origin,
_meter: &mut WeightMeter,
_id: &mut XcmHash,
) -> Result<bool, ProcessMessageError> {
MessageQueuePallet::<T::Runtime>::enqueue_message(
msg.try_into().expect("Message too long"),
orig.clone(),
);
MessageQueuePallet::<T::Runtime>::service_queues(Weight::MAX);
Ok(true)
}
}
impl<T> ServiceQueues for DefaultParaMessageProcessor<T>
where
T: Parachain,
T::Runtime: MessageQueueConfig,
<<T::Runtime as MessageQueueConfig>::MessageProcessor as ProcessMessage>::Origin:
PartialEq<CumulusAggregateMessageOrigin>,
MessageQueuePallet<T::Runtime>: EnqueueMessage<CumulusAggregateMessageOrigin> + ServiceQueues,
{
type OverweightMessageAddress = ();
fn service_queues(weight_limit: Weight) -> Weight {
MessageQueuePallet::<T::Runtime>::service_queues(weight_limit)
}
fn execute_overweight(
_weight_limit: Weight,
_address: Self::OverweightMessageAddress,
) -> Result<Weight, ExecuteOverweightError> {
unimplemented!()
}
}
pub struct DefaultRelayMessageProcessor<T>(PhantomData<T>);
// Process UMP messages on the relay
impl<T> ProcessMessage for DefaultRelayMessageProcessor<T>
where
T: RelayChain,
T::Runtime: MessageQueueConfig,
<<T::Runtime as MessageQueueConfig>::MessageProcessor as ProcessMessage>::Origin:
PartialEq<AggregateMessageOrigin>,
@@ -1309,6 +1374,28 @@ where
}
}
impl<T> ServiceQueues for DefaultRelayMessageProcessor<T>
where
T: RelayChain,
T::Runtime: MessageQueueConfig,
<<T::Runtime as MessageQueueConfig>::MessageProcessor as ProcessMessage>::Origin:
PartialEq<AggregateMessageOrigin>,
MessageQueuePallet<T::Runtime>: EnqueueMessage<AggregateMessageOrigin> + ServiceQueues,
{
type OverweightMessageAddress = ();
fn service_queues(weight_limit: Weight) -> Weight {
MessageQueuePallet::<T::Runtime>::service_queues(weight_limit)
}
fn execute_overweight(
_weight_limit: Weight,
_address: Self::OverweightMessageAddress,
) -> Result<Weight, ExecuteOverweightError> {
unimplemented!()
}
}
/// Struct that keeps account's id and balance
#[derive(Clone)]
pub struct TestAccount<R: Chain> {