mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 08:11:03 +00:00
Inform the PVF with the latest relevant relay chain state (#279)
* Update polkadot * Extend cumulus primitives with some relay chain exports Follow https://github.com/paritytech/polkadot/pull/2194 to see the polkadot PR * collator: collect the state proof This commit changes cumulus-collator so that it takes the relay chain state at the relay parent and creates a storage proof that contains all the required data for PVF. * parachain-upgrade: use the proofs instead This change is needed to make cumulus logic to not longer depend on the transient validation data. As part of this change, in order to preserve the current behavior `code_upgrade_allowed` now is computed on the parachain side, rather than provided by polkadot. Turned out that this requires to know the self parachain id so it was added where needed. * message-broker: use relay state to track limits this should make sending messages safe from accidentally running over the relay chain limits that were previously unknown. * Update polkadot So that `relay_storage_root` is available through `ValidationParams` * Check `relay_storage_root` matches expected Check that `relay_storage_root` submitted by the collator matches the one that we receive in `validate_block` through `ValidationParams` * Add a missing check for `dmq_mqc_head` while we are at it * Update polkadot * Fix tests that use the relay storage root * Apply suggestions from code review Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update message-broker/src/lib.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Remove unneeded (&_) * Fix unwraps * Polish basti's suggestion * Fix merge * Bring back the System::can_set_code check Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -16,6 +16,7 @@ codec = { package = "parity-scale-codec", version = "1.3.0", features = [ "deriv
|
||||
|
||||
# Cumulus dependencies
|
||||
cumulus-primitives = { path = "../primitives", default-features = false }
|
||||
cumulus-parachain-upgrade = { path = "../parachain-upgrade", default-features = false }
|
||||
|
||||
[features]
|
||||
default = [ "std" ]
|
||||
@@ -24,5 +25,6 @@ std = [
|
||||
"frame-system/std",
|
||||
"sp-inherents/std",
|
||||
"cumulus-primitives/std",
|
||||
"cumulus-parachain-upgrade/std",
|
||||
"sp-std/std"
|
||||
]
|
||||
|
||||
@@ -37,12 +37,8 @@ use cumulus_primitives::{
|
||||
OutboundHrmpMessage, ParaId, UpwardMessage, UpwardMessageSender,
|
||||
};
|
||||
|
||||
// TODO: these should be not a constant, but sourced from the relay-chain configuration.
|
||||
const UMP_MSG_NUM_PER_CANDIDATE: usize = 5;
|
||||
const HRMP_MSG_NUM_PER_CANDIDATE: usize = 5;
|
||||
|
||||
/// Configuration trait of the message broker pallet.
|
||||
pub trait Config: frame_system::Config {
|
||||
pub trait Config: frame_system::Config + cumulus_parachain_upgrade::Config {
|
||||
/// The downward message handlers that will be informed when a message is received.
|
||||
type DownwardMessageHandlers: DownwardMessageHandler;
|
||||
/// The HRMP message handlers that will be informed when a message is received.
|
||||
@@ -58,6 +54,9 @@ decl_storage! {
|
||||
/// HRMP channels with the given recipients are awaiting to be processed. If a `ParaId` is
|
||||
/// present in this vector then `OutboundHrmpMessages` for it should be not empty.
|
||||
NonEmptyHrmpChannels: Vec<ParaId>;
|
||||
/// The number of HRMP messages we observed in `on_initialize` and thus used that number for
|
||||
/// announcing the weight of `on_initialize` and `on_finialize`.
|
||||
AnnouncedHrmpMessagesPerCandidate: u32;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,53 +126,158 @@ decl_module! {
|
||||
storage::unhashed::kill(well_known_keys::UPWARD_MESSAGES);
|
||||
storage::unhashed::kill(well_known_keys::HRMP_OUTBOUND_MESSAGES);
|
||||
|
||||
// Reads and writes performed by `on_finalize`. This may actually turn out to be lower,
|
||||
// but we should err on the safe side.
|
||||
// Here, in `on_initialize` we must report the weight for both `on_initialize` and
|
||||
// `on_finalize`.
|
||||
//
|
||||
// One complication here, is that the `host_configuration` is updated by an inherent and
|
||||
// those are processed after the block initialization phase. Therefore, we have to be
|
||||
// content only with the configuration as per the previous block. That means that
|
||||
// the configuration can be either stale (or be abscent altogether in case of the
|
||||
// beginning of the chain).
|
||||
//
|
||||
// In order to mitigate this, we do the following. At the time, we are only concerned
|
||||
// about `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to process
|
||||
// the number of HRMP messages according to the potentially stale configuration. In
|
||||
// `on_finalize` we will process only the maximum between the announced number of messages
|
||||
// and the actual received in the fresh configuration.
|
||||
//
|
||||
// In the common case, they will be the same. In the case the actual value is smaller
|
||||
// than the announced, we would waste some of weight. In the case the actual value is
|
||||
// greater than the announced, we will miss opportunity to send a couple of messages.
|
||||
weight += T::DbWeight::get().reads_writes(1, 1);
|
||||
let hrmp_max_message_num_per_candidate =
|
||||
<cumulus_parachain_upgrade::Module<T>>::host_configuration()
|
||||
.map(|cfg| cfg.hrmp_max_message_num_per_candidate)
|
||||
.unwrap_or(0);
|
||||
AnnouncedHrmpMessagesPerCandidate::put(hrmp_max_message_num_per_candidate);
|
||||
|
||||
// NOTE that the actual weight consumed by `on_finalize` may turn out lower.
|
||||
weight += T::DbWeight::get().reads_writes(
|
||||
2 + HRMP_MSG_NUM_PER_CANDIDATE as u64,
|
||||
4 + HRMP_MSG_NUM_PER_CANDIDATE as u64,
|
||||
3 + hrmp_max_message_num_per_candidate as u64,
|
||||
4 + hrmp_max_message_num_per_candidate as u64,
|
||||
);
|
||||
|
||||
weight
|
||||
}
|
||||
|
||||
fn on_finalize() {
|
||||
let host_config = <cumulus_parachain_upgrade::Module<T>>::host_configuration()
|
||||
.expect("host configuration is promised to set until `on_finalize`; qed");
|
||||
let relevant_messaging_state = <cumulus_parachain_upgrade::Module<T>>::relevant_messaging_state()
|
||||
.expect("relevant messaging state is promised to be set until `on_finalize`; qed");
|
||||
|
||||
<Self as Store>::PendingUpwardMessages::mutate(|up| {
|
||||
let num = cmp::min(UMP_MSG_NUM_PER_CANDIDATE, up.len());
|
||||
storage::unhashed::put(
|
||||
well_known_keys::UPWARD_MESSAGES,
|
||||
&up[0..num],
|
||||
let (count, size) = relevant_messaging_state.relay_dispatch_queue_size;
|
||||
|
||||
let available_capacity = cmp::min(
|
||||
host_config.max_upward_queue_count.saturating_sub(count),
|
||||
host_config.max_upward_message_num_per_candidate,
|
||||
);
|
||||
let available_size = host_config.max_upward_queue_size.saturating_sub(size);
|
||||
|
||||
// Count the number of messages we can possibly fit in the given constraints, i.e.
|
||||
// available_capacity and available_size.
|
||||
let num = up
|
||||
.iter()
|
||||
.scan(
|
||||
(available_capacity as usize, available_size as usize),
|
||||
|(cnt, size), msg| match (cnt.checked_sub(1), size.checked_sub(msg.len())) {
|
||||
(Some(cnt), Some(size)) => Some((cnt, size)),
|
||||
_ => None,
|
||||
},
|
||||
)
|
||||
.count();
|
||||
|
||||
// TODO: #274 Return back messages that do not longer fit into the queue.
|
||||
|
||||
storage::unhashed::put(well_known_keys::UPWARD_MESSAGES, &up[0..num]);
|
||||
*up = up.split_off(num);
|
||||
});
|
||||
|
||||
// Sending HRMP messages is a little bit more involved. On top of the number of messages
|
||||
// per block limit, there is also a constraint that it's possible to send only a single
|
||||
// message to a given recipient per candidate.
|
||||
// Sending HRMP messages is a little bit more involved. There are the following
|
||||
// constraints:
|
||||
//
|
||||
// - a channel should exist (and it can be closed while a message is buffered),
|
||||
// - at most one message can be sent in a channel,
|
||||
// - the sent out messages should be ordered by ascension of recipient para id.
|
||||
// - the capacity and total size of the channel is limited,
|
||||
// - the maximum size of a message is limited (and can potentially be changed),
|
||||
|
||||
let mut non_empty_hrmp_channels = NonEmptyHrmpChannels::get();
|
||||
let outbound_hrmp_num = cmp::min(HRMP_MSG_NUM_PER_CANDIDATE, non_empty_hrmp_channels.len());
|
||||
// The number of messages we can send is limited by all of:
|
||||
// - the number of non empty channels
|
||||
// - the maximum number of messages per candidate according to the fresh config
|
||||
// - the maximum number of messages per candidate according to the stale config
|
||||
let outbound_hrmp_num =
|
||||
non_empty_hrmp_channels.len()
|
||||
.min(host_config.hrmp_max_message_num_per_candidate as usize)
|
||||
.min(AnnouncedHrmpMessagesPerCandidate::get() as usize);
|
||||
|
||||
let mut outbound_hrmp_messages = Vec::with_capacity(outbound_hrmp_num);
|
||||
let mut prune_empty = Vec::with_capacity(outbound_hrmp_num);
|
||||
|
||||
for &recipient in non_empty_hrmp_channels.iter().take(outbound_hrmp_num) {
|
||||
let (message_payload, became_empty) =
|
||||
<Self as Store>::OutboundHrmpMessages::mutate(&recipient, |v| {
|
||||
// this panics if `v` is empty. However, we are iterating only once over non-empty
|
||||
// channels, therefore it cannot panic.
|
||||
let first = v.remove(0);
|
||||
let became_empty = v.is_empty();
|
||||
(first, became_empty)
|
||||
});
|
||||
for &recipient in non_empty_hrmp_channels.iter() {
|
||||
let idx = match relevant_messaging_state
|
||||
.egress_channels
|
||||
.binary_search_by_key(&recipient, |(recipient, _)| *recipient)
|
||||
{
|
||||
Ok(m) => m,
|
||||
Err(_) => {
|
||||
// TODO: #274 This means that there is no such channel anymore. Means that we should
|
||||
// return back the messages from this channel.
|
||||
//
|
||||
// Until then pretend it became empty
|
||||
prune_empty.push(recipient);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let channel_meta = &relevant_messaging_state.egress_channels[idx].1;
|
||||
if channel_meta.msg_count + 1 > channel_meta.max_capacity {
|
||||
// The channel is at its capacity. Skip it for now.
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut pending = <Self as Store>::OutboundHrmpMessages::get(&recipient);
|
||||
|
||||
// This panics if `v` is empty. However, we are iterating only once over non-empty
|
||||
// channels, therefore it cannot panic.
|
||||
let message_payload = pending.remove(0);
|
||||
let became_empty = pending.is_empty();
|
||||
|
||||
if channel_meta.total_size + message_payload.len() as u32 > channel_meta.max_total_size {
|
||||
// Sending this message will make the channel total size overflow. Skip it for now.
|
||||
continue;
|
||||
}
|
||||
|
||||
// If we reached here, then the channel has capacity to receive this message. However,
|
||||
// it doesn't mean that we are sending it just yet.
|
||||
if became_empty {
|
||||
OutboundHrmpMessages::remove(&recipient);
|
||||
prune_empty.push(recipient);
|
||||
} else {
|
||||
OutboundHrmpMessages::insert(&recipient, pending);
|
||||
}
|
||||
|
||||
if message_payload.len() as u32 > channel_meta.max_message_size {
|
||||
// Apparently, the max message size was decreased since the message while the
|
||||
// message was buffered. While it's possible to make another iteration to fetch
|
||||
// the next message, we just keep going here to not complicate the logic too much.
|
||||
//
|
||||
// TODO: #274 Return back this message to sender.
|
||||
continue;
|
||||
}
|
||||
|
||||
outbound_hrmp_messages.push(OutboundHrmpMessage {
|
||||
recipient,
|
||||
data: message_payload,
|
||||
});
|
||||
if became_empty {
|
||||
prune_empty.push(recipient);
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the outbound messages by asceding recipient para id to satisfy the acceptance
|
||||
// criteria requirement.
|
||||
outbound_hrmp_messages.sort_by_key(|m| m.recipient);
|
||||
|
||||
// Prune hrmp channels that became empty. Additionally, because it may so happen that we
|
||||
// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
|
||||
// change the order. Otherwise, the next `on_finalize` we will again give attention
|
||||
@@ -210,20 +314,83 @@ pub enum SendHorizontalErr {
|
||||
|
||||
impl<T: Config> Module<T> {
|
||||
pub fn send_upward_message(message: UpwardMessage) -> Result<(), SendUpErr> {
|
||||
// TODO: check the message against the limit. The limit should be sourced from the
|
||||
// relay-chain configuration.
|
||||
// Check if the message fits into the relay-chain constraints.
|
||||
//
|
||||
// Note, that we are using `host_configuration` here which may be from the previous
|
||||
// block, in case this is called from `on_initialize`, i.e. before the inherent with fresh
|
||||
// data is submitted.
|
||||
//
|
||||
// That shouldn't be a problem since this is a preliminary check and the actual check would
|
||||
// be performed just before submitting the message from the candidate, and it already can
|
||||
// happen that during the time the message is buffered for sending the relay-chain setting
|
||||
// may change so that the message is no longer valid.
|
||||
//
|
||||
// However, changing this setting is expected to be rare.
|
||||
match <cumulus_parachain_upgrade::Module<T>>::host_configuration() {
|
||||
Some(cfg) => {
|
||||
if message.len() > cfg.max_upward_message_size as usize {
|
||||
return Err(SendUpErr::TooBig)
|
||||
}
|
||||
},
|
||||
None => {
|
||||
// This storage field should carry over from the previous block. So if it's None
|
||||
// then it must be that this is an edge-case where a message is attempted to be
|
||||
// sent at the first block.
|
||||
//
|
||||
// Let's pass this message through. I think it's not unreasonable to expect that the
|
||||
// message is not huge and it comes through, but if it doesn't it can be returned
|
||||
// back to the sender.
|
||||
//
|
||||
// Thus fall through here.
|
||||
}
|
||||
};
|
||||
<Self as Store>::PendingUpwardMessages::append(message);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn send_hrmp_message(message: OutboundHrmpMessage) -> Result<(), SendHorizontalErr> {
|
||||
// TODO:
|
||||
// (a) check against the size limit sourced from the relay-chain configuration
|
||||
// (b) check if the channel to the recipient is actually opened.
|
||||
|
||||
let OutboundHrmpMessage { recipient, data } = message;
|
||||
<Self as Store>::OutboundHrmpMessages::append(&recipient, data);
|
||||
|
||||
// First, check if the message is addressed into an opened channel.
|
||||
//
|
||||
// Note, that we are using `relevant_messaging_state` which may be from the previous
|
||||
// block, in case this is called from `on_initialize`, i.e. before the inherent with fresh
|
||||
// data is submitted.
|
||||
//
|
||||
// That shouldn't be a problem though because this is anticipated and already can happen.
|
||||
// This is because sending implies that a message is buffered until there is space to send
|
||||
// a message in the candidate. After a while waiting in a buffer, it may be discovered that
|
||||
// the channel to which a message were addressed is now closed. Another possibility, is that
|
||||
// the maximum message size was decreased so that a message in the bufer doesn't fit. Should
|
||||
// any of that happen the sender should be notified about the message was discarded.
|
||||
//
|
||||
// Here it a similar case, with the difference that the realization that the channel is closed
|
||||
// came the same block.
|
||||
let relevant_messaging_state = match <cumulus_parachain_upgrade::Module<T>>::relevant_messaging_state() {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
// This storage field should carry over from the previous block. So if it's None
|
||||
// then it must be that this is an edge-case where a message is attempted to be
|
||||
// sent at the first block. It should be safe to assume that there are no channels
|
||||
// opened at all so early. At least, relying on this assumption seems to be a better
|
||||
// tradeoff, compared to introducing an error variant that the clients should be
|
||||
// prepared to handle.
|
||||
return Err(SendHorizontalErr::NoChannel)
|
||||
}
|
||||
};
|
||||
let channel_meta = match relevant_messaging_state
|
||||
.egress_channels
|
||||
.binary_search_by_key(&recipient, |(recipient, _)| *recipient)
|
||||
{
|
||||
Ok(idx) => &relevant_messaging_state.egress_channels[idx].1,
|
||||
Err(_) => return Err(SendHorizontalErr::NoChannel),
|
||||
};
|
||||
if data.len() as u32 > channel_meta.max_message_size {
|
||||
return Err(SendHorizontalErr::TooBig)
|
||||
}
|
||||
|
||||
// And then at last update the storage.
|
||||
<Self as Store>::OutboundHrmpMessages::append(&recipient, data);
|
||||
<Self as Store>::NonEmptyHrmpChannels::mutate(|v| {
|
||||
if !v.contains(&recipient) {
|
||||
v.push(recipient);
|
||||
|
||||
Reference in New Issue
Block a user