Breakdown the Router module on Dmp, Ump, Hrmp modules (#1939)

* Guide: Split router module in guide.

Now we have: DMP, UMP and Router module.

* Add a glossary entry for what used to be called Router

* Extract DMP

* Extract UMP

* Extract HRMP

* Switch over to new modules

* Router: goodbye sweet prince

* Link to messaging overview for details.

* Update missed rococo and test runtimes.

* Commit destroyed by rebase changes

* Don't deprecate Router but rather make it a meta-project

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Fix typos suggestion

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Fix repetition in the impl guide

* Clarify that processed_downward_messages has the u32 type

* Remove the router subdir.

* Deabbreviate DMP,UMP,HRMP

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
This commit is contained in:
Sergei Shulepov
2020-11-16 15:02:01 +01:00
committed by GitHub
parent 0ab81c907f
commit c69e5766db
24 changed files with 853 additions and 689 deletions
@@ -14,9 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::{Trait, Module, Store};
use crate::configuration::HostConfiguration;
use frame_support::{StorageMap, weights::Weight, traits::Get};
use crate::{
configuration::{self, HostConfiguration},
initializer,
};
use frame_support::{decl_module, decl_storage, StorageMap, weights::Weight, traits::Get};
use sp_std::{fmt, prelude::*};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT, SaturatedConversion};
use primitives::v1::{Id as ParaId, DownwardMessage, InboundDownwardMessage, Hash};
@@ -60,13 +62,72 @@ impl fmt::Debug for ProcessedDownwardMessagesAcceptanceErr {
}
}
pub trait Trait: frame_system::Trait + configuration::Trait {}
decl_storage! {
trait Store for Module<T: Trait> as Dmp {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The downward messages addressed for a certain para.
DownwardMessageQueues: map hasher(twox_64_concat) ParaId => Vec<InboundDownwardMessage<T::BlockNumber>>;
/// A mapping that stores the downward message queue MQC head for each para.
///
/// Each link in this chain has a form:
/// `(prev_head, B, H(M))`, where
/// - `prev_head`: is the previous head hash or zero if none.
/// - `B`: is the relay-chain block number in which a message was appended.
/// - `H(M)`: is the hash of the message being appended.
DownwardMessageQueueHeads: map hasher(twox_64_concat) ParaId => Hash;
}
}
decl_module! {
/// The DMP module.
pub struct Module<T: Trait> for enum Call where origin: <T as frame_system::Trait>::Origin { }
}
/// Routines and getters related to downward message passing.
impl<T: Trait> Module<T> {
pub(crate) fn clean_dmp_after_outgoing(outgoing_para: ParaId) {
/// Block initialization logic, called by initializer.
pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight {
0
}
/// Block finalization logic, called by initializer.
pub(crate) fn initializer_finalize() {}
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
_notification: &initializer::SessionChangeNotification<T::BlockNumber>,
) {
Self::perform_outgoing_para_cleanup();
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
for outgoing_para in outgoing {
Self::clean_dmp_after_outgoing(outgoing_para);
}
}
fn clean_dmp_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::DownwardMessageQueues::remove(&outgoing_para);
<Self as Store>::DownwardMessageQueueHeads::remove(&outgoing_para);
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
}
/// Enqueue a downward message to a specific recipient para.
///
/// When encoded, the message should not exceed the `config.max_downward_message_size`.
@@ -165,19 +226,45 @@ impl<T: Trait> Module<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::{Configuration, Router, new_test_ext};
use crate::router::{
OutgoingParas,
tests::{default_genesis_config, run_to_block},
};
use primitives::v1::BlockNumber;
use frame_support::StorageValue;
use frame_support::traits::{OnFinalize, OnInitialize};
use codec::Encode;
use crate::mock::{Configuration, new_test_ext, System, Dmp, GenesisConfig as MockGenesisConfig};
pub(crate) fn run_to_block(to: BlockNumber, new_session: Option<Vec<BlockNumber>>) {
while System::block_number() < to {
let b = System::block_number();
Dmp::initializer_finalize();
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) {
Dmp::initializer_on_new_session(&Default::default());
}
Dmp::initializer_initialize(b + 1);
}
}
fn default_genesis_config() -> MockGenesisConfig {
MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: crate::configuration::HostConfiguration {
max_downward_message_size: 1024,
..Default::default()
},
},
..Default::default()
}
}
fn queue_downward_message(
para_id: ParaId,
msg: DownwardMessage,
) -> Result<(), QueueDownwardMessageError> {
Router::queue_downward_message(&Configuration::config(), para_id, msg)
Dmp::queue_downward_message(&Configuration::config(), para_id, msg)
}
#[test]
@@ -194,23 +281,23 @@ mod tests {
queue_downward_message(b, vec![4, 5, 6]).unwrap();
queue_downward_message(c, vec![7, 8, 9]).unwrap();
Router::schedule_para_cleanup(a);
Dmp::schedule_para_cleanup(a);
// run to block without session change.
run_to_block(2, None);
assert!(!<Router as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(!<Router as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Router as Store>::DownwardMessageQueues::get(&c).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&c).is_empty());
Router::schedule_para_cleanup(b);
Dmp::schedule_para_cleanup(b);
// run to block changing the session.
run_to_block(3, Some(vec![3]));
assert!(<Router as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(<Router as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Router as Store>::DownwardMessageQueues::get(&c).is_empty());
assert!(<Dmp as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(<Dmp as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&c).is_empty());
// verify that the outgoing paras are emptied.
assert!(OutgoingParas::get().is_empty())
@@ -223,15 +310,15 @@ mod tests {
let b = ParaId::from(228);
new_test_ext(default_genesis_config()).execute_with(|| {
assert_eq!(Router::dmq_length(a), 0);
assert_eq!(Router::dmq_length(b), 0);
assert_eq!(Dmp::dmq_length(a), 0);
assert_eq!(Dmp::dmq_length(b), 0);
queue_downward_message(a, vec![1, 2, 3]).unwrap();
assert_eq!(Router::dmq_length(a), 1);
assert_eq!(Router::dmq_length(b), 0);
assert!(!Router::dmq_mqc_head(a).is_zero());
assert!(Router::dmq_mqc_head(b).is_zero());
assert_eq!(Dmp::dmq_length(a), 1);
assert_eq!(Dmp::dmq_length(b), 0);
assert!(!Dmp::dmq_mqc_head(a).is_zero());
assert!(Dmp::dmq_mqc_head(b).is_zero());
});
}
@@ -241,20 +328,20 @@ mod tests {
new_test_ext(default_genesis_config()).execute_with(|| {
// processed_downward_messages=0 is allowed when the DMQ is empty.
assert!(Router::check_processed_downward_messages(a, 0).is_ok());
assert!(Dmp::check_processed_downward_messages(a, 0).is_ok());
queue_downward_message(a, vec![1, 2, 3]).unwrap();
queue_downward_message(a, vec![4, 5, 6]).unwrap();
queue_downward_message(a, vec![7, 8, 9]).unwrap();
// 0 doesn't pass if the DMQ has msgs.
assert!(!Router::check_processed_downward_messages(a, 0).is_ok());
assert!(!Dmp::check_processed_downward_messages(a, 0).is_ok());
// a candidate can consume up to 3 messages
assert!(Router::check_processed_downward_messages(a, 1).is_ok());
assert!(Router::check_processed_downward_messages(a, 2).is_ok());
assert!(Router::check_processed_downward_messages(a, 3).is_ok());
assert!(Dmp::check_processed_downward_messages(a, 1).is_ok());
assert!(Dmp::check_processed_downward_messages(a, 2).is_ok());
assert!(Dmp::check_processed_downward_messages(a, 3).is_ok());
// there is no 4 messages in the queue
assert!(!Router::check_processed_downward_messages(a, 4).is_ok());
assert!(!Dmp::check_processed_downward_messages(a, 4).is_ok());
});
}
@@ -263,19 +350,19 @@ mod tests {
let a = ParaId::from(1312);
new_test_ext(default_genesis_config()).execute_with(|| {
assert_eq!(Router::dmq_length(a), 0);
assert_eq!(Dmp::dmq_length(a), 0);
queue_downward_message(a, vec![1, 2, 3]).unwrap();
queue_downward_message(a, vec![4, 5, 6]).unwrap();
queue_downward_message(a, vec![7, 8, 9]).unwrap();
assert_eq!(Router::dmq_length(a), 3);
assert_eq!(Dmp::dmq_length(a), 3);
// pruning 0 elements shouldn't change anything.
Router::prune_dmq(a, 0);
assert_eq!(Router::dmq_length(a), 3);
Dmp::prune_dmq(a, 0);
assert_eq!(Dmp::dmq_length(a), 3);
Router::prune_dmq(a, 2);
assert_eq!(Router::dmq_length(a), 1);
Dmp::prune_dmq(a, 2);
assert_eq!(Dmp::dmq_length(a), 1);
});
}
@@ -14,19 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::{dmp, Error as DispatchError, Module, Store, Trait};
use crate::{
ensure_parachain,
configuration::{self, HostConfiguration},
paras,
initializer, paras, dmp,
};
use codec::{Decode, Encode};
use frame_support::{ensure, traits::Get, weights::Weight, StorageMap, StorageValue};
use frame_support::{
decl_storage, decl_module, decl_error, ensure, traits::Get, weights::Weight, StorageMap,
StorageValue, dispatch::DispatchResult,
};
use primitives::v1::{
Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
SessionIndex,
};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
use sp_std::{mem, fmt, collections::{btree_map::BTreeMap, btree_set::BTreeSet}, prelude::*};
use sp_std::{
mem, fmt,
collections::{btree_map::BTreeMap, btree_set::BTreeSet},
prelude::*,
};
/// A description of a request to open an HRMP channel.
#[derive(Encode, Decode)]
@@ -135,8 +142,7 @@ where
} => write!(
fmt,
"the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
new_watermark,
last_watermark,
new_watermark, last_watermark,
),
AheadRelayParent {
new_watermark,
@@ -144,13 +150,12 @@ where
} => write!(
fmt,
"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
new_watermark,
relay_chain_parent_number,
new_watermark, relay_chain_parent_number
),
LandsOnBlockWithNoMessages { new_watermark } => write!(
fmt,
"the HRMP watermark ({:?}) doesn't land on a block with messages received",
new_watermark,
new_watermark
),
}
}
@@ -163,8 +168,7 @@ impl fmt::Debug for OutboundHrmpAcceptanceErr {
MoreMessagesThanPermitted { sent, permitted } => write!(
fmt,
"more HRMP messages than permitted by config ({} > {})",
sent,
permitted,
sent, permitted,
),
NotSorted { idx } => write!(
fmt,
@@ -174,9 +178,7 @@ impl fmt::Debug for OutboundHrmpAcceptanceErr {
NoSuchChannel { idx, channel_id } => write!(
fmt,
"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
idx,
channel_id.sender,
channel_id.recipient,
idx, channel_id.sender, channel_id.recipient,
),
MaxMessageSizeExceeded {
idx,
@@ -185,9 +187,7 @@ impl fmt::Debug for OutboundHrmpAcceptanceErr {
} => write!(
fmt,
"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
idx,
msg_size,
max_size,
idx, msg_size, max_size,
),
TotalSizeExceeded {
idx,
@@ -196,23 +196,205 @@ impl fmt::Debug for OutboundHrmpAcceptanceErr {
} => write!(
fmt,
"sending the HRMP message at index {} would exceed the neogitiated channel total size ({} > {})",
idx,
total_size,
limit,
idx, total_size, limit,
),
CapacityExceeded { idx, count, limit } => write!(
fmt,
"sending the HRMP message at index {} would exceed the neogitiated channel capacity ({} > {})",
idx,
count,
limit,
idx, count, limit,
),
}
}
}
pub trait Trait: frame_system::Trait + configuration::Trait + paras::Trait + dmp::Trait {
type Origin: From<crate::Origin>
+ From<<Self as frame_system::Trait>::Origin>
+ Into<Result<crate::Origin, <Self as Trait>::Origin>>;
}
decl_storage! {
trait Store for Module<T: Trait> as Hrmp {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The set of pending HRMP open channel requests.
///
/// The set is accompanied by a list for iteration.
///
/// Invariant:
/// - There are no channels that exists in list but not in the set and vice versa.
HrmpOpenChannelRequests: map hasher(twox_64_concat) HrmpChannelId => Option<HrmpOpenChannelRequest>;
HrmpOpenChannelRequestsList: Vec<HrmpChannelId>;
/// This mapping tracks how many open channel requests are inititated by a given sender para.
/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items that has `(X, _)`
/// as the number of `HrmpOpenChannelRequestCount` for `X`.
HrmpOpenChannelRequestCount: map hasher(twox_64_concat) ParaId => u32;
/// This mapping tracks how many open channel requests were accepted by a given recipient para.
/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items `(_, X)` with
/// `confirmed` set to true, as the number of `HrmpAcceptedChannelRequestCount` for `X`.
HrmpAcceptedChannelRequestCount: map hasher(twox_64_concat) ParaId => u32;
/// A set of pending HRMP close channel requests that are going to be closed during the session change.
/// Used for checking if a given channel is registered for closure.
///
/// The set is accompanied by a list for iteration.
///
/// Invariant:
/// - There are no channels that exists in list but not in the set and vice versa.
HrmpCloseChannelRequests: map hasher(twox_64_concat) HrmpChannelId => Option<()>;
HrmpCloseChannelRequestsList: Vec<HrmpChannelId>;
/// The HRMP watermark associated with each para.
/// Invariant:
/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a session.
HrmpWatermarks: map hasher(twox_64_concat) ParaId => Option<T::BlockNumber>;
/// HRMP channel data associated with each para.
/// Invariant:
/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
HrmpChannels: map hasher(twox_64_concat) HrmpChannelId => Option<HrmpChannel>;
/// Ingress/egress indexes allow to find all the senders and receivers given the opposite
/// side. I.e.
///
/// (a) ingress index allows to find all the senders for a given recipient.
/// (b) egress index allows to find all the recipients for a given sender.
///
/// Invariants:
/// - for each ingress index entry for `P` each item `I` in the index should present in `HrmpChannels`
/// as `(I, P)`.
/// - for each egress index entry for `P` each item `E` in the index should present in `HrmpChannels`
/// as `(P, E)`.
/// - there should be no other dangling channels in `HrmpChannels`.
/// - the vectors are sorted.
HrmpIngressChannelsIndex: map hasher(twox_64_concat) ParaId => Vec<ParaId>;
HrmpEgressChannelsIndex: map hasher(twox_64_concat) ParaId => Vec<ParaId>;
/// Storage for the messages for each channel.
/// Invariant: cannot be non-empty if the corresponding channel in `HrmpChannels` is `None`.
HrmpChannelContents: map hasher(twox_64_concat) HrmpChannelId => Vec<InboundHrmpMessage<T::BlockNumber>>;
/// Maintains a mapping that can be used to answer the question:
/// What paras sent a message at the given block number for a given reciever.
/// Invariants:
/// - The inner `Vec<ParaId>` is never empty.
/// - The inner `Vec<ParaId>` cannot store two same `ParaId`.
/// - The outer vector is sorted ascending by block number and cannot store two items with the same
/// block number.
HrmpChannelDigests: map hasher(twox_64_concat) ParaId => Vec<(T::BlockNumber, Vec<ParaId>)>;
}
}
decl_error! {
pub enum Error for Module<T: Trait> {
/// The sender tried to open a channel to themselves.
OpenHrmpChannelToSelf,
/// The recipient is not a valid para.
OpenHrmpChannelInvalidRecipient,
/// The requested capacity is zero.
OpenHrmpChannelZeroCapacity,
/// The requested capacity exceeds the global limit.
OpenHrmpChannelCapacityExceedsLimit,
/// The requested maximum message size is 0.
OpenHrmpChannelZeroMessageSize,
/// The open request requested the message size that exceeds the global limit.
OpenHrmpChannelMessageSizeExceedsLimit,
/// The channel already exists
OpenHrmpChannelAlreadyExists,
/// There is already a request to open the same channel.
OpenHrmpChannelAlreadyRequested,
/// The sender already has the maximum number of allowed outbound channels.
OpenHrmpChannelLimitExceeded,
/// The channel from the sender to the origin doesn't exist.
AcceptHrmpChannelDoesntExist,
/// The channel is already confirmed.
AcceptHrmpChannelAlreadyConfirmed,
/// The recipient already has the maximum number of allowed inbound channels.
AcceptHrmpChannelLimitExceeded,
/// The origin tries to close a channel where it is neither the sender nor the recipient.
CloseHrmpChannelUnauthorized,
/// The channel to be closed doesn't exist.
CloseHrmpChannelDoesntExist,
/// The channel close request is already requested.
CloseHrmpChannelAlreadyUnderway,
}
}
decl_module! {
/// The HRMP module.
pub struct Module<T: Trait> for enum Call where origin: <T as frame_system::Trait>::Origin {
type Error = Error<T>;
#[weight = 0]
fn hrmp_init_open_channel(
origin,
recipient: ParaId,
proposed_max_capacity: u32,
proposed_max_message_size: u32,
) -> DispatchResult {
let origin = ensure_parachain(<T as Trait>::Origin::from(origin))?;
Self::init_open_channel(
origin,
recipient,
proposed_max_capacity,
proposed_max_message_size
)?;
Ok(())
}
#[weight = 0]
fn hrmp_accept_open_channel(origin, sender: ParaId) -> DispatchResult {
let origin = ensure_parachain(<T as Trait>::Origin::from(origin))?;
Self::accept_open_channel(origin, sender)?;
Ok(())
}
#[weight = 0]
fn hrmp_close_channel(origin, channel_id: HrmpChannelId) -> DispatchResult {
let origin = ensure_parachain(<T as Trait>::Origin::from(origin))?;
Self::close_channel(origin, channel_id)?;
Ok(())
}
}
}
/// Routines and getters related to HRMP.
impl<T: Trait> Module<T> {
/// Block initialization logic, called by initializer.
pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight {
0
}
/// Block finalization logic, called by initializer.
pub(crate) fn initializer_finalize() {}
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
notification: &initializer::SessionChangeNotification<T::BlockNumber>,
) {
Self::perform_outgoing_para_cleanup();
Self::process_hrmp_open_channel_requests(&notification.prev_config);
Self::process_hrmp_close_channel_requests();
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
for outgoing_para in outgoing {
Self::clean_hrmp_after_outgoing(outgoing_para);
}
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
}
/// Remove all storage entries associated with the given para.
pub(super) fn clean_hrmp_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::HrmpOpenChannelRequestCount::remove(&outgoing_para);
@@ -631,32 +813,29 @@ impl<T: Trait> Module<T> {
recipient: ParaId,
proposed_max_capacity: u32,
proposed_max_message_size: u32,
) -> Result<(), DispatchError<T>> {
ensure!(
origin != recipient,
DispatchError::<T>::OpenHrmpChannelToSelf
);
) -> Result<(), Error<T>> {
ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
ensure!(
<paras::Module<T>>::is_valid_para(recipient),
DispatchError::<T>::OpenHrmpChannelInvalidRecipient,
Error::<T>::OpenHrmpChannelInvalidRecipient,
);
let config = <configuration::Module<T>>::config();
ensure!(
proposed_max_capacity > 0,
DispatchError::<T>::OpenHrmpChannelZeroCapacity,
Error::<T>::OpenHrmpChannelZeroCapacity,
);
ensure!(
proposed_max_capacity <= config.hrmp_channel_max_capacity,
DispatchError::<T>::OpenHrmpChannelCapacityExceedsLimit,
Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
);
ensure!(
proposed_max_message_size > 0,
DispatchError::<T>::OpenHrmpChannelZeroMessageSize,
Error::<T>::OpenHrmpChannelZeroMessageSize,
);
ensure!(
proposed_max_message_size <= config.hrmp_channel_max_message_size,
DispatchError::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
);
let channel_id = HrmpChannelId {
@@ -665,11 +844,11 @@ impl<T: Trait> Module<T> {
};
ensure!(
<Self as Store>::HrmpOpenChannelRequests::get(&channel_id).is_none(),
DispatchError::<T>::OpenHrmpChannelAlreadyExists,
Error::<T>::OpenHrmpChannelAlreadyExists,
);
ensure!(
<Self as Store>::HrmpChannels::get(&channel_id).is_none(),
DispatchError::<T>::OpenHrmpChannelAlreadyRequested,
Error::<T>::OpenHrmpChannelAlreadyRequested,
);
let egress_cnt =
@@ -682,7 +861,7 @@ impl<T: Trait> Module<T> {
};
ensure!(
egress_cnt + open_req_cnt < channel_num_limit,
DispatchError::<T>::OpenHrmpChannelLimitExceeded,
Error::<T>::OpenHrmpChannelLimitExceeded,
);
// TODO: Deposit https://github.com/paritytech/polkadot/issues/1907
@@ -713,7 +892,7 @@ impl<T: Trait> Module<T> {
.encode()
};
if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
Self::queue_downward_message(&config, recipient, notification_bytes)
<dmp::Module<T>>::queue_downward_message(&config, recipient, notification_bytes)
{
// this should never happen unless the max downward message size is configured to an
// jokingly small number.
@@ -723,19 +902,16 @@ impl<T: Trait> Module<T> {
Ok(())
}
pub(super) fn accept_open_channel(
origin: ParaId,
sender: ParaId,
) -> Result<(), DispatchError<T>> {
pub(super) fn accept_open_channel(origin: ParaId, sender: ParaId) -> Result<(), Error<T>> {
let channel_id = HrmpChannelId {
sender,
recipient: origin,
};
let mut channel_req = <Self as Store>::HrmpOpenChannelRequests::get(&channel_id)
.ok_or(DispatchError::<T>::AcceptHrmpChannelDoesntExist)?;
.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
ensure!(
!channel_req.confirmed,
DispatchError::<T>::AcceptHrmpChannelAlreadyConfirmed,
Error::<T>::AcceptHrmpChannelAlreadyConfirmed,
);
// check if by accepting this open channel request, this parachain would exceed the
@@ -751,7 +927,7 @@ impl<T: Trait> Module<T> {
let accepted_cnt = <Self as Store>::HrmpAcceptedChannelRequestCount::get(&origin);
ensure!(
ingress_cnt + accepted_cnt < channel_num_limit,
DispatchError::<T>::AcceptHrmpChannelLimitExceeded,
Error::<T>::AcceptHrmpChannelLimitExceeded,
);
// TODO: Deposit https://github.com/paritytech/polkadot/issues/1907
@@ -772,7 +948,7 @@ impl<T: Trait> Module<T> {
.encode()
};
if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
Self::queue_downward_message(&config, sender, notification_bytes)
<dmp::Module<T>>::queue_downward_message(&config, sender, notification_bytes)
{
// this should never happen unless the max downward message size is configured to an
// jokingly small number.
@@ -782,26 +958,23 @@ impl<T: Trait> Module<T> {
Ok(())
}
pub(super) fn close_channel(
origin: ParaId,
channel_id: HrmpChannelId,
) -> Result<(), DispatchError<T>> {
pub(super) fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
// check if the origin is allowed to close the channel.
ensure!(
origin == channel_id.sender || origin == channel_id.recipient,
DispatchError::<T>::CloseHrmpChannelUnauthorized,
Error::<T>::CloseHrmpChannelUnauthorized,
);
// check if the channel requested to close does exist.
ensure!(
<Self as Store>::HrmpChannels::get(&channel_id).is_some(),
DispatchError::<T>::CloseHrmpChannelDoesntExist,
Error::<T>::CloseHrmpChannelDoesntExist,
);
// check that there is no outstanding close request for this channel
ensure!(
<Self as Store>::HrmpCloseChannelRequests::get(&channel_id).is_none(),
DispatchError::<T>::CloseHrmpChannelAlreadyUnderway,
Error::<T>::CloseHrmpChannelAlreadyUnderway,
);
<Self as Store>::HrmpCloseChannelRequests::insert(&channel_id, ());
@@ -825,7 +998,7 @@ impl<T: Trait> Module<T> {
channel_id.sender
};
if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
Self::queue_downward_message(&config, opposite_party, notification_bytes)
<dmp::Module<T>>::queue_downward_message(&config, opposite_party, notification_bytes)
{
// this should never happen unless the max downward message size is configured to an
// jokingly small number.
@@ -876,19 +1049,20 @@ impl<T: Trait> Module<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::{new_test_ext, Configuration, Paras, Router, System};
use crate::router::tests::default_genesis_config;
use crate::mock::{
new_test_ext, Configuration, Paras, Hrmp, System, GenesisConfig as MockGenesisConfig,
};
use primitives::v1::BlockNumber;
use std::collections::{BTreeMap, HashSet};
pub(crate) fn run_to_block(to: BlockNumber, new_session: Option<Vec<BlockNumber>>) {
fn run_to_block(to: BlockNumber, new_session: Option<Vec<BlockNumber>>) {
use frame_support::traits::{OnFinalize as _, OnInitialize as _};
while System::block_number() < to {
let b = System::block_number();
// NOTE: this is in reverse initialization order.
Router::initializer_finalize();
Hrmp::initializer_finalize();
Paras::initializer_finalize();
System::on_finalize(b);
@@ -899,12 +1073,12 @@ mod tests {
if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) {
// NOTE: this is in initialization order.
Paras::initializer_on_new_session(&Default::default());
Router::initializer_on_new_session(&Default::default());
Hrmp::initializer_on_new_session(&Default::default());
}
// NOTE: this is in initialization order.
Paras::initializer_initialize(b + 1);
Router::initializer_initialize(b + 1);
Hrmp::initializer_initialize(b + 1);
}
}
@@ -951,6 +1125,18 @@ mod tests {
}
}
fn default_genesis_config() -> MockGenesisConfig {
MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: crate::configuration::HostConfiguration {
max_downward_message_size: 1024,
..Default::default()
},
},
..Default::default()
}
}
fn register_parachain(id: ParaId) {
Paras::schedule_para_initialize(
id,
@@ -967,17 +1153,17 @@ mod tests {
}
fn channel_exists(sender: ParaId, recipient: ParaId) -> bool {
<Router as Store>::HrmpChannels::get(&HrmpChannelId { sender, recipient }).is_some()
<Hrmp as Store>::HrmpChannels::get(&HrmpChannelId { sender, recipient }).is_some()
}
fn assert_storage_consistency_exhaustive() {
use frame_support::IterableStorageMap;
assert_eq!(
<Router as Store>::HrmpOpenChannelRequests::iter()
<Hrmp as Store>::HrmpOpenChannelRequests::iter()
.map(|(k, _)| k)
.collect::<HashSet<_>>(),
<Router as Store>::HrmpOpenChannelRequestsList::get()
<Hrmp as Store>::HrmpOpenChannelRequestsList::get()
.into_iter()
.collect::<HashSet<_>>(),
);
@@ -987,17 +1173,17 @@ mod tests {
//
// having ensured that, we can go ahead and go over all counts and verify that they match.
assert_eq!(
<Router as Store>::HrmpOpenChannelRequestCount::iter()
<Hrmp as Store>::HrmpOpenChannelRequestCount::iter()
.map(|(k, _)| k)
.collect::<HashSet<_>>(),
<Router as Store>::HrmpOpenChannelRequests::iter()
<Hrmp as Store>::HrmpOpenChannelRequests::iter()
.map(|(k, _)| k.sender)
.collect::<HashSet<_>>(),
);
for (open_channel_initiator, expected_num) in
<Router as Store>::HrmpOpenChannelRequestCount::iter()
<Hrmp as Store>::HrmpOpenChannelRequestCount::iter()
{
let actual_num = <Router as Store>::HrmpOpenChannelRequests::iter()
let actual_num = <Hrmp as Store>::HrmpOpenChannelRequests::iter()
.filter(|(ch, _)| ch.sender == open_channel_initiator)
.count() as u32;
assert_eq!(expected_num, actual_num);
@@ -1006,28 +1192,28 @@ mod tests {
// The same as above, but for accepted channel request count. Note that we are interested
// only in confirmed open requests.
assert_eq!(
<Router as Store>::HrmpAcceptedChannelRequestCount::iter()
<Hrmp as Store>::HrmpAcceptedChannelRequestCount::iter()
.map(|(k, _)| k)
.collect::<HashSet<_>>(),
<Router as Store>::HrmpOpenChannelRequests::iter()
<Hrmp as Store>::HrmpOpenChannelRequests::iter()
.filter(|(_, v)| v.confirmed)
.map(|(k, _)| k.recipient)
.collect::<HashSet<_>>(),
);
for (channel_recipient, expected_num) in
<Router as Store>::HrmpAcceptedChannelRequestCount::iter()
<Hrmp as Store>::HrmpAcceptedChannelRequestCount::iter()
{
let actual_num = <Router as Store>::HrmpOpenChannelRequests::iter()
let actual_num = <Hrmp as Store>::HrmpOpenChannelRequests::iter()
.filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
.count() as u32;
assert_eq!(expected_num, actual_num);
}
assert_eq!(
<Router as Store>::HrmpCloseChannelRequests::iter()
<Hrmp as Store>::HrmpCloseChannelRequests::iter()
.map(|(k, _)| k)
.collect::<HashSet<_>>(),
<Router as Store>::HrmpCloseChannelRequestsList::get()
<Hrmp as Store>::HrmpCloseChannelRequestsList::get()
.into_iter()
.collect::<HashSet<_>>(),
);
@@ -1035,14 +1221,14 @@ mod tests {
// A HRMP watermark can be None for an onboarded parachain. However, an offboarded parachain
// cannot have an HRMP watermark: it should've been cleanup.
assert_contains_only_onboarded(
<Router as Store>::HrmpWatermarks::iter().map(|(k, _)| k),
<Hrmp as Store>::HrmpWatermarks::iter().map(|(k, _)| k),
"HRMP watermarks should contain only onboarded paras",
);
// An entry in `HrmpChannels` indicates that the channel is open. Only open channels can
// have contents.
for (non_empty_channel, contents) in <Router as Store>::HrmpChannelContents::iter() {
assert!(<Router as Store>::HrmpChannels::contains_key(
for (non_empty_channel, contents) in <Hrmp as Store>::HrmpChannelContents::iter() {
assert!(<Hrmp as Store>::HrmpChannels::contains_key(
&non_empty_channel
));
@@ -1054,7 +1240,7 @@ mod tests {
// Senders and recipients must be onboarded. Otherwise, all channels associated with them
// are removed.
assert_contains_only_onboarded(
<Router as Store>::HrmpChannels::iter().flat_map(|(k, _)| vec![k.sender, k.recipient]),
<Hrmp as Store>::HrmpChannels::iter().flat_map(|(k, _)| vec![k.sender, k.recipient]),
"senders and recipients in all channels should be onboarded",
);
@@ -1077,13 +1263,13 @@ mod tests {
// (b, z) (b, z)
//
// and then that we compare that to the channel list in the `HrmpChannels`.
let channel_set_derived_from_ingress = <Router as Store>::HrmpIngressChannelsIndex::iter()
let channel_set_derived_from_ingress = <Hrmp as Store>::HrmpIngressChannelsIndex::iter()
.flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
.collect::<HashSet<_>>();
let channel_set_derived_from_egress = <Router as Store>::HrmpEgressChannelsIndex::iter()
let channel_set_derived_from_egress = <Hrmp as Store>::HrmpEgressChannelsIndex::iter()
.flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
.collect::<HashSet<_>>();
let channel_set_ground_truth = <Router as Store>::HrmpChannels::iter()
let channel_set_ground_truth = <Hrmp as Store>::HrmpChannels::iter()
.map(|(k, _)| (k.sender, k.recipient))
.collect::<HashSet<_>>();
assert_eq!(
@@ -1092,18 +1278,18 @@ mod tests {
);
assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
<Router as Store>::HrmpIngressChannelsIndex::iter()
<Hrmp as Store>::HrmpIngressChannelsIndex::iter()
.map(|(_, v)| v)
.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
<Router as Store>::HrmpEgressChannelsIndex::iter()
<Hrmp as Store>::HrmpEgressChannelsIndex::iter()
.map(|(_, v)| v)
.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
assert_contains_only_onboarded(
<Router as Store>::HrmpChannelDigests::iter().map(|(k, _)| k),
<Hrmp as Store>::HrmpChannelDigests::iter().map(|(k, _)| k),
"HRMP channel digests should contain only onboarded paras",
);
for (_digest_for_para, digest) in <Router as Store>::HrmpChannelDigests::iter() {
for (_digest_for_para, digest) in <Hrmp as Store>::HrmpChannelDigests::iter() {
// Assert that items are in **strictly** ascending order. The strictness also implies
// there are no duplicates.
assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
@@ -1161,10 +1347,10 @@ mod tests {
register_parachain(para_b);
run_to_block(5, Some(vec![5]));
Router::init_open_channel(para_a, para_b, 2, 8).unwrap();
Hrmp::init_open_channel(para_a, para_b, 2, 8).unwrap();
assert_storage_consistency_exhaustive();
Router::accept_open_channel(para_b, para_a).unwrap();
Hrmp::accept_open_channel(para_b, para_a).unwrap();
assert_storage_consistency_exhaustive();
// Advance to a block 6, but without session change. That means that the channel has
@@ -1189,15 +1375,15 @@ mod tests {
register_parachain(para_b);
run_to_block(5, Some(vec![5]));
Router::init_open_channel(para_a, para_b, 2, 8).unwrap();
Router::accept_open_channel(para_b, para_a).unwrap();
Hrmp::init_open_channel(para_a, para_b, 2, 8).unwrap();
Hrmp::accept_open_channel(para_b, para_a).unwrap();
run_to_block(6, Some(vec![6]));
assert!(channel_exists(para_a, para_b));
// Close the channel. The effect is not immediate, but rather deferred to the next
// session change.
Router::close_channel(
Hrmp::close_channel(
para_b,
HrmpChannelId {
sender: para_a,
@@ -1228,8 +1414,8 @@ mod tests {
register_parachain(para_b);
run_to_block(5, Some(vec![5]));
Router::init_open_channel(para_a, para_b, 2, 20).unwrap();
Router::accept_open_channel(para_b, para_a).unwrap();
Hrmp::init_open_channel(para_a, para_b, 2, 20).unwrap();
Hrmp::accept_open_channel(para_b, para_a).unwrap();
// On Block 6:
// A sends a message to B
@@ -1240,15 +1426,15 @@ mod tests {
data: b"this is an emergency".to_vec(),
}];
let config = Configuration::config();
assert!(Router::check_outbound_hrmp(&config, para_a, &msgs).is_ok());
let _ = Router::queue_outbound_hrmp(para_a, msgs);
assert!(Hrmp::check_outbound_hrmp(&config, para_a, &msgs).is_ok());
let _ = Hrmp::queue_outbound_hrmp(para_a, msgs);
assert_storage_consistency_exhaustive();
// On Block 7:
// B receives the message sent by A. B sets the watermark to 6.
run_to_block(7, None);
assert!(Router::check_hrmp_watermark(para_b, 7, 6).is_ok());
let _ = Router::prune_hrmp(para_b, 6);
assert!(Hrmp::check_hrmp_watermark(para_b, 7, 6).is_ok());
let _ = Hrmp::prune_hrmp(para_b, 6);
assert_storage_consistency_exhaustive();
});
}
@@ -1263,8 +1449,8 @@ mod tests {
register_parachain(para_b);
run_to_block(5, Some(vec![5]));
Router::init_open_channel(para_a, para_b, 2, 8).unwrap();
Router::accept_open_channel(para_b, para_a).unwrap();
Hrmp::init_open_channel(para_a, para_b, 2, 8).unwrap();
Hrmp::accept_open_channel(para_b, para_a).unwrap();
deregister_parachain(para_a);
// On Block 6: session change. The channel should not be created.
@@ -1290,10 +1476,10 @@ mod tests {
// Open two channels to the same receiver, b:
// a -> b, c -> b
Router::init_open_channel(para_a, para_b, 2, 8).unwrap();
Router::accept_open_channel(para_b, para_a).unwrap();
Router::init_open_channel(para_c, para_b, 2, 8).unwrap();
Router::accept_open_channel(para_b, para_c).unwrap();
Hrmp::init_open_channel(para_a, para_b, 2, 8).unwrap();
Hrmp::accept_open_channel(para_b, para_a).unwrap();
Hrmp::init_open_channel(para_c, para_b, 2, 8).unwrap();
Hrmp::accept_open_channel(para_b, para_c).unwrap();
// On Block 6: session change.
run_to_block(6, Some(vec![6]));
@@ -1304,12 +1490,12 @@ mod tests {
data: b"knock".to_vec(),
}];
let config = Configuration::config();
assert!(Router::check_outbound_hrmp(&config, para_a, &msgs).is_ok());
let _ = Router::queue_outbound_hrmp(para_a, msgs.clone());
assert!(Hrmp::check_outbound_hrmp(&config, para_a, &msgs).is_ok());
let _ = Hrmp::queue_outbound_hrmp(para_a, msgs.clone());
// Verify that the sent messages are there and that also the empty channels are present.
let mqc_heads = Router::hrmp_mqc_heads(para_b);
let contents = Router::inbound_hrmp_channels_contents(para_b);
let mqc_heads = Hrmp::hrmp_mqc_heads(para_b);
let contents = Hrmp::inbound_hrmp_channels_contents(para_b);
assert_eq!(
contents,
vec![
+19 -14
View File
@@ -36,7 +36,7 @@ use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use sp_staking::SessionIndex;
use sp_runtime::{DispatchError, traits::{One, Saturating}};
use crate::{configuration, paras, router, scheduler::CoreAssignment};
use crate::{configuration, paras, dmp, ump, hrmp, scheduler::CoreAssignment};
/// A bitfield signed by a validator indicating that it is keeping its piece of the erasure-coding
/// for any backed candidates referred to by a `1` bit available.
@@ -86,7 +86,12 @@ impl<H, N> CandidatePendingAvailability<H, N> {
}
pub trait Trait:
frame_system::Trait + paras::Trait + router::Trait + configuration::Trait
frame_system::Trait
+ paras::Trait
+ dmp::Trait
+ ump::Trait
+ hrmp::Trait
+ configuration::Trait
{
type Event: From<Event<Self>> + Into<<Self as frame_system::Trait>::Event>;
}
@@ -600,19 +605,19 @@ impl<T: Trait> Module<T> {
}
// enact the messaging facet of the candidate.
weight += <router::Module<T>>::prune_dmq(
weight += <dmp::Module<T>>::prune_dmq(
receipt.descriptor.para_id,
commitments.processed_downward_messages,
);
weight += <router::Module<T>>::enact_upward_messages(
weight += <ump::Module<T>>::enact_upward_messages(
receipt.descriptor.para_id,
commitments.upward_messages,
);
weight += <router::Module<T>>::prune_hrmp(
weight += <hrmp::Module<T>>::prune_hrmp(
receipt.descriptor.para_id,
T::BlockNumber::from(commitments.hrmp_watermark),
);
weight += <router::Module<T>>::queue_outbound_hrmp(
weight += <hrmp::Module<T>>::queue_outbound_hrmp(
receipt.descriptor.para_id,
commitments.horizontal_messages,
);
@@ -719,10 +724,10 @@ enum AcceptanceCheckErr<BlockNumber> {
HeadDataTooLarge,
PrematureCodeUpgrade,
NewCodeTooLarge,
ProcessedDownwardMessages(router::ProcessedDownwardMessagesAcceptanceErr),
UpwardMessages(router::UpwardMessagesAcceptanceCheckErr),
HrmpWatermark(router::HrmpWatermarkAcceptanceErr<BlockNumber>),
OutboundHrmp(router::OutboundHrmpAcceptanceErr),
ProcessedDownwardMessages(dmp::ProcessedDownwardMessagesAcceptanceErr),
UpwardMessages(ump::AcceptanceCheckErr),
HrmpWatermark(hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>),
OutboundHrmp(hrmp::OutboundHrmpAcceptanceErr),
}
impl<BlockNumber> AcceptanceCheckErr<BlockNumber> {
@@ -795,17 +800,17 @@ impl<T: Trait> CandidateCheckContext<T> {
}
// check if the candidate passes the messaging acceptance criteria
<router::Module<T>>::check_processed_downward_messages(
<dmp::Module<T>>::check_processed_downward_messages(
para_id,
processed_downward_messages,
)?;
<router::Module<T>>::check_upward_messages(&self.config, para_id, upward_messages)?;
<router::Module<T>>::check_hrmp_watermark(
<ump::Module<T>>::check_upward_messages(&self.config, para_id, upward_messages)?;
<hrmp::Module<T>>::check_hrmp_watermark(
para_id,
self.relay_parent_number,
hrmp_watermark,
)?;
<router::Module<T>>::check_outbound_hrmp(&self.config, para_id, horizontal_messages)?;
<hrmp::Module<T>>::check_outbound_hrmp(&self.config, para_id, horizontal_messages)?;
Ok(())
}
@@ -35,7 +35,7 @@ use frame_system::ensure_none;
use crate::{
inclusion,
scheduler::{self, FreedReason},
router,
ump,
};
use inherents::{InherentIdentifier, InherentData, MakeFatalError, ProvideInherent};
@@ -117,7 +117,7 @@ decl_module! {
<scheduler::Module<T>>::occupied(&occupied);
// Give some time slice to dispatch pending upward messages.
<router::Module<T>>::process_pending_upward_messages();
<ump::Module<T>>::process_pending_upward_messages();
// And track that we've finished processing the inherent for this block.
Included::set(Some(()));
+16 -6
View File
@@ -29,7 +29,7 @@ use sp_runtime::traits::One;
use codec::{Encode, Decode};
use crate::{
configuration::{self, HostConfiguration},
paras, router, scheduler, inclusion,
paras, scheduler, inclusion, dmp, ump, hrmp,
};
/// Information about a session change that has just occurred.
@@ -63,7 +63,9 @@ pub trait Trait:
+ paras::Trait
+ scheduler::Trait
+ inclusion::Trait
+ router::Trait
+ dmp::Trait
+ ump::Trait
+ hrmp::Trait
{
/// A randomness beacon.
type Randomness: Randomness<Self::Hash>;
@@ -122,12 +124,16 @@ decl_module! {
// - Scheduler
// - Inclusion
// - Validity
// - Router
// - DMP
// - UMP
// - HRMP
let total_weight = configuration::Module::<T>::initializer_initialize(now) +
paras::Module::<T>::initializer_initialize(now) +
scheduler::Module::<T>::initializer_initialize(now) +
inclusion::Module::<T>::initializer_initialize(now) +
router::Module::<T>::initializer_initialize(now);
dmp::Module::<T>::initializer_initialize(now) +
ump::Module::<T>::initializer_initialize(now) +
hrmp::Module::<T>::initializer_initialize(now);
HasInitialized::set(Some(()));
@@ -137,7 +143,9 @@ decl_module! {
fn on_finalize() {
// reverse initialization order.
router::Module::<T>::initializer_finalize();
hrmp::Module::<T>::initializer_finalize();
ump::Module::<T>::initializer_finalize();
dmp::Module::<T>::initializer_finalize();
inclusion::Module::<T>::initializer_finalize();
scheduler::Module::<T>::initializer_finalize();
paras::Module::<T>::initializer_finalize();
@@ -181,7 +189,9 @@ impl<T: Trait> Module<T> {
paras::Module::<T>::initializer_on_new_session(&notification);
scheduler::Module::<T>::initializer_on_new_session(&notification);
inclusion::Module::<T>::initializer_on_new_session(&notification);
router::Module::<T>::initializer_on_new_session(&notification);
dmp::Module::<T>::initializer_on_new_session(&notification);
ump::Module::<T>::initializer_on_new_session(&notification);
hrmp::Module::<T>::initializer_on_new_session(&notification);
}
/// Should be called when a new session occurs. Buffers the session notification to be applied
+25 -1
View File
@@ -27,10 +27,12 @@ pub mod inclusion;
pub mod inclusion_inherent;
pub mod initializer;
pub mod paras;
pub mod router;
pub mod scheduler;
pub mod validity;
pub mod origin;
pub mod dmp;
pub mod ump;
pub mod hrmp;
pub mod runtime_api_impl;
@@ -40,3 +42,25 @@ mod util;
mod mock;
pub use origin::{Origin, ensure_parachain};
/// Schedule a para to be initialized at the start of the next session with the given genesis data.
pub fn schedule_para_initialize<T: paras::Trait>(
id: primitives::v1::Id,
genesis: paras::ParaGenesisArgs,
) {
<paras::Module<T>>::schedule_para_initialize(id, genesis);
}
/// Schedule a para to be cleaned up at the start of the next session.
pub fn schedule_para_cleanup<T>(id: primitives::v1::Id)
where
T: paras::Trait
+ dmp::Trait
+ ump::Trait
+ hrmp::Trait,
{
<paras::Module<T>>::schedule_para_cleanup(id);
<dmp::Module<T>>::schedule_para_cleanup(id);
<ump::Module<T>>::schedule_para_cleanup(id);
<hrmp::Module<T>>::schedule_para_cleanup(id);
}
+15 -4
View File
@@ -108,9 +108,14 @@ impl crate::paras::Trait for Test {
type Origin = Origin;
}
impl crate::router::Trait for Test {
impl crate::dmp::Trait for Test { }
impl crate::ump::Trait for Test {
type UmpSink = crate::ump::mock_sink::MockUmpSink;
}
impl crate::hrmp::Trait for Test {
type Origin = Origin;
type UmpSink = crate::router::MockUmpSink;
}
impl crate::scheduler::Trait for Test { }
@@ -130,8 +135,14 @@ pub type Configuration = crate::configuration::Module<Test>;
/// Mocked paras.
pub type Paras = crate::paras::Module<Test>;
/// Mocked router.
pub type Router = crate::router::Module<Test>;
/// Mocked DMP
pub type Dmp = crate::dmp::Module<Test>;
/// Mocked UMP
pub type Ump = crate::ump::Module<Test>;
/// Mocked HRMP
pub type Hrmp = crate::hrmp::Module<Test>;
/// Mocked scheduler.
pub type Scheduler = crate::scheduler::Module<Test>;
+2 -2
View File
@@ -396,7 +396,7 @@ impl<T: Trait> Module<T> {
}
/// Schedule a para to be initialized at the start of the next session.
pub fn schedule_para_initialize(id: ParaId, genesis: ParaGenesisArgs) -> Weight {
pub(crate) fn schedule_para_initialize(id: ParaId, genesis: ParaGenesisArgs) -> Weight {
let dup = UpcomingParas::mutate(|v| {
match v.binary_search(&id) {
Ok(_) => true,
@@ -418,7 +418,7 @@ impl<T: Trait> Module<T> {
}
/// Schedule a para to be cleaned up at the start of the next session.
pub fn schedule_para_cleanup(id: ParaId) -> Weight {
pub(crate) fn schedule_para_cleanup(id: ParaId) -> Weight {
let upcoming_weight = UpcomingParas::mutate(|v| {
match v.binary_search(&id) {
Ok(i) => {
-331
View File
@@ -1,331 +0,0 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The router module is responsible for handling messaging.
//!
//! The core of the messaging is checking and processing messages sent out by the candidates,
//! routing the messages at their destinations and informing the parachains about the incoming
//! messages.
use crate::{configuration, paras, initializer, ensure_parachain};
use sp_std::prelude::*;
use frame_support::{decl_error, decl_module, decl_storage, dispatch::DispatchResult, weights::Weight};
use sp_std::collections::vec_deque::VecDeque;
use primitives::v1::{
Id as ParaId, InboundDownwardMessage, Hash, UpwardMessage, HrmpChannelId, InboundHrmpMessage,
};
mod dmp;
mod hrmp;
mod ump;
use hrmp::{HrmpOpenChannelRequest, HrmpChannel};
pub use dmp::{QueueDownwardMessageError, ProcessedDownwardMessagesAcceptanceErr};
pub use ump::{UmpSink, AcceptanceCheckErr as UpwardMessagesAcceptanceCheckErr};
pub use hrmp::{HrmpWatermarkAcceptanceErr, OutboundHrmpAcceptanceErr};
#[cfg(test)]
pub use ump::mock_sink::MockUmpSink;
pub trait Trait: frame_system::Trait + configuration::Trait + paras::Trait {
type Origin: From<crate::Origin>
+ From<<Self as frame_system::Trait>::Origin>
+ Into<Result<crate::Origin, <Self as Trait>::Origin>>;
/// A place where all received upward messages are funneled.
type UmpSink: UmpSink;
}
decl_storage! {
trait Store for Module<T: Trait> as Router {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/*
* Downward Message Passing (DMP)
*
* Storage layout required for implementation of DMP.
*/
/// The downward messages addressed for a certain para.
DownwardMessageQueues: map hasher(twox_64_concat) ParaId => Vec<InboundDownwardMessage<T::BlockNumber>>;
/// A mapping that stores the downward message queue MQC head for each para.
///
/// Each link in this chain has a form:
/// `(prev_head, B, H(M))`, where
/// - `prev_head`: is the previous head hash or zero if none.
/// - `B`: is the relay-chain block number in which a message was appended.
/// - `H(M)`: is the hash of the message being appended.
DownwardMessageQueueHeads: map hasher(twox_64_concat) ParaId => Hash;
/*
* Upward Message Passing (UMP)
*
* Storage layout required for UMP, specifically dispatchable upward messages.
*/
/// The messages waiting to be handled by the relay-chain originating from a certain parachain.
///
/// Note that some upward messages might have been already processed by the inclusion logic. E.g.
/// channel management messages.
///
/// The messages are processed in FIFO order.
RelayDispatchQueues: map hasher(twox_64_concat) ParaId => VecDeque<UpwardMessage>;
/// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`.
///
/// First item in the tuple is the count of messages and second
/// is the total length (in bytes) of the message payloads.
///
/// Note that this is an auxilary mapping: it's possible to tell the byte size and the number of
/// messages only looking at `RelayDispatchQueues`. This mapping is separate to avoid the cost of
/// loading the whole message queue if only the total size and count are required.
///
/// Invariant:
/// - The set of keys should exactly match the set of keys of `RelayDispatchQueues`.
RelayDispatchQueueSize: map hasher(twox_64_concat) ParaId => (u32, u32);
/// The ordered list of `ParaId`s that have a `RelayDispatchQueue` entry.
///
/// Invariant:
/// - The set of items from this vector should be exactly the set of the keys in
/// `RelayDispatchQueues` and `RelayDispatchQueueSize`.
NeedsDispatch: Vec<ParaId>;
/// This is the para that gets will get dispatched first during the next upward dispatchable queue
/// execution round.
///
/// Invariant:
/// - If `Some(para)`, then `para` must be present in `NeedsDispatch`.
NextDispatchRoundStartWith: Option<ParaId>;
/*
* Horizontally Relay-routed Message Passing (HRMP)
*
* HRMP related storage layout
*/
/// The set of pending HRMP open channel requests.
///
/// The set is accompanied by a list for iteration.
///
/// Invariant:
/// - There are no channels that exists in list but not in the set and vice versa.
HrmpOpenChannelRequests: map hasher(twox_64_concat) HrmpChannelId => Option<HrmpOpenChannelRequest>;
HrmpOpenChannelRequestsList: Vec<HrmpChannelId>;
/// This mapping tracks how many open channel requests are inititated by a given sender para.
/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items that has `(X, _)`
/// as the number of `HrmpOpenChannelRequestCount` for `X`.
HrmpOpenChannelRequestCount: map hasher(twox_64_concat) ParaId => u32;
/// This mapping tracks how many open channel requests were accepted by a given recipient para.
/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items `(_, X)` with
/// `confirmed` set to true, as the number of `HrmpAcceptedChannelRequestCount` for `X`.
HrmpAcceptedChannelRequestCount: map hasher(twox_64_concat) ParaId => u32;
/// A set of pending HRMP close channel requests that are going to be closed during the session change.
/// Used for checking if a given channel is registered for closure.
///
/// The set is accompanied by a list for iteration.
///
/// Invariant:
/// - There are no channels that exists in list but not in the set and vice versa.
HrmpCloseChannelRequests: map hasher(twox_64_concat) HrmpChannelId => Option<()>;
HrmpCloseChannelRequestsList: Vec<HrmpChannelId>;
/// The HRMP watermark associated with each para.
/// Invariant:
/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a session.
HrmpWatermarks: map hasher(twox_64_concat) ParaId => Option<T::BlockNumber>;
/// HRMP channel data associated with each para.
/// Invariant:
/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
HrmpChannels: map hasher(twox_64_concat) HrmpChannelId => Option<HrmpChannel>;
/// Ingress/egress indexes allow to find all the senders and receivers given the opposite
/// side. I.e.
///
/// (a) ingress index allows to find all the senders for a given recipient.
/// (b) egress index allows to find all the recipients for a given sender.
///
/// Invariants:
/// - for each ingress index entry for `P` each item `I` in the index should present in `HrmpChannels`
/// as `(I, P)`.
/// - for each egress index entry for `P` each item `E` in the index should present in `HrmpChannels`
/// as `(P, E)`.
/// - there should be no other dangling channels in `HrmpChannels`.
/// - the vectors are sorted.
HrmpIngressChannelsIndex: map hasher(twox_64_concat) ParaId => Vec<ParaId>;
HrmpEgressChannelsIndex: map hasher(twox_64_concat) ParaId => Vec<ParaId>;
/// Storage for the messages for each channel.
/// Invariant: cannot be non-empty if the corresponding channel in `HrmpChannels` is `None`.
HrmpChannelContents: map hasher(twox_64_concat) HrmpChannelId => Vec<InboundHrmpMessage<T::BlockNumber>>;
/// Maintains a mapping that can be used to answer the question:
/// What paras sent a message at the given block number for a given reciever.
/// Invariants:
/// - The inner `Vec<ParaId>` is never empty.
/// - The inner `Vec<ParaId>` cannot store two same `ParaId`.
/// - The outer vector is sorted ascending by block number and cannot store two items with the same
/// block number.
HrmpChannelDigests: map hasher(twox_64_concat) ParaId => Vec<(T::BlockNumber, Vec<ParaId>)>;
}
}
decl_error! {
pub enum Error for Module<T: Trait> {
/// The sender tried to open a channel to themselves.
OpenHrmpChannelToSelf,
/// The recipient is not a valid para.
OpenHrmpChannelInvalidRecipient,
/// The requested capacity is zero.
OpenHrmpChannelZeroCapacity,
/// The requested capacity exceeds the global limit.
OpenHrmpChannelCapacityExceedsLimit,
/// The requested maximum message size is 0.
OpenHrmpChannelZeroMessageSize,
/// The open request requested the message size that exceeds the global limit.
OpenHrmpChannelMessageSizeExceedsLimit,
/// The channel already exists
OpenHrmpChannelAlreadyExists,
/// There is already a request to open the same channel.
OpenHrmpChannelAlreadyRequested,
/// The sender already has the maximum number of allowed outbound channels.
OpenHrmpChannelLimitExceeded,
/// The channel from the sender to the origin doesn't exist.
AcceptHrmpChannelDoesntExist,
/// The channel is already confirmed.
AcceptHrmpChannelAlreadyConfirmed,
/// The recipient already has the maximum number of allowed inbound channels.
AcceptHrmpChannelLimitExceeded,
/// The origin tries to close a channel where it is neither the sender nor the recipient.
CloseHrmpChannelUnauthorized,
/// The channel to be closed doesn't exist.
CloseHrmpChannelDoesntExist,
/// The channel close request is already requested.
CloseHrmpChannelAlreadyUnderway,
}
}
decl_module! {
/// The router module.
pub struct Module<T: Trait> for enum Call where origin: <T as frame_system::Trait>::Origin {
type Error = Error<T>;
#[weight = 0]
fn hrmp_init_open_channel(
origin,
recipient: ParaId,
proposed_max_capacity: u32,
proposed_max_message_size: u32,
) -> DispatchResult {
let origin = ensure_parachain(<T as Trait>::Origin::from(origin))?;
Self::init_open_channel(
origin,
recipient,
proposed_max_capacity,
proposed_max_message_size
)?;
Ok(())
}
#[weight = 0]
fn hrmp_accept_open_channel(origin, sender: ParaId) -> DispatchResult {
let origin = ensure_parachain(<T as Trait>::Origin::from(origin))?;
Self::accept_open_channel(origin, sender)?;
Ok(())
}
#[weight = 0]
fn hrmp_close_channel(origin, channel_id: HrmpChannelId) -> DispatchResult {
let origin = ensure_parachain(<T as Trait>::Origin::from(origin))?;
Self::close_channel(origin, channel_id)?;
Ok(())
}
}
}
impl<T: Trait> Module<T> {
/// Block initialization logic, called by initializer.
pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight {
0
}
/// Block finalization logic, called by initializer.
pub(crate) fn initializer_finalize() {}
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
notification: &initializer::SessionChangeNotification<T::BlockNumber>,
) {
Self::perform_outgoing_para_cleanup();
Self::process_hrmp_open_channel_requests(&notification.prev_config);
Self::process_hrmp_close_channel_requests();
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
for outgoing_para in outgoing {
Self::clean_dmp_after_outgoing(outgoing_para);
Self::clean_ump_after_outgoing(outgoing_para);
Self::clean_hrmp_after_outgoing(outgoing_para);
}
}
/// Schedule a para to be cleaned up at the start of the next session.
pub fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
}
}
#[cfg(test)]
mod tests {
use super::*;
use primitives::v1::BlockNumber;
use frame_support::traits::{OnFinalize, OnInitialize};
use crate::mock::{System, Router, GenesisConfig as MockGenesisConfig};
pub(crate) fn run_to_block(to: BlockNumber, new_session: Option<Vec<BlockNumber>>) {
while System::block_number() < to {
let b = System::block_number();
Router::initializer_finalize();
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) {
Router::initializer_on_new_session(&Default::default());
}
Router::initializer_initialize(b + 1);
}
}
pub(crate) fn default_genesis_config() -> MockGenesisConfig {
MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: crate::configuration::HostConfiguration {
max_downward_message_size: 1024,
..Default::default()
},
},
..Default::default()
}
}
}
@@ -28,7 +28,7 @@ use primitives::v1::{
};
use sp_runtime::traits::Zero;
use frame_support::debug;
use crate::{initializer, inclusion, scheduler, configuration, paras, router};
use crate::{initializer, inclusion, scheduler, configuration, paras, dmp, hrmp};
/// Implementation for the `validators` function of the runtime API.
pub fn validators<T: initializer::Trait>() -> Vec<ValidatorId> {
@@ -310,15 +310,15 @@ where
}
/// Implementation for the `dmq_contents` function of the runtime API.
pub fn dmq_contents<T: router::Trait>(
pub fn dmq_contents<T: dmp::Trait>(
recipient: ParaId,
) -> Vec<InboundDownwardMessage<T::BlockNumber>> {
<router::Module<T>>::dmq_contents(recipient)
<dmp::Module<T>>::dmq_contents(recipient)
}
/// Implementation for the `inbound_hrmp_channels_contents` function of the runtime API.
pub fn inbound_hrmp_channels_contents<T: router::Trait>(
pub fn inbound_hrmp_channels_contents<T: hrmp::Trait>(
recipient: ParaId,
) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<T::BlockNumber>>> {
<router::Module<T>>::inbound_hrmp_channels_contents(recipient)
<hrmp::Module<T>>::inbound_hrmp_channels_contents(recipient)
}
@@ -14,11 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::{Trait, Module, Store};
use crate::configuration::{self, HostConfiguration};
use crate::{
configuration::{self, HostConfiguration},
initializer,
};
use sp_std::{fmt, prelude::*};
use sp_std::collections::{btree_map::BTreeMap, vec_deque::VecDeque};
use frame_support::{StorageMap, StorageValue, weights::Weight, traits::Get};
use frame_support::{decl_module, decl_storage, StorageMap, StorageValue, weights::Weight, traits::Get};
use primitives::v1::{Id as ParaId, UpwardMessage};
/// All upward messages coming from parachains will be funneled into an implementation of this trait.
@@ -78,8 +80,7 @@ impl fmt::Debug for AcceptanceCheckErr {
AcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted } => write!(
fmt,
"more upward messages than permitted by config ({} > {})",
sent,
permitted,
sent, permitted,
),
AcceptanceCheckErr::MessageSize {
idx,
@@ -88,29 +89,109 @@ impl fmt::Debug for AcceptanceCheckErr {
} => write!(
fmt,
"upward message idx {} larger than permitted by config ({} > {})",
idx,
msg_size,
max_size,
idx, msg_size, max_size,
),
AcceptanceCheckErr::CapacityExceeded { count, limit } => write!(
fmt,
"the ump queue would have more items than permitted by config ({} > {})",
count,
limit,
count, limit,
),
AcceptanceCheckErr::TotalSizeExceeded { total_size, limit } => write!(
fmt,
"the ump queue would have grown past the max size permitted by config ({} > {})",
total_size,
limit,
total_size, limit,
),
}
}
}
pub trait Trait: frame_system::Trait + configuration::Trait {
/// A place where all received upward messages are funneled.
type UmpSink: UmpSink;
}
decl_storage! {
trait Store for Module<T: Trait> as Ump {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The messages waiting to be handled by the relay-chain originating from a certain parachain.
///
/// Note that some upward messages might have been already processed by the inclusion logic. E.g.
/// channel management messages.
///
/// The messages are processed in FIFO order.
RelayDispatchQueues: map hasher(twox_64_concat) ParaId => VecDeque<UpwardMessage>;
/// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`.
///
/// First item in the tuple is the count of messages and second
/// is the total length (in bytes) of the message payloads.
///
/// Note that this is an auxilary mapping: it's possible to tell the byte size and the number of
/// messages only looking at `RelayDispatchQueues`. This mapping is separate to avoid the cost of
/// loading the whole message queue if only the total size and count are required.
///
/// Invariant:
/// - The set of keys should exactly match the set of keys of `RelayDispatchQueues`.
RelayDispatchQueueSize: map hasher(twox_64_concat) ParaId => (u32, u32);
/// The ordered list of `ParaId`s that have a `RelayDispatchQueue` entry.
///
/// Invariant:
/// - The set of items from this vector should be exactly the set of the keys in
/// `RelayDispatchQueues` and `RelayDispatchQueueSize`.
NeedsDispatch: Vec<ParaId>;
/// This is the para that gets will get dispatched first during the next upward dispatchable queue
/// execution round.
///
/// Invariant:
/// - If `Some(para)`, then `para` must be present in `NeedsDispatch`.
NextDispatchRoundStartWith: Option<ParaId>;
}
}
decl_module! {
/// The UMP module.
pub struct Module<T: Trait> for enum Call where origin: <T as frame_system::Trait>::Origin {
}
}
/// Routines related to the upward message passing.
impl<T: Trait> Module<T> {
pub(super) fn clean_ump_after_outgoing(outgoing_para: ParaId) {
/// Block initialization logic, called by initializer.
pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight {
0
}
/// Block finalization logic, called by initializer.
pub(crate) fn initializer_finalize() {}
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
_notification: &initializer::SessionChangeNotification<T::BlockNumber>,
) {
Self::perform_outgoing_para_cleanup();
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
for outgoing_para in outgoing {
Self::clean_ump_after_outgoing(outgoing_para);
}
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
}
fn clean_ump_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::RelayDispatchQueueSize::remove(&outgoing_para);
<Self as Store>::RelayDispatchQueues::remove(&outgoing_para);
@@ -193,13 +274,10 @@ impl<T: Trait> Module<T> {
v.extend(upward_messages.into_iter())
});
<Self as Store>::RelayDispatchQueueSize::mutate(
&para,
|(ref mut cnt, ref mut size)| {
*cnt += extra_cnt;
*size += extra_size;
},
);
<Self as Store>::RelayDispatchQueueSize::mutate(&para, |(ref mut cnt, ref mut size)| {
*cnt += extra_cnt;
*size += extra_size;
});
<Self as Store>::NeedsDispatch::mutate(|v| {
if let Err(i) = v.binary_search(&para) {
@@ -545,8 +623,7 @@ pub(crate) mod mock_sink {
mod tests {
use super::*;
use super::mock_sink::Probe;
use crate::router::tests::default_genesis_config;
use crate::mock::{Configuration, Router, new_test_ext};
use crate::mock::{Configuration, Ump, new_test_ext, GenesisConfig as MockGenesisConfig};
use frame_support::IterableStorageMap;
use std::collections::HashSet;
@@ -585,22 +662,33 @@ mod tests {
}
}
fn default_genesis_config() -> MockGenesisConfig {
MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: crate::configuration::HostConfiguration {
max_downward_message_size: 1024,
..Default::default()
},
},
..Default::default()
}
}
fn queue_upward_msg(para: ParaId, msg: UpwardMessage) {
let msgs = vec![msg];
assert!(Router::check_upward_messages(&Configuration::config(), para, &msgs).is_ok());
let _ = Router::enact_upward_messages(para, msgs);
assert!(Ump::check_upward_messages(&Configuration::config(), para, &msgs).is_ok());
let _ = Ump::enact_upward_messages(para, msgs);
}
fn assert_storage_consistency_exhaustive() {
// check that empty queues don't clutter the storage.
for (_para, queue) in <Router as Store>::RelayDispatchQueues::iter() {
for (_para, queue) in <Ump as Store>::RelayDispatchQueues::iter() {
assert!(!queue.is_empty());
}
// actually count the counts and sizes in queues and compare them to the bookkeeped version.
for (para, queue) in <Router as Store>::RelayDispatchQueues::iter() {
let (expected_count, expected_size) =
<Router as Store>::RelayDispatchQueueSize::get(para);
for (para, queue) in <Ump as Store>::RelayDispatchQueues::iter() {
let (expected_count, expected_size) = <Ump as Store>::RelayDispatchQueueSize::get(para);
let (actual_count, actual_size) =
queue.into_iter().fold((0, 0), |(acc_count, acc_size), x| {
(acc_count + 1, acc_size + x.len() as u32)
@@ -612,27 +700,29 @@ mod tests {
// since we wipe the empty queues the sets of paras in queue contents, queue sizes and
// need dispatch set should all be equal.
let queue_contents_set = <Router as Store>::RelayDispatchQueues::iter()
let queue_contents_set = <Ump as Store>::RelayDispatchQueues::iter()
.map(|(k, _)| k)
.collect::<HashSet<ParaId>>();
let queue_sizes_set = <Router as Store>::RelayDispatchQueueSize::iter()
let queue_sizes_set = <Ump as Store>::RelayDispatchQueueSize::iter()
.map(|(k, _)| k)
.collect::<HashSet<ParaId>>();
let needs_dispatch_set = <Router as Store>::NeedsDispatch::get()
let needs_dispatch_set = <Ump as Store>::NeedsDispatch::get()
.into_iter()
.collect::<HashSet<ParaId>>();
assert_eq!(queue_contents_set, queue_sizes_set);
assert_eq!(queue_contents_set, needs_dispatch_set);
// `NextDispatchRoundStartWith` should point into a para that is tracked.
if let Some(para) = <Router as Store>::NextDispatchRoundStartWith::get() {
if let Some(para) = <Ump as Store>::NextDispatchRoundStartWith::get() {
assert!(queue_contents_set.contains(&para));
}
// `NeedsDispatch` is always sorted.
assert!(<Router as Store>::NeedsDispatch::get()
.windows(2)
.all(|xs| xs[0] <= xs[1]));
assert!(
<Ump as Store>::NeedsDispatch::get()
.windows(2)
.all(|xs| xs[0] <= xs[1])
);
}
#[test]
@@ -641,7 +731,7 @@ mod tests {
assert_storage_consistency_exhaustive();
// make sure that the case with empty queues is handled properly
Router::process_pending_upward_messages();
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
});
@@ -658,7 +748,7 @@ mod tests {
probe.assert_msg(a, msg.clone(), 0);
queue_upward_msg(a, msg);
Router::process_pending_upward_messages();
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
});
@@ -697,7 +787,7 @@ mod tests {
probe.assert_msg(a, a_msg_1.clone(), 300);
probe.assert_msg(c, c_msg_1.clone(), 300);
Router::process_pending_upward_messages();
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
drop(probe);
@@ -711,7 +801,7 @@ mod tests {
let mut probe = Probe::new();
probe.assert_msg(q, q_msg.clone(), 500);
Router::process_pending_upward_messages();
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
drop(probe);
@@ -723,7 +813,7 @@ mod tests {
probe.assert_msg(a, a_msg_2.clone(), 100);
probe.assert_msg(c, c_msg_2.clone(), 100);
Router::process_pending_upward_messages();
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
drop(probe);
@@ -733,7 +823,7 @@ mod tests {
{
let probe = Probe::new();
Router::process_pending_upward_messages();
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
drop(probe);
@@ -775,7 +865,7 @@ mod tests {
probe.assert_msg(b, b_msg_1.clone(), 300);
probe.assert_msg(a, a_msg_2.clone(), 300);
Router::process_pending_upward_messages();
Ump::process_pending_upward_messages();
drop(probe);
}
+6 -6
View File
@@ -20,12 +20,12 @@
use sp_runtime::traits::{One, Saturating};
use primitives::v1::{Id as ParaId, PersistedValidationData, TransientValidationData};
use crate::{configuration, paras, router};
use crate::{configuration, paras, dmp, hrmp};
/// Make the persisted validation data for a particular parachain.
///
/// This ties together the storage of several modules.
pub fn make_persisted_validation_data<T: paras::Trait + router::Trait>(
pub fn make_persisted_validation_data<T: paras::Trait + hrmp::Trait>(
para_id: ParaId,
) -> Option<PersistedValidationData<T::BlockNumber>> {
let relay_parent_number = <frame_system::Module<T>>::block_number() - One::one();
@@ -33,15 +33,15 @@ pub fn make_persisted_validation_data<T: paras::Trait + router::Trait>(
Some(PersistedValidationData {
parent_head: <paras::Module<T>>::para_head(&para_id)?,
block_number: relay_parent_number,
hrmp_mqc_heads: <router::Module<T>>::hrmp_mqc_heads(para_id),
dmq_mqc_head: <router::Module<T>>::dmq_mqc_head(para_id),
hrmp_mqc_heads: <hrmp::Module<T>>::hrmp_mqc_heads(para_id),
dmq_mqc_head: <dmp::Module<T>>::dmq_mqc_head(para_id),
})
}
/// Make the transient validation data for a particular parachain.
///
/// This ties together the storage of several modules.
pub fn make_transient_validation_data<T: paras::Trait + router::Trait>(
pub fn make_transient_validation_data<T: paras::Trait + dmp::Trait>(
para_id: ParaId,
) -> Option<TransientValidationData<T::BlockNumber>> {
let config = <configuration::Module<T>>::config();
@@ -67,6 +67,6 @@ pub fn make_transient_validation_data<T: paras::Trait + router::Trait>(
max_head_data_size: config.max_head_data_size,
balance: 0,
code_upgrade_allowed,
dmq_length: <router::Module<T>>::dmq_length(para_id),
dmq_length: <dmp::Module<T>>::dmq_length(para_id),
})
}