DMP Queue pallet (#416)

* Introduce the converter into the hub

* Parachain recognises Rococo governance body as admin

* Whitespace

* Use UsingComponents for fee payment in XCM

* Fixes

* Fixes for XCM permissions

* Remove encode_call test

* Fixes

* Rococo Collator supports Shell runtime

* Fixes

* Fixes

* Initial draft of DMP Queue pallet

* DMP Queue builds.

* Companion for Polkadot gav-allow-xcm-exec

* Bump

* Fix std

* Fixes

* fix and improve docs

* fix compile errors in tests

* add test for try_service_message

* update cargo.lock

* Fixes

* Make test name read well

* Fixes

* Add a couple of simple tests

* Tests

* Tests

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>

* Chain ID and ParaID don't collide

* Fixes

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Shawn Tabrizi <shawntabrizi@gmail.com>

* Update pallets/dmp-queue/src/lib.rs

Co-authored-by: Shawn Tabrizi <shawntabrizi@gmail.com>

* Fixes

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>
Co-authored-by: Shawn Tabrizi <shawntabrizi@gmail.com>
This commit is contained in:
Gavin Wood
2021-05-02 16:11:58 +02:00
committed by GitHub
parent 67102885dd
commit dd5ad841a0
14 changed files with 1279 additions and 369 deletions
+54 -50
View File
@@ -42,7 +42,7 @@ use polkadot_parachain::primitives::RelayChainBlockNumber;
use cumulus_primitives_core::{
relay_chain,
well_known_keys::{self, NEW_VALIDATION_CODE},
AbridgedHostConfiguration, DownwardMessageHandler, XcmpMessageHandler,
AbridgedHostConfiguration, DmpMessageHandler, XcmpMessageHandler,
InboundDownwardMessage, InboundHrmpMessage, OnValidationData, OutboundHrmpMessage, ParaId,
PersistedValidationData, UpwardMessage, UpwardMessageSender, MessageSendError,
XcmpMessageSource, ChannelStatus, GetChannelInfo,
@@ -70,13 +70,16 @@ pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> {
/// Returns the parachain ID we are running with.
type SelfParaId: Get<ParaId>;
/// The downward message handlers that will be informed when a message is received.
type DownwardMessageHandlers: DownwardMessageHandler;
/// The place where outbound XCMP messages come from. This is queried in `finalize_block`.
type OutboundXcmpMessageSource: XcmpMessageSource;
/// The HRMP message handlers that will be informed when a message is received.
/// The message handler that will be invoked when messages are received via DMP.
type DmpMessageHandler: DmpMessageHandler;
/// The weight we reserve at the beginning of the block for processing DMP messages.
type ReservedDmpWeight: Get<Weight>;
/// The message handler that will be invoked when messages are received via XCMP.
///
/// The messages are dispatched in the order they were relayed by the relay chain. If multiple
/// messages were relayed at one block, these will be dispatched in ascending order of the
@@ -147,6 +150,10 @@ decl_storage! {
/// overrides the amount set in the Config trait.
ReservedXcmpWeightOverride: Option<Weight>;
/// The weight we reserve at the beginning of the block for processing DMP messages. This
/// overrides the amount set in the Config trait.
ReservedDmpWeightOverride: Option<Weight>;
/// The next authorized upgrade, if there is one.
AuthorizedUpgrade: Option<T::Hash>;
}
@@ -353,7 +360,7 @@ decl_module! {
storage::unhashed::put(well_known_keys::HRMP_OUTBOUND_MESSAGES, &outbound_messages);
}
fn on_initialize(n: T::BlockNumber) -> Weight {
fn on_initialize(_n: T::BlockNumber) -> Weight {
// To prevent removing `NEW_VALIDATION_CODE` that was set by another `on_initialize` like
// for example from scheduler, we only kill the storage entry if it was not yet updated
// in the current block.
@@ -430,6 +437,9 @@ impl<T: Config> sp_runtime::traits::ValidateUnsigned for Module<T> {
})
}
}
if let Call::set_validation_data(..) = call {
return Ok(Default::default())
}
Err(InvalidTransaction::Call.into())
}
}
@@ -520,41 +530,28 @@ impl<T: Config> Module<T> {
downward_messages: Vec<InboundDownwardMessage>,
) -> Weight {
let dm_count = downward_messages.len() as u32;
let mut dmq_head = LastDmqMqcHead::get();
let mut weight_used = 0;
if dm_count != 0 {
let mut processed_count = 0;
Self::deposit_event(RawEvent::DownwardMessagesReceived(dm_count));
let max_weight = ReservedDmpWeightOverride::get().unwrap_or_else(T::ReservedDmpWeight::get);
// Reference fu to avoid the `move` capture.
let weight_used_ref = &mut weight_used;
let processed_count_ref = &mut processed_count;
let result_mqc_head = LastDmqMqcHead::mutate(move |mqc| {
for downward_message in downward_messages {
mqc.extend_downward(&downward_message);
*weight_used_ref += T::DownwardMessageHandlers::handle_downward_message(downward_message);
*processed_count_ref += 1;
}
mqc.0
});
let message_iter = downward_messages.into_iter()
.inspect(|m| { dmq_head.extend_downward(m); })
.map(|m| (m.sent_at, m.msg));
weight_used += T::DmpMessageHandler::handle_dmp_messages(message_iter, max_weight);
LastDmqMqcHead::put(&dmq_head);
Self::deposit_event(RawEvent::DownwardMessagesProcessed(
processed_count,
weight_used,
result_mqc_head.clone(),
expected_dmq_mqc_head.clone(),
));
Self::deposit_event(RawEvent::DownwardMessagesProcessed(weight_used, dmq_head.0));
};
// After hashing each message in the message queue chain submitted by the collator, we should
// arrive to the MQC head provided by the relay chain.
//
// A mismatch means that at least some of the submitted messages were altered, omitted or added
// improperly.
assert_eq!(result_mqc_head, expected_dmq_mqc_head);
} else {
assert_eq!(LastDmqMqcHead::get().0, expected_dmq_mqc_head);
}
// After hashing each message in the message queue chain submitted by the collator, we should
// arrive to the MQC head provided by the relay chain.
//
// A mismatch means that at least some of the submitted messages were altered, omitted or added
// improperly.
assert_eq!(dmq_head.0, expected_dmq_mqc_head);
// Store the processed_downward_messages here so that it will be accessible from
// PVF's `validate_block` wrapper and collation pipeline.
@@ -854,12 +851,12 @@ decl_event! {
ValidationFunctionApplied(RelayChainBlockNumber),
/// An upgrade has been authorized.
UpgradeAuthorized(Hash),
/// Downward messages were processed using the given weight.
/// \[ count, weight_used, result_mqc_head, expected_mqc_head \]
DownwardMessagesProcessed(u32, Weight, relay_chain::Hash, relay_chain::Hash),
/// Some downward messages have been received and will be processed.
/// \[ count \]
DownwardMessagesReceived(u32),
/// Downward messages were processed using the given weight.
/// \[ weight_used, result_mqc_head \]
DownwardMessagesProcessed(Weight, relay_chain::Hash),
}
}
@@ -938,6 +935,7 @@ mod tests {
};
pub const ParachainId: ParaId = ParaId::new(200);
pub const ReservedXcmpWeight: Weight = 0;
pub const ReservedDmpWeight: Weight = 0;
}
impl frame_system::Config for Test {
type Origin = Origin;
@@ -968,9 +966,10 @@ mod tests {
type Event = Event;
type OnValidationData = ();
type SelfParaId = ParachainId;
type DownwardMessageHandlers = SaveIntoThreadLocal;
type XcmpMessageHandler = SaveIntoThreadLocal;
type OutboundXcmpMessageSource = FromThreadLocal;
type DmpMessageHandler = SaveIntoThreadLocal;
type ReservedDmpWeight = ReservedDmpWeight;
type XcmpMessageHandler = SaveIntoThreadLocal;
type ReservedXcmpWeight = ReservedXcmpWeight;
}
@@ -978,7 +977,7 @@ mod tests {
pub struct SaveIntoThreadLocal;
std::thread_local! {
static HANDLED_DOWNWARD_MESSAGES: RefCell<Vec<InboundDownwardMessage>> = RefCell::new(Vec::new());
static HANDLED_DMP_MESSAGES: RefCell<Vec<(relay_chain::BlockNumber, Vec<u8>)>> = RefCell::new(Vec::new());
static HANDLED_XCMP_MESSAGES: RefCell<Vec<(ParaId, relay_chain::BlockNumber, Vec<u8>)>> = RefCell::new(Vec::new());
static SENT_MESSAGES: RefCell<Vec<(ParaId, Vec<u8>)>> = RefCell::new(Vec::new());
}
@@ -1013,12 +1012,17 @@ mod tests {
}
}
impl DownwardMessageHandler for SaveIntoThreadLocal {
fn handle_downward_message(msg: InboundDownwardMessage) -> Weight {
HANDLED_DOWNWARD_MESSAGES.with(|m| {
m.borrow_mut().push(msg);
});
0
impl DmpMessageHandler for SaveIntoThreadLocal {
fn handle_dmp_messages(
iter: impl Iterator<Item=(RelayBlockNumber, Vec<u8>)>,
_max_weight: Weight,
) -> Weight {
HANDLED_DMP_MESSAGES.with(|m| {
for i in iter {
m.borrow_mut().push(i);
}
0
})
}
}
@@ -1039,7 +1043,7 @@ mod tests {
// This function basically just builds a genesis storage key/value store according to
// our desired mockup.
fn new_test_ext() -> sp_io::TestExternalities {
HANDLED_DOWNWARD_MESSAGES.with(|m| m.borrow_mut().clear());
HANDLED_DMP_MESSAGES.with(|m| m.borrow_mut().clear());
HANDLED_XCMP_MESSAGES.with(|m| m.borrow_mut().clear());
frame_system::GenesisConfig::default()
@@ -1606,9 +1610,9 @@ mod tests {
_ => unreachable!(),
})
.add(1, || {
HANDLED_DOWNWARD_MESSAGES.with(|m| {
HANDLED_DMP_MESSAGES.with(|m| {
let mut m = m.borrow_mut();
assert_eq!(&*m, &[MSG.clone()]);
assert_eq!(&*m, &[(MSG.sent_at, MSG.msg.clone())]);
m.clear();
});
});
@@ -20,7 +20,7 @@ use frame_support::traits::ExecuteBlock;
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT, NumberFor};
use sp_io::KillChildStorageResult;
use sp_std::{boxed::Box, vec::Vec};
use sp_std::prelude::*;
use hash_db::{HashDB, EMPTY_PREFIX};
@@ -35,13 +35,10 @@ use cumulus_primitives_core::{
HRMP_OUTBOUND_MESSAGES, HRMP_WATERMARK, NEW_VALIDATION_CODE, PROCESSED_DOWNWARD_MESSAGES,
UPWARD_MESSAGES,
},
OutboundHrmpMessage, PersistedValidationData, UpwardMessage,
OutboundHrmpMessage, UpwardMessage,
};
use sp_core::storage::{ChildInfo, TrackedStorageKey};
use sp_externalities::{
set_and_run_with_externalities, Error, Extension, ExtensionStore, Externalities,
};
use sp_std::any::{Any, TypeId};
use sp_core::storage::ChildInfo;
use sp_externalities::{set_and_run_with_externalities, Externalities};
use sp_trie::MemoryDB;
type Ext<'a, B> = sp_state_machine::Ext<