Initializer + Paras Clean Up Messages When Offboarding (#2413)

* initial hack in

* finish up

* use notification to pass outgoing paras

* move outgoing paras from notifications

* missing comma

* update guides

* clean up
This commit is contained in:
Shawn Tabrizi
2021-02-15 13:48:25 -04:00
committed by GitHub
parent 22f85c809b
commit eb0159f288
10 changed files with 116 additions and 149 deletions
+16 -42
View File
@@ -66,10 +66,6 @@ pub trait Config: frame_system::Config + configuration::Config {}
decl_storage! {
trait Store for Module<T: Config> as Dmp {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The downward messages addressed for a certain para.
DownwardMessageQueues: map hasher(twox_64_concat) ParaId => Vec<InboundDownwardMessage<T::BlockNumber>>;
/// A mapping that stores the downward message queue MQC head for each para.
@@ -101,31 +97,23 @@ impl<T: Config> Module<T> {
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
_notification: &initializer::SessionChangeNotification<T::BlockNumber>,
outgoing_paras: &[ParaId],
) {
Self::perform_outgoing_para_cleanup();
Self::perform_outgoing_para_cleanup(outgoing_paras);
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// Iterate over all paras that were noted for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::clean_dmp_after_outgoing(outgoing_para);
}
}
fn clean_dmp_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::DownwardMessageQueues::remove(&outgoing_para);
<Self as Store>::DownwardMessageQueueHeads::remove(&outgoing_para);
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
/// Remove all relevant storage items for an outgoing parachain.
fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
<Self as Store>::DownwardMessageQueues::remove(outgoing_para);
<Self as Store>::DownwardMessageQueueHeads::remove(outgoing_para);
}
/// Enqueue a downward message to a specific recipient para.
@@ -229,23 +217,24 @@ mod tests {
use super::*;
use hex_literal::hex;
use primitives::v1::BlockNumber;
use frame_support::StorageValue;
use frame_support::traits::{OnFinalize, OnInitialize};
use parity_scale_codec::Encode;
use crate::mock::{Configuration, new_test_ext, System, Dmp, MockGenesisConfig};
use crate::mock::{Configuration, new_test_ext, System, Dmp, MockGenesisConfig, Paras};
pub(crate) fn run_to_block(to: BlockNumber, new_session: Option<Vec<BlockNumber>>) {
while System::block_number() < to {
let b = System::block_number();
Paras::initializer_finalize();
Dmp::initializer_finalize();
if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) {
Dmp::initializer_on_new_session(&Default::default());
Dmp::initializer_on_new_session(&Default::default(), &Vec::new());
}
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
Paras::initializer_finalize();
Dmp::initializer_initialize(b + 1);
}
}
@@ -270,39 +259,24 @@ mod tests {
}
#[test]
fn scheduled_cleanup_performed() {
fn clean_dmp_works() {
let a = ParaId::from(1312);
let b = ParaId::from(228);
let c = ParaId::from(123);
new_test_ext(default_genesis_config()).execute_with(|| {
run_to_block(1, None);
// enqueue downward messages to A, B and C.
queue_downward_message(a, vec![1, 2, 3]).unwrap();
queue_downward_message(b, vec![4, 5, 6]).unwrap();
queue_downward_message(c, vec![7, 8, 9]).unwrap();
Dmp::schedule_para_cleanup(a);
// run to block without session change.
run_to_block(2, None);
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&c).is_empty());
Dmp::schedule_para_cleanup(b);
// run to block changing the session.
run_to_block(3, Some(vec![3]));
let notification = crate::initializer::SessionChangeNotification::default();
let outgoing_paras = vec![a, b];
Dmp::initializer_on_new_session(&notification, &outgoing_paras);
assert!(<Dmp as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(<Dmp as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&c).is_empty());
// verify that the outgoing paras are emptied.
assert!(OutgoingParas::get().is_empty())
});
}
+11 -25
View File
@@ -229,10 +229,6 @@ pub trait Config: frame_system::Config + configuration::Config + paras::Config +
decl_storage! {
trait Store for Module<T: Config> as Hrmp {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The set of pending HRMP open channel requests.
///
/// The set is accompanied by a list for iteration.
@@ -404,42 +400,33 @@ impl<T: Config> Module<T> {
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
notification: &initializer::SessionChangeNotification<T::BlockNumber>,
outgoing_paras: &[ParaId],
) {
Self::perform_outgoing_para_cleanup();
Self::perform_outgoing_para_cleanup(outgoing_paras);
Self::process_hrmp_open_channel_requests(&notification.prev_config);
Self::process_hrmp_close_channel_requests();
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// Iterate over all paras that were noted for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::clean_hrmp_after_outgoing(outgoing_para);
}
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
}
/// Remove all storage entries associated with the given para.
fn clean_hrmp_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::HrmpOpenChannelRequestCount::remove(&outgoing_para);
<Self as Store>::HrmpAcceptedChannelRequestCount::remove(&outgoing_para);
fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
<Self as Store>::HrmpOpenChannelRequestCount::remove(outgoing_para);
<Self as Store>::HrmpAcceptedChannelRequestCount::remove(outgoing_para);
let ingress = <Self as Store>::HrmpIngressChannelsIndex::take(&outgoing_para)
let ingress = <Self as Store>::HrmpIngressChannelsIndex::take(outgoing_para)
.into_iter()
.map(|sender| HrmpChannelId {
sender,
recipient: outgoing_para.clone(),
});
let egress = <Self as Store>::HrmpEgressChannelsIndex::take(&outgoing_para)
let egress = <Self as Store>::HrmpEgressChannelsIndex::take(outgoing_para)
.into_iter()
.map(|recipient| HrmpChannelId {
sender: outgoing_para.clone(),
@@ -1149,8 +1136,8 @@ mod tests {
};
// NOTE: this is in initialization order.
Paras::initializer_on_new_session(&notification);
Hrmp::initializer_on_new_session(&notification);
let outgoing_paras = Paras::initializer_on_new_session(&notification);
Hrmp::initializer_on_new_session(&notification, &outgoing_paras);
}
System::on_finalize(b);
@@ -1247,7 +1234,6 @@ mod tests {
fn deregister_parachain(id: ParaId) {
Paras::schedule_para_cleanup(id);
Hrmp::schedule_para_cleanup(id);
}
fn channel_exists(sender: ParaId, recipient: ParaId) -> bool {
+61 -6
View File
@@ -199,13 +199,13 @@ impl<T: Config> Module<T> {
session_index,
};
paras::Module::<T>::initializer_on_new_session(&notification);
let outgoing_paras = paras::Module::<T>::initializer_on_new_session(&notification);
scheduler::Module::<T>::initializer_on_new_session(&notification);
inclusion::Module::<T>::initializer_on_new_session(&notification);
session_info::Module::<T>::initializer_on_new_session(&notification);
dmp::Module::<T>::initializer_on_new_session(&notification);
ump::Module::<T>::initializer_on_new_session(&notification);
hrmp::Module::<T>::initializer_on_new_session(&notification);
dmp::Module::<T>::initializer_on_new_session(&notification, &outgoing_paras);
ump::Module::<T>::initializer_on_new_session(&notification, &outgoing_paras);
hrmp::Module::<T>::initializer_on_new_session(&notification, &outgoing_paras);
}
/// Should be called when a new session occurs. Buffers the session notification to be applied
@@ -259,9 +259,16 @@ impl<T: pallet_session::Config + Config> OneSessionHandler<T::AccountId> for Mod
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::{new_test_ext, Initializer, System};
use primitives::v1::{Id as ParaId};
use crate::mock::{
new_test_ext,
Initializer, System, Dmp, Paras, Configuration, MockGenesisConfig,
};
use frame_support::traits::{OnFinalize, OnInitialize};
use frame_support::{
assert_ok,
traits::{OnFinalize, OnInitialize},
};
#[test]
fn session_change_before_initialize_is_still_buffered_after() {
@@ -316,4 +323,52 @@ mod tests {
assert!(HasInitialized::get().is_none());
})
}
#[test]
fn scheduled_cleanup_performed() {
let a = ParaId::from(1312);
let b = ParaId::from(228);
let c = ParaId::from(123);
let mock_genesis = crate::paras::ParaGenesisArgs {
parachain: true,
genesis_head: Default::default(),
validation_code: Default::default(),
};
new_test_ext(
MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: crate::configuration::HostConfiguration {
max_downward_message_size: 1024,
..Default::default()
},
},
paras: crate::paras::GenesisConfig {
paras: vec![
(a, mock_genesis.clone()),
(b, mock_genesis.clone()),
(c, mock_genesis.clone()),
],
..Default::default()
},
..Default::default()
}
).execute_with(|| {
// enqueue downward messages to A, B and C.
assert_ok!(Dmp::queue_downward_message(&Configuration::config(), a, vec![1, 2, 3]));
assert_ok!(Dmp::queue_downward_message(&Configuration::config(), b, vec![4, 5, 6]));
assert_ok!(Dmp::queue_downward_message(&Configuration::config(), c, vec![7, 8, 9]));
Paras::schedule_para_cleanup(a);
Paras::schedule_para_cleanup(b);
Initializer::apply_new_session(1, vec![], vec![]);
assert!(Dmp::dmq_contents(a).is_empty());
assert!(Dmp::dmq_contents(b).is_empty());
assert!(!Dmp::dmq_contents(c).is_empty());
});
}
}
+1 -10
View File
@@ -54,15 +54,6 @@ pub fn schedule_para_initialize<T: paras::Config>(
}
/// Schedule a para to be cleaned up at the start of the next session.
pub fn schedule_para_cleanup<T>(id: primitives::v1::Id)
where
T: paras::Config
+ dmp::Config
+ ump::Config
+ hrmp::Config,
{
pub fn schedule_para_cleanup<T: paras::Config>(id: primitives::v1::Id) {
<paras::Module<T>>::schedule_para_cleanup(id);
<dmp::Module<T>>::schedule_para_cleanup(id);
<ump::Module<T>>::schedule_para_cleanup(id);
<hrmp::Module<T>>::schedule_para_cleanup(id);
}
+13 -7
View File
@@ -317,21 +317,27 @@ impl<T: Config> Module<T> {
pub(crate) fn initializer_finalize() { }
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(_notification: &SessionChangeNotification<T::BlockNumber>) {
///
/// Returns the list of outgoing parachains for this session.
pub(crate) fn initializer_on_new_session(_notification: &SessionChangeNotification<T::BlockNumber>)
-> Vec<ParaId>
{
let now = <frame_system::Module<T>>::block_number();
let mut parachains = Self::clean_up_outgoing(now);
let (mut parachains, outgoing) = Self::clean_up_outgoing(now);
Self::apply_incoming(&mut parachains);
Self::apply_upgrades(&mut parachains);
Self::apply_downgrades(&mut parachains);
<Self as Store>::Parachains::set(parachains);
outgoing
}
/// Cleans up all outgoing paras. Returns the new set of parachains
fn clean_up_outgoing(now: T::BlockNumber) -> Vec<ParaId> {
/// Cleans up all outgoing paras. Returns the new set of parachains and any outgoing parachains.
fn clean_up_outgoing(now: T::BlockNumber) -> (Vec<ParaId>, Vec<ParaId>) {
let mut parachains = <Self as Store>::Parachains::get();
let outgoing = <Self as Store>::OutgoingParas::take();
for outgoing_para in outgoing {
for outgoing_para in &outgoing {
// Warn if there is a state error... but still perform the offboarding to be defensive.
if let Some(state) = ParaLifecycles::get(&outgoing_para) {
if !state.is_outgoing() {
@@ -353,11 +359,11 @@ impl<T: Config> Module<T> {
let removed_code = <Self as Store>::CurrentCode::take(&outgoing_para);
if let Some(removed_code) = removed_code {
Self::note_past_code(outgoing_para, now, now, removed_code);
Self::note_past_code(*outgoing_para, now, now, removed_code);
}
}
parachains
(parachains, outgoing)
}
/// Applies all incoming paras, updating the parachains list for those that are parachains.
+10 -22
View File
@@ -148,10 +148,6 @@ pub trait Config: frame_system::Config + configuration::Config {
decl_storage! {
trait Store for Module<T: Config> as Ump {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The messages waiting to be handled by the relay-chain originating from a certain parachain.
///
/// Note that some upward messages might have been already processed by the inclusion logic. E.g.
@@ -207,31 +203,23 @@ impl<T: Config> Module<T> {
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
_notification: &initializer::SessionChangeNotification<T::BlockNumber>,
outgoing_paras: &[ParaId],
) {
Self::perform_outgoing_para_cleanup();
Self::perform_outgoing_para_cleanup(outgoing_paras);
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// Iterate over all paras that were noted for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::clean_ump_after_outgoing(outgoing_para);
}
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
}
fn clean_ump_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::RelayDispatchQueueSize::remove(&outgoing_para);
<Self as Store>::RelayDispatchQueues::remove(&outgoing_para);
/// Remove all relevant storage items for an outgoing parachain.
fn clean_ump_after_outgoing(outgoing_para: &ParaId) {
<Self as Store>::RelayDispatchQueueSize::remove(outgoing_para);
<Self as Store>::RelayDispatchQueues::remove(outgoing_para);
// Remove the outgoing para from the `NeedsDispatch` list and from
// `NextDispatchRoundStartWith`.
@@ -239,12 +227,12 @@ impl<T: Config> Module<T> {
// That's needed for maintaining invariant that `NextDispatchRoundStartWith` points to an
// existing item in `NeedsDispatch`.
<Self as Store>::NeedsDispatch::mutate(|v| {
if let Ok(i) = v.binary_search(&outgoing_para) {
if let Ok(i) = v.binary_search(outgoing_para) {
v.remove(i);
}
});
<Self as Store>::NextDispatchRoundStartWith::mutate(|v| {
*v = v.filter(|p| *p == outgoing_para)
*v = v.filter(|p| p == outgoing_para)
});
}