mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-06 03:18:01 +00:00
Switch to relay_dispatch_queue_remaining_capacity (#2608)
* Switch to `relay_dispatch_queue_remaining_capacity` This switches the parachain runtimes to use `relay_dispatch_queue_remaining_capacity` when possible. If the data is not yet available on the relay chain it falls back to `relay_dispatch_queue_size`. It will require that all parachains migrate to `relay_dispatch_queue_remaining_capacity` before we can start removing the call to `relay_dipatch_queue_size`. Besides that the pr adapts the xcm exumulator to make it work with the message queue. * Fix test and use correct types * ".git/.scripts/commands/fmt/fmt.sh" --------- Co-authored-by: command-bot <>
This commit is contained in:
Generated
+296
-258
File diff suppressed because it is too large
Load Diff
@@ -233,13 +233,13 @@ pub mod pallet {
|
||||
};
|
||||
|
||||
<PendingUpwardMessages<T>>::mutate(|up| {
|
||||
let (count, size) = relevant_messaging_state.relay_dispatch_queue_size;
|
||||
let queue_size = relevant_messaging_state.relay_dispatch_queue_size;
|
||||
|
||||
let available_capacity = cmp::min(
|
||||
host_config.max_upward_queue_count.saturating_sub(count),
|
||||
host_config.max_upward_message_num_per_candidate,
|
||||
queue_size.remaining_count,
|
||||
host_config.max_upward_message_num_per_candidate.into(),
|
||||
);
|
||||
let available_size = host_config.max_upward_queue_size.saturating_sub(size);
|
||||
let available_size = queue_size.remaining_size;
|
||||
|
||||
// Count the number of messages we can possibly fit in the given constraints, i.e.
|
||||
// available_capacity and available_size.
|
||||
@@ -431,7 +431,7 @@ pub mod pallet {
|
||||
.read_abridged_host_configuration()
|
||||
.expect("Invalid host configuration in relay chain state proof");
|
||||
let relevant_messaging_state = relay_state_proof
|
||||
.read_messaging_state_snapshot()
|
||||
.read_messaging_state_snapshot(&host_config)
|
||||
.expect("Invalid messaging state in relay chain state proof");
|
||||
|
||||
<ValidationData<T>>::put(&vfp);
|
||||
|
||||
@@ -24,6 +24,17 @@ use sp_state_machine::{Backend, TrieBackend, TrieBackendBuilder};
|
||||
use sp_std::vec::Vec;
|
||||
use sp_trie::{HashDBT, MemoryDB, StorageProof, EMPTY_PREFIX};
|
||||
|
||||
/// The capacity of the upward message queue of a parachain on the relay chain.
|
||||
// The field order should stay the same as the data can be found in the proof to ensure both are
|
||||
// have the same encoded representation.
|
||||
#[derive(Clone, Encode, Decode, TypeInfo, Default)]
|
||||
pub struct RelayDispachQueueSize {
|
||||
/// The number of additional messages that can be enqueued.
|
||||
pub remaining_count: u32,
|
||||
/// The total size of additional messages that can be enqueued.
|
||||
pub remaining_size: u32,
|
||||
}
|
||||
|
||||
/// A snapshot of some messaging related state of relay chain pertaining to the current parachain.
|
||||
///
|
||||
/// This data is essential for making sure that the parachain is aware of current resource use on
|
||||
@@ -37,10 +48,7 @@ pub struct MessagingStateSnapshot {
|
||||
pub dmq_mqc_head: relay_chain::Hash,
|
||||
|
||||
/// The current capacity of the upward message queue of the current parachain on the relay chain.
|
||||
///
|
||||
/// The capacity is represented by a tuple that consist of the `count` of the messages and the
|
||||
/// `total_size` expressed as the sum of byte sizes of all messages in the queue.
|
||||
pub relay_dispatch_queue_size: (u32, u32),
|
||||
pub relay_dispatch_queue_size: RelayDispachQueueSize,
|
||||
|
||||
/// Information about all the inbound HRMP channels.
|
||||
///
|
||||
@@ -164,7 +172,10 @@ impl RelayChainStateProof {
|
||||
/// Read the [`MessagingStateSnapshot`] from the relay chain state proof.
|
||||
///
|
||||
/// Returns an error if anything failed at reading or decoding.
|
||||
pub fn read_messaging_state_snapshot(&self) -> Result<MessagingStateSnapshot, Error> {
|
||||
pub fn read_messaging_state_snapshot(
|
||||
&self,
|
||||
host_config: &AbridgedHostConfiguration,
|
||||
) -> Result<MessagingStateSnapshot, Error> {
|
||||
let dmq_mqc_head: relay_chain::Hash = read_entry(
|
||||
&self.trie_backend,
|
||||
&relay_chain::well_known_keys::dmq_mqc_head(self.para_id),
|
||||
@@ -172,12 +183,35 @@ impl RelayChainStateProof {
|
||||
)
|
||||
.map_err(Error::DmqMqcHead)?;
|
||||
|
||||
let relay_dispatch_queue_size: (u32, u32) = read_entry(
|
||||
let relay_dispatch_queue_size = read_optional_entry::<RelayDispachQueueSize, _>(
|
||||
&self.trie_backend,
|
||||
&relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
|
||||
Some((0, 0)),
|
||||
)
|
||||
.map_err(Error::RelayDispatchQueueSize)?;
|
||||
&relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(self.para_id)
|
||||
.key,
|
||||
);
|
||||
|
||||
// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
|
||||
//
|
||||
// When the relay chain and all parachains support `relay_dispatch_queue_remaining_capacity`,
|
||||
// this code here needs to be removed and above needs to be changed to `read_entry` that
|
||||
// returns an error if `relay_dispatch_queue_remaining_capacity` can not be found/decoded.
|
||||
//
|
||||
// For now we just fallback to the old dispatch queue size if there is an error.
|
||||
let relay_dispatch_queue_size = match relay_dispatch_queue_size {
|
||||
Ok(Some(r)) => r,
|
||||
_ => {
|
||||
let res = read_entry::<(u32, u32), _>(
|
||||
&self.trie_backend,
|
||||
#[allow(deprecated)]
|
||||
&relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
|
||||
Some((0, 0)),
|
||||
)
|
||||
.map_err(Error::RelayDispatchQueueSize)?;
|
||||
|
||||
let remaining_count = host_config.max_upward_queue_count.saturating_sub(res.0);
|
||||
let remaining_size = host_config.max_upward_queue_size.saturating_sub(res.1);
|
||||
RelayDispachQueueSize { remaining_count, remaining_size }
|
||||
},
|
||||
};
|
||||
|
||||
let ingress_channel_index: Vec<ParaId> = read_entry(
|
||||
&self.trie_backend,
|
||||
|
||||
@@ -513,7 +513,7 @@ fn send_upward_message_num_per_candidate() {
|
||||
BlockTests::new()
|
||||
.with_relay_sproof_builder(|_, _, sproof| {
|
||||
sproof.host_config.max_upward_message_num_per_candidate = 1;
|
||||
sproof.relay_dispatch_queue_size = None;
|
||||
sproof.relay_dispatch_queue_remaining_capacity = None;
|
||||
})
|
||||
.add_with_post_test(
|
||||
1,
|
||||
@@ -544,8 +544,8 @@ fn send_upward_message_relay_bottleneck() {
|
||||
sproof.host_config.max_upward_queue_count = 5;
|
||||
|
||||
match relay_block_num {
|
||||
1 => sproof.relay_dispatch_queue_size = Some((5, 0)),
|
||||
2 => sproof.relay_dispatch_queue_size = Some((4, 0)),
|
||||
1 => sproof.relay_dispatch_queue_remaining_capacity = Some((0, 2048)),
|
||||
2 => sproof.relay_dispatch_queue_remaining_capacity = Some((1, 2048)),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
})
|
||||
|
||||
@@ -24,6 +24,7 @@ decl_test_relay_chains! {
|
||||
RuntimeOrigin: polkadot_runtime::RuntimeOrigin,
|
||||
RuntimeCall: polkadot_runtime::RuntimeCall,
|
||||
RuntimeEvent: polkadot_runtime::RuntimeEvent,
|
||||
MessageQueue: polkadot_runtime::MessageQueue,
|
||||
XcmConfig: polkadot_runtime::xcm_config::XcmConfig,
|
||||
SovereignAccountOf: polkadot_runtime::xcm_config::SovereignAccountOf,
|
||||
System: polkadot_runtime::System,
|
||||
@@ -41,6 +42,7 @@ decl_test_relay_chains! {
|
||||
RuntimeOrigin: kusama_runtime::RuntimeOrigin,
|
||||
RuntimeCall: polkadot_runtime::RuntimeCall,
|
||||
RuntimeEvent: kusama_runtime::RuntimeEvent,
|
||||
MessageQueue: polkadot_runtime::MessageQueue,
|
||||
XcmConfig: kusama_runtime::xcm_config::XcmConfig,
|
||||
SovereignAccountOf: kusama_runtime::xcm_config::SovereignAccountOf,
|
||||
System: kusama_runtime::System,
|
||||
|
||||
@@ -99,7 +99,11 @@ async fn collect_relay_storage_proof(
|
||||
relay_well_known_keys::CURRENT_SLOT.to_vec(),
|
||||
relay_well_known_keys::ACTIVE_CONFIG.to_vec(),
|
||||
relay_well_known_keys::dmq_mqc_head(para_id),
|
||||
// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
|
||||
// We need to keep this here until all parachains have migrated to `relay_dispatch_queue_remaining_capacity`.
|
||||
#[allow(deprecated)]
|
||||
relay_well_known_keys::relay_dispatch_queue_size(para_id),
|
||||
relay_well_known_keys::relay_dispatch_queue_remaining_capacity(para_id).key,
|
||||
relay_well_known_keys::hrmp_ingress_channel_index(para_id),
|
||||
relay_well_known_keys::hrmp_egress_channel_index(para_id),
|
||||
relay_well_known_keys::upgrade_go_ahead_signal(para_id),
|
||||
|
||||
@@ -38,7 +38,7 @@ pub struct RelayStateSproofBuilder {
|
||||
pub host_config: AbridgedHostConfiguration,
|
||||
pub dmq_mqc_head: Option<relay_chain::Hash>,
|
||||
pub upgrade_go_ahead: Option<UpgradeGoAhead>,
|
||||
pub relay_dispatch_queue_size: Option<(u32, u32)>,
|
||||
pub relay_dispatch_queue_remaining_capacity: Option<(u32, u32)>,
|
||||
pub hrmp_ingress_channel_index: Option<Vec<ParaId>>,
|
||||
pub hrmp_egress_channel_index: Option<Vec<ParaId>>,
|
||||
pub hrmp_channels: BTreeMap<relay_chain::HrmpChannelId, AbridgedHrmpChannel>,
|
||||
@@ -65,7 +65,7 @@ impl Default for RelayStateSproofBuilder {
|
||||
},
|
||||
dmq_mqc_head: None,
|
||||
upgrade_go_ahead: None,
|
||||
relay_dispatch_queue_size: None,
|
||||
relay_dispatch_queue_remaining_capacity: None,
|
||||
hrmp_ingress_channel_index: None,
|
||||
hrmp_egress_channel_index: None,
|
||||
hrmp_channels: BTreeMap::new(),
|
||||
@@ -124,9 +124,12 @@ impl RelayStateSproofBuilder {
|
||||
dmq_mqc_head.encode(),
|
||||
);
|
||||
}
|
||||
if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_size {
|
||||
if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_remaining_capacity {
|
||||
insert(
|
||||
relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
|
||||
relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(
|
||||
self.para_id,
|
||||
)
|
||||
.key,
|
||||
relay_dispatch_queue_size.encode(),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ sp-std = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-arithmetic = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
pallet-balances = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
pallet-message-queue = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
cumulus-primitives-core = { path = "../../primitives/core"}
|
||||
cumulus-pallet-xcmp-queue = { path = "../../pallets/xcmp-queue" }
|
||||
|
||||
@@ -18,8 +18,8 @@ pub use casey::pascal;
|
||||
pub use codec::Encode;
|
||||
pub use frame_support::{
|
||||
sp_runtime::BuildStorage,
|
||||
traits::{Get, Hooks},
|
||||
weights::Weight,
|
||||
traits::{EnqueueMessage, Get, Hooks, ProcessMessage, ProcessMessageError, ServiceQueues},
|
||||
weights::{Weight, WeightMeter},
|
||||
};
|
||||
pub use frame_system::AccountInfo;
|
||||
pub use log;
|
||||
@@ -41,13 +41,14 @@ pub use cumulus_primitives_core::{
|
||||
pub use cumulus_primitives_parachain_inherent::ParachainInherentData;
|
||||
pub use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
|
||||
pub use cumulus_test_service::get_account_id_from_seed;
|
||||
pub use pallet_message_queue;
|
||||
pub use parachain_info;
|
||||
pub use parachains_common::{AccountId, BlockNumber};
|
||||
|
||||
pub use polkadot_primitives;
|
||||
pub use polkadot_runtime_parachains::{
|
||||
dmp,
|
||||
ump::{MessageId, UmpSink, XcmSink},
|
||||
inclusion::{AggregateMessageOrigin, UmpQueueId},
|
||||
};
|
||||
pub use std::{collections::HashMap, thread::LocalKey};
|
||||
pub use xcm::{v3::prelude::*, VersionedXcm};
|
||||
@@ -164,7 +165,7 @@ pub trait NetworkComponent<N: Network> {
|
||||
}
|
||||
}
|
||||
|
||||
pub trait RelayChain: UmpSink {
|
||||
pub trait RelayChain: ProcessMessage {
|
||||
type Runtime;
|
||||
type RuntimeOrigin;
|
||||
type RuntimeCall;
|
||||
@@ -198,14 +199,15 @@ macro_rules! decl_test_relay_chains {
|
||||
genesis = $genesis:expr,
|
||||
on_init = $on_init:expr,
|
||||
runtime = {
|
||||
Runtime: $($runtime:tt)::*,
|
||||
RuntimeOrigin: $($runtime_origin:tt)::*,
|
||||
RuntimeCall: $($runtime_call:tt)::*,
|
||||
RuntimeEvent: $($runtime_event:tt)::*,
|
||||
XcmConfig: $($xcm_config:tt)::*,
|
||||
SovereignAccountOf: $($sovereign_acc_of:tt)::*,
|
||||
System: $($system:tt)::*,
|
||||
Balances: $($balances:tt)::*,
|
||||
Runtime: $runtime:path,
|
||||
RuntimeOrigin: $runtime_origin:path,
|
||||
RuntimeCall: $runtime_call:path,
|
||||
RuntimeEvent: $runtime_event:path,
|
||||
MessageQueue: $mq:path,
|
||||
XcmConfig: $xcm_config:path,
|
||||
SovereignAccountOf: $sovereign_acc_of:path,
|
||||
System: $system:path,
|
||||
Balances: $balances:path,
|
||||
},
|
||||
pallets_extra = {
|
||||
$($pallet_name:ident: $pallet_path:path,)*
|
||||
@@ -218,14 +220,14 @@ macro_rules! decl_test_relay_chains {
|
||||
pub struct $name;
|
||||
|
||||
impl RelayChain for $name {
|
||||
type Runtime = $($runtime)::*;
|
||||
type RuntimeOrigin = $($runtime_origin)::*;
|
||||
type RuntimeCall = $($runtime_call)::*;
|
||||
type RuntimeEvent = $($runtime_event)::*;
|
||||
type XcmConfig = $($xcm_config)::*;
|
||||
type SovereignAccountOf = $($sovereign_acc_of)::*;
|
||||
type System = $($system)::*;
|
||||
type Balances = $($balances)::*;
|
||||
type Runtime = $runtime;
|
||||
type RuntimeOrigin = $runtime_origin;
|
||||
type RuntimeCall = $runtime_call;
|
||||
type RuntimeEvent = $runtime_event;
|
||||
type XcmConfig = $xcm_config;
|
||||
type SovereignAccountOf = $sovereign_acc_of;
|
||||
type System = $system;
|
||||
type Balances = $balances;
|
||||
}
|
||||
|
||||
$crate::paste::paste! {
|
||||
@@ -242,34 +244,46 @@ macro_rules! decl_test_relay_chains {
|
||||
}
|
||||
}
|
||||
|
||||
$crate::__impl_xcm_handlers_for_relay_chain!($name);
|
||||
impl $crate::ProcessMessage for $name {
|
||||
type Origin = $crate::ParaId;
|
||||
|
||||
fn process_message(
|
||||
msg: &[u8],
|
||||
para: Self::Origin,
|
||||
meter: &mut $crate::WeightMeter,
|
||||
) -> Result<bool, $crate::ProcessMessageError> {
|
||||
use $crate::{Weight, AggregateMessageOrigin, UmpQueueId, ServiceQueues, EnqueueMessage};
|
||||
use $mq as message_queue;
|
||||
use $runtime_event as runtime_event;
|
||||
|
||||
Self::execute_with(|| {
|
||||
<$mq as EnqueueMessage<AggregateMessageOrigin>>::enqueue_message(
|
||||
msg.try_into().expect("Message too long"),
|
||||
AggregateMessageOrigin::Ump(UmpQueueId::Para(para.clone()))
|
||||
);
|
||||
|
||||
<$system>::reset_events();
|
||||
<$mq as ServiceQueues>::service_queues(Weight::MAX);
|
||||
let events = <$system>::events();
|
||||
let event = events.last().expect("There must be at least one event");
|
||||
|
||||
match &event.event {
|
||||
runtime_event::MessageQueue(
|
||||
$crate::pallet_message_queue::Event::Processed {origin, ..}) => {
|
||||
assert_eq!(origin, &AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
|
||||
},
|
||||
event => panic!("Unexpected event: {:#?}", event),
|
||||
}
|
||||
Ok(true)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
$crate::__impl_test_ext_for_relay_chain!($name, $genesis, $on_init);
|
||||
)+
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! __impl_xcm_handlers_for_relay_chain {
|
||||
($name:ident) => {
|
||||
impl $crate::UmpSink for $name {
|
||||
fn process_upward_message(
|
||||
origin: $crate::ParaId,
|
||||
msg: &[u8],
|
||||
max_weight: $crate::Weight,
|
||||
) -> Result<$crate::Weight, ($crate::MessageId, $crate::Weight)> {
|
||||
use $crate::{TestExt, UmpSink};
|
||||
|
||||
Self::execute_with(|| {
|
||||
$crate::XcmSink::<
|
||||
$crate::XcmExecutor<<Self as RelayChain>::XcmConfig>,
|
||||
<Self as RelayChain>::Runtime,
|
||||
>::process_upward_message(origin, msg, max_weight)
|
||||
})
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! __impl_test_ext_for_relay_chain {
|
||||
// entry point: generate ext name
|
||||
@@ -800,12 +814,13 @@ macro_rules! decl_test_networks {
|
||||
}
|
||||
|
||||
fn _process_upward_messages() {
|
||||
use $crate::{UmpSink, Bounded};
|
||||
use $crate::{Bounded, ProcessMessage, WeightMeter};
|
||||
while let Some((from_para_id, msg)) = $crate::UPWARD_MESSAGES.with(|b| b.borrow_mut().get_mut(stringify!($name)).unwrap().pop_front()) {
|
||||
let _ = <$relay_chain>::process_upward_message(
|
||||
from_para_id.into(),
|
||||
let mut weight_meter = WeightMeter::max_limit();
|
||||
let _ = <$relay_chain>::process_message(
|
||||
&msg[..],
|
||||
$crate::Weight::max_value(),
|
||||
from_para_id.into(),
|
||||
&mut weight_meter,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user