Downward Message Processing implementation (#1859)

* DMP: data structures and plumbing

* DMP: Implement DMP logic in the router module

DMP: Integrate DMP parts into the inclusion module

* DMP: Introduce the max size limit for the size of a downward message

* DMP: Runtime API for accessing inbound messages

* OCD

Small clean ups

* DMP: fix the naming of the error

* DMP: add caution about a non-existent recipient
This commit is contained in:
Sergei Shulepov
2020-10-28 11:41:42 +01:00
committed by GitHub
parent 1a25c41277
commit 9903bca259
26 changed files with 556 additions and 36 deletions
@@ -58,6 +58,13 @@ pub struct HostConfiguration<BlockNumber> {
pub thread_availability_period: BlockNumber,
/// The amount of blocks ahead to schedule parachains and parathreads.
pub scheduling_lookahead: u32,
/// The maximum size of a message that can be put in a downward message queue.
///
/// Since we require receiving at least one DMP message the obvious upper bound of the size is
/// the PoV size. Of course, there is a lot of other different things that a parachain may
/// decide to do with its PoV so this value in practice will be picked as a fraction of the PoV
/// size.
pub max_downward_message_size: u32,
}
pub trait Trait: frame_system::Trait { }
@@ -190,6 +197,16 @@ decl_module! {
});
Ok(())
}
/// Set the critical downward message size.
#[weight = (1_000, DispatchClass::Operational)]
pub fn set_max_downward_message_size(origin, new: u32) -> DispatchResult {
ensure_root(origin)?;
Self::update_config_member(|config| {
sp_std::mem::replace(&mut config.max_downward_message_size, new) != new
});
Ok(())
}
}
}
@@ -268,6 +285,7 @@ mod tests {
chain_availability_period: 10,
thread_availability_period: 8,
scheduling_lookahead: 3,
max_downward_message_size: 2048,
};
assert!(<Configuration as Store>::PendingConfig::get().is_none());
@@ -305,6 +323,9 @@ mod tests {
Configuration::set_scheduling_lookahead(
Origin::root(), new_config.scheduling_lookahead,
).unwrap();
Configuration::set_max_downward_message_size(
Origin::root(), new_config.max_downward_message_size,
).unwrap();
assert_eq!(<Configuration as Store>::PendingConfig::get(), Some(new_config));
})
+21 -3
View File
@@ -36,7 +36,7 @@ use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use sp_staking::SessionIndex;
use sp_runtime::{DispatchError, traits::{One, Saturating}};
use crate::{configuration, paras, scheduler::CoreAssignment};
use crate::{configuration, paras, router, scheduler::CoreAssignment};
/// A bitfield signed by a validator indicating that it is keeping its piece of the erasure-coding
/// for any backed candidates referred to by a `1` bit available.
@@ -86,7 +86,7 @@ impl<H, N> CandidatePendingAvailability<H, N> {
}
pub trait Trait:
frame_system::Trait + paras::Trait + configuration::Trait
frame_system::Trait + paras::Trait + router::Trait + configuration::Trait
{
type Event: From<Event<Self>> + Into<<Self as frame_system::Trait>::Event>;
}
@@ -153,6 +153,8 @@ decl_error! {
ValidationDataHashMismatch,
/// Internal error only returned when compiled with debug assertions.
InternalError,
/// The downward message queue is not processed correctly.
IncorrectDownwardMessageHandling,
}
}
@@ -409,6 +411,7 @@ impl<T: Trait> Module<T> {
para_id,
&candidate.candidate.commitments.head_data,
&candidate.candidate.commitments.new_validation_code,
candidate.candidate.commitments.processed_downward_messages,
)?;
for (i, assignment) in scheduled[skip..].iter().enumerate() {
@@ -540,6 +543,7 @@ impl<T: Trait> Module<T> {
para_id,
&validation_outputs.head_data,
&validation_outputs.new_validation_code,
validation_outputs.processed_downward_messages,
)
}
@@ -561,6 +565,12 @@ impl<T: Trait> Module<T> {
);
}
// enact the messaging facet of the candidate.
weight += <router::Module<T>>::prune_dmq(
receipt.descriptor.para_id,
commitments.processed_downward_messages,
);
Self::deposit_event(
Event::<T>::CandidateIncluded(plain, commitments.head_data.clone())
);
@@ -682,6 +692,7 @@ impl<T: Trait> CandidateCheckContext<T> {
para_id: ParaId,
head_data: &HeadData,
new_validation_code: &Option<primitives::v1::ValidationCode>,
processed_downward_messages: u32,
) -> Result<(), DispatchError> {
ensure!(
head_data.0.len() <= self.config.max_head_data_size as _,
@@ -703,7 +714,14 @@ impl<T: Trait> CandidateCheckContext<T> {
);
}
// TODO: messaging acceptance criteria rules will go here.
// check if the candidate passes the messaging acceptance criteria
ensure!(
<router::Module<T>>::check_processed_downward_messages(
para_id,
processed_downward_messages,
),
Error::<T>::IncorrectDownwardMessageHandling,
);
Ok(())
}
-2
View File
@@ -128,8 +128,6 @@ pub type Configuration = crate::configuration::Module<Test>;
pub type Paras = crate::paras::Module<Test>;
/// Mocked router.
// TODO: Will be used in the follow ups.
#[allow(dead_code)]
pub type Router = crate::router::Module<Test>;
/// Mocked scheduler.
+61 -4
View File
@@ -20,10 +20,17 @@
//! routing the messages at their destinations and informing the parachains about the incoming
//! messages.
use crate::{configuration, initializer};
use crate::{
configuration,
initializer,
};
use sp_std::prelude::*;
use frame_support::{decl_error, decl_module, decl_storage, weights::Weight};
use primitives::v1::{Id as ParaId};
use primitives::v1::{Id as ParaId, InboundDownwardMessage, Hash};
mod dmp;
pub use dmp::QueueDownwardMessageError;
pub trait Trait: frame_system::Trait + configuration::Trait {}
@@ -32,6 +39,23 @@ decl_storage! {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/*
* Downward Message Passing (DMP)
*
* Storage layout required for implementation of DMP.
*/
/// The downward messages addressed for a certain para.
DownwardMessageQueues: map hasher(twox_64_concat) ParaId => Vec<InboundDownwardMessage<T::BlockNumber>>;
/// A mapping that stores the downward message queue MQC head for each para.
///
/// Each link in this chain has a form:
/// `(prev_head, B, H(M))`, where
/// - `prev_head`: is the previous head hash or zero if none.
/// - `B`: is the relay-chain block number in which a message was appended.
/// - `H(M)`: is the hash of the message being appended.
DownwardMessageQueueHeads: map hasher(twox_64_concat) ParaId => Hash;
}
}
@@ -60,8 +84,8 @@ impl<T: Trait> Module<T> {
_notification: &initializer::SessionChangeNotification<T::BlockNumber>,
) {
let outgoing = OutgoingParas::take();
for _outgoing_para in outgoing {
for outgoing_para in outgoing {
Self::clean_dmp_after_outgoing(outgoing_para);
}
}
@@ -77,4 +101,37 @@ impl<T: Trait> Module<T> {
#[cfg(test)]
mod tests {
use super::*;
use primitives::v1::BlockNumber;
use frame_support::traits::{OnFinalize, OnInitialize};
use crate::mock::{System, Router, GenesisConfig as MockGenesisConfig};
pub(crate) fn run_to_block(to: BlockNumber, new_session: Option<Vec<BlockNumber>>) {
while System::block_number() < to {
let b = System::block_number();
Router::initializer_finalize();
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) {
Router::initializer_on_new_session(&Default::default());
}
Router::initializer_initialize(b + 1);
}
}
pub(crate) fn default_genesis_config() -> MockGenesisConfig {
MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: crate::configuration::HostConfiguration {
max_downward_message_size: 1024,
..Default::default()
},
},
..Default::default()
}
}
}
@@ -0,0 +1,272 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::{Trait, Module, Store};
use crate::configuration::HostConfiguration;
use frame_support::{StorageMap, weights::Weight, traits::Get};
use sp_std::prelude::*;
use sp_runtime::traits::{BlakeTwo256, Hash as HashT, SaturatedConversion};
use primitives::v1::{Id as ParaId, DownwardMessage, InboundDownwardMessage, Hash};
/// An error sending a downward message.
#[cfg_attr(test, derive(Debug))]
pub enum QueueDownwardMessageError {
/// The message being sent exceeds the configured max message size.
ExceedsMaxMessageSize,
}
/// Routines and getters related to downward message passing.
impl<T: Trait> Module<T> {
pub(crate) fn clean_dmp_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::DownwardMessageQueues::remove(&outgoing_para);
<Self as Store>::DownwardMessageQueueHeads::remove(&outgoing_para);
}
/// Enqueue a downward message to a specific recipient para.
///
/// When encoded, the message should not exceed the `config.max_downward_message_size`.
/// Otherwise, the message won't be sent and `Err` will be returned.
///
/// It is possible to send a downward message to a non-existent para. That, however, would lead
/// to a dangling storage. If the caller cannot statically prove that the recipient exists
/// then the caller should perform a runtime check.
pub fn queue_downward_message(
config: &HostConfiguration<T::BlockNumber>,
para: ParaId,
msg: DownwardMessage,
) -> Result<(), QueueDownwardMessageError> {
let serialized_len = msg.len() as u32;
if serialized_len > config.max_downward_message_size {
return Err(QueueDownwardMessageError::ExceedsMaxMessageSize);
}
let inbound = InboundDownwardMessage {
msg,
sent_at: <frame_system::Module<T>>::block_number(),
};
// obtain the new link in the MQC and update the head.
<Self as Store>::DownwardMessageQueueHeads::mutate(para, |head| {
let new_head =
BlakeTwo256::hash_of(&(*head, inbound.sent_at, T::Hashing::hash_of(&inbound.msg)));
*head = new_head;
});
<Self as Store>::DownwardMessageQueues::mutate(para, |v| {
v.push(inbound);
});
Ok(())
}
/// Checks if the number of processed downward messages is valid, i.e.:
///
/// - if there are pending messages then `processed_downward_messages` should be at least 1,
/// - `processed_downward_messages` should not be greater than the number of pending messages.
///
/// Returns true if all checks have been passed.
pub(crate) fn check_processed_downward_messages(
para: ParaId,
processed_downward_messages: u32,
) -> bool {
let dmq_length = Self::dmq_length(para);
if dmq_length > 0 && processed_downward_messages == 0 {
return false;
}
if dmq_length < processed_downward_messages {
return false;
}
true
}
/// Prunes the specified number of messages from the downward message queue of the given para.
pub(crate) fn prune_dmq(para: ParaId, processed_downward_messages: u32) -> Weight {
<Self as Store>::DownwardMessageQueues::mutate(para, |q| {
let processed_downward_messages = processed_downward_messages as usize;
if processed_downward_messages > q.len() {
// reaching this branch is unexpected due to the constraint established by
// `check_processed_downward_messages`. But better be safe than sorry.
q.clear();
} else {
*q = q.split_off(processed_downward_messages);
}
});
T::DbWeight::get().reads_writes(1, 1)
}
/// Returns the Head of Message Queue Chain for the given para or `None` if there is none
/// associated with it.
pub(crate) fn dmq_mqc_head(para: ParaId) -> Hash {
<Self as Store>::DownwardMessageQueueHeads::get(&para)
}
/// Returns the number of pending downward messages addressed to the given para.
///
/// Returns 0 if the para doesn't have an associated downward message queue.
pub(crate) fn dmq_length(para: ParaId) -> u32 {
<Self as Store>::DownwardMessageQueues::decode_len(&para)
.unwrap_or(0)
.saturated_into::<u32>()
}
/// Returns the downward message queue contents for the given para.
///
/// The most recent messages are the latest in the vector.
pub(crate) fn dmq_contents(recipient: ParaId) -> Vec<InboundDownwardMessage<T::BlockNumber>> {
<Self as Store>::DownwardMessageQueues::get(&recipient)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::{Configuration, Router, new_test_ext};
use crate::router::{
OutgoingParas,
tests::{default_genesis_config, run_to_block},
};
use frame_support::StorageValue;
use codec::Encode;
fn queue_downward_message(
para_id: ParaId,
msg: DownwardMessage,
) -> Result<(), QueueDownwardMessageError> {
Router::queue_downward_message(&Configuration::config(), para_id, msg)
}
#[test]
fn scheduled_cleanup_performed() {
let a = ParaId::from(1312);
let b = ParaId::from(228);
let c = ParaId::from(123);
new_test_ext(default_genesis_config()).execute_with(|| {
run_to_block(1, None);
// enqueue downward messages to A, B and C.
queue_downward_message(a, vec![1, 2, 3]).unwrap();
queue_downward_message(b, vec![4, 5, 6]).unwrap();
queue_downward_message(c, vec![7, 8, 9]).unwrap();
Router::schedule_para_cleanup(a);
// run to block without session change.
run_to_block(2, None);
assert!(!<Router as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(!<Router as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Router as Store>::DownwardMessageQueues::get(&c).is_empty());
Router::schedule_para_cleanup(b);
// run to block changing the session.
run_to_block(3, Some(vec![3]));
assert!(<Router as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(<Router as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Router as Store>::DownwardMessageQueues::get(&c).is_empty());
// verify that the outgoing paras are emptied.
assert!(OutgoingParas::get().is_empty())
});
}
#[test]
fn dmq_length_and_head_updated_properly() {
let a = ParaId::from(1312);
let b = ParaId::from(228);
new_test_ext(default_genesis_config()).execute_with(|| {
assert_eq!(Router::dmq_length(a), 0);
assert_eq!(Router::dmq_length(b), 0);
queue_downward_message(a, vec![1, 2, 3]).unwrap();
assert_eq!(Router::dmq_length(a), 1);
assert_eq!(Router::dmq_length(b), 0);
assert!(!Router::dmq_mqc_head(a).is_zero());
assert!(Router::dmq_mqc_head(b).is_zero());
});
}
#[test]
fn check_processed_downward_messages() {
let a = ParaId::from(1312);
new_test_ext(default_genesis_config()).execute_with(|| {
// processed_downward_messages=0 is allowed when the DMQ is empty.
assert!(Router::check_processed_downward_messages(a, 0));
queue_downward_message(a, vec![1, 2, 3]).unwrap();
queue_downward_message(a, vec![4, 5, 6]).unwrap();
queue_downward_message(a, vec![7, 8, 9]).unwrap();
// 0 doesn't pass if the DMQ has msgs.
assert!(!Router::check_processed_downward_messages(a, 0));
// a candidate can consume up to 3 messages
assert!(Router::check_processed_downward_messages(a, 1));
assert!(Router::check_processed_downward_messages(a, 2));
assert!(Router::check_processed_downward_messages(a, 3));
// there is no 4 messages in the queue
assert!(!Router::check_processed_downward_messages(a, 4));
});
}
#[test]
fn dmq_pruning() {
let a = ParaId::from(1312);
new_test_ext(default_genesis_config()).execute_with(|| {
assert_eq!(Router::dmq_length(a), 0);
queue_downward_message(a, vec![1, 2, 3]).unwrap();
queue_downward_message(a, vec![4, 5, 6]).unwrap();
queue_downward_message(a, vec![7, 8, 9]).unwrap();
assert_eq!(Router::dmq_length(a), 3);
// pruning 0 elements shouldn't change anything.
Router::prune_dmq(a, 0);
assert_eq!(Router::dmq_length(a), 3);
Router::prune_dmq(a, 2);
assert_eq!(Router::dmq_length(a), 1);
});
}
#[test]
fn queue_downward_message_critical() {
let a = ParaId::from(1312);
let mut genesis = default_genesis_config();
genesis.configuration.config.max_downward_message_size = 7;
new_test_ext(genesis).execute_with(|| {
let smol = [0; 3].to_vec();
let big = [0; 8].to_vec();
// still within limits
assert_eq!(smol.encode().len(), 4);
assert!(queue_downward_message(a, smol).is_ok());
// that's too big
assert_eq!(big.encode().len(), 9);
assert!(queue_downward_message(a, big).is_err());
});
}
}
@@ -23,10 +23,11 @@ use primitives::v1::{
Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode,
CommittedCandidateReceipt, ScheduledCore, OccupiedCore, CoreOccupied, CoreIndex,
GroupIndex, CandidateEvent, PersistedValidationData, AuthorityDiscoveryId,
InboundDownwardMessage,
};
use sp_runtime::traits::Zero;
use frame_support::debug;
use crate::{initializer, inclusion, scheduler, configuration, paras};
use crate::{initializer, inclusion, scheduler, configuration, paras, router};
/// Implementation for the `validators` function of the runtime API.
pub fn validators<T: initializer::Trait>() -> Vec<ValidatorId> {
@@ -299,3 +300,10 @@ where
validator_index.and_then(|i| authorities.get(i).cloned())
}).collect()
}
/// Implementation for the `dmq_contents` function of the runtime API.
pub fn dmq_contents<T: router::Trait>(
recipient: ParaId,
) -> Vec<InboundDownwardMessage<T::BlockNumber>> {
<router::Module<T>>::dmq_contents(recipient)
}
+5 -3
View File
@@ -21,12 +21,12 @@ use sp_runtime::traits::{One, Saturating};
use primitives::v1::{Id as ParaId, PersistedValidationData, TransientValidationData};
use sp_std::prelude::*;
use crate::{configuration, paras};
use crate::{configuration, paras, router};
/// Make the persisted validation data for a particular parachain.
///
/// This ties together the storage of several modules.
pub fn make_persisted_validation_data<T: paras::Trait>(
pub fn make_persisted_validation_data<T: paras::Trait + router::Trait>(
para_id: ParaId,
) -> Option<PersistedValidationData<T::BlockNumber>> {
let relay_parent_number = <frame_system::Module<T>>::block_number() - One::one();
@@ -35,13 +35,14 @@ pub fn make_persisted_validation_data<T: paras::Trait>(
parent_head: <paras::Module<T>>::para_head(&para_id)?,
block_number: relay_parent_number,
hrmp_mqc_heads: Vec::new(),
dmq_mqc_head: <router::Module<T>>::dmq_mqc_head(para_id),
})
}
/// Make the transient validation data for a particular parachain.
///
/// This ties together the storage of several modules.
pub fn make_transient_validation_data<T: paras::Trait>(
pub fn make_transient_validation_data<T: paras::Trait + router::Trait>(
para_id: ParaId,
) -> Option<TransientValidationData<T::BlockNumber>> {
let config = <configuration::Module<T>>::config();
@@ -67,5 +68,6 @@ pub fn make_transient_validation_data<T: paras::Trait>(
max_head_data_size: config.max_head_data_size,
balance: 0,
code_upgrade_allowed,
dmq_length: <router::Module<T>>::dmq_length(para_id),
})
}