scheduler: handle re-scheduling around finalization correctly (#2257)

* scheduler: handle re-scheduling around finalization correctly

* also make sure parathreads get cleaned

* run scheduling in finalization

* Remove stray println!

* Update the schedule call site in inclusion inherent

* Clarify subtlety around SessionStartBlock

* Remove double semi-colon

* reschedule prior to `availability_cores` and in on-initialize

* improve docs

* fix line

* more doc reformat

* remove unneeded call

* avoid unnecessary scheduling on initialize

* split `clear` and `schedule

* Update runtime/parachains/src/scheduler.rs

Co-authored-by: Sergei Shulepov <sergei@parity.io>

Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
Robert Habermeier
2021-01-13 17:07:09 -05:00
committed by GitHub
parent 1d28f59e19
commit 3465c18b71
8 changed files with 269 additions and 62 deletions
@@ -124,7 +124,11 @@ decl_module! {
let freed = freed_concluded.into_iter().map(|c| (c, FreedReason::Concluded))
.chain(freed_timeout.into_iter().map(|c| (c, FreedReason::TimedOut)));
<scheduler::Module<T>>::schedule(freed);
<scheduler::Module<T>>::clear();
<scheduler::Module<T>>::schedule(
freed,
<frame_system::Module<T>>::block_number(),
);
let backed_candidates = limit_backed_candidates::<T>(backed_candidates);
let backed_candidates_len = backed_candidates.len() as Weight;
@@ -19,6 +19,7 @@
use sp_std::prelude::*;
use sp_std::collections::btree_map::BTreeMap;
use sp_runtime::traits::One;
use primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, ValidationData,
Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode,
@@ -39,8 +40,10 @@ pub fn validator_groups<T: initializer::Config>() -> (
Vec<Vec<ValidatorIndex>>,
GroupRotationInfo<T::BlockNumber>,
) {
let now = <frame_system::Module<T>>::block_number() + One::one();
let groups = <scheduler::Module<T>>::validator_groups();
let rotation_info = <scheduler::Module<T>>::group_rotation_info();
let rotation_info = <scheduler::Module<T>>::group_rotation_info(now);
(groups, rotation_info)
}
@@ -51,7 +54,11 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T:
let parachains = <paras::Module<T>>::parachains();
let config = <configuration::Module<T>>::config();
let rotation_info = <scheduler::Module<T>>::group_rotation_info();
let now = <frame_system::Module<T>>::block_number() + One::one();
<scheduler::Module<T>>::clear();
<scheduler::Module<T>>::schedule(Vec::new(), now);
let rotation_info = <scheduler::Module<T>>::group_rotation_info(now);
let time_out_at = |backed_in_number, availability_period| {
let time_out_at = backed_in_number + availability_period;
+233 -42
View File
@@ -46,7 +46,7 @@ use frame_support::{
weights::Weight,
};
use parity_scale_codec::{Encode, Decode};
use sp_runtime::traits::Saturating;
use sp_runtime::traits::{One, Saturating};
use rand::{SeedableRng, seq::SliceRandom};
use rand_chacha::ChaCha20Rng;
@@ -183,10 +183,18 @@ decl_storage! {
/// Bounded by the number of parathread cores and scheduling lookahead. Reasonably, 10 * 50 = 500.
ParathreadClaimIndex: Vec<ParaId>;
/// The block number where the session start occurred. Used to track how many group rotations have occurred.
///
/// Note that in the context of parachains modules the session change is signalled during
/// the block and enacted at the end of the block (at the finalization stage, to be exact).
/// Thus for all intents and purposes the effect of the session change is observed at the
/// block following the session change, block number of which we save in this storage value.
SessionStartBlock get(fn session_start_block): T::BlockNumber;
/// Currently scheduled cores - free but up to be occupied.
///
/// Bounded by the number of cores: one for each parachain and parathread multiplexer.
///
/// The value contained here will not be valid after the end of a block. Runtime APIs should be used to determine scheduled cores/
/// for the upcoming block.
Scheduled get(fn scheduled): Vec<CoreAssignment>; // sorted ascending by CoreIndex.
}
}
@@ -205,30 +213,11 @@ decl_module! {
impl<T: Config> Module<T> {
/// Called by the initializer to initialize the scheduler module.
pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight {
// Free all scheduled cores and return parathread claims to queue, with retries incremented.
let config = <configuration::Module<T>>::config();
ParathreadQueue::mutate(|queue| {
for core_assignment in Scheduled::take() {
if let AssignmentKind::Parathread(collator, retries) = core_assignment.kind {
let entry = ParathreadEntry {
claim: ParathreadClaim(core_assignment.para_id, collator),
retries: retries + 1,
};
if entry.retries <= config.parathread_retries {
queue.enqueue_entry(entry, config.parathread_cores);
}
}
}
});
Self::schedule(Vec::new());
0
}
/// Called by the initializer to finalize the scheduler module.
pub(crate) fn initializer_finalize() {}
pub(crate) fn initializer_finalize() { }
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(notification: &SessionChangeNotification<T::BlockNumber>) {
@@ -250,7 +239,6 @@ impl<T: Config> Module<T> {
},
);
<SessionStartBlock<T>>::set(<frame_system::Module<T>>::block_number());
AvailabilityCores::mutate(|cores| {
// clear all occupied cores.
for maybe_occupied in cores.iter_mut() {
@@ -337,6 +325,9 @@ impl<T: Config> Module<T> {
}
});
ParathreadQueue::set(thread_queue);
let now = <frame_system::Module<T>>::block_number() + One::one();
<SessionStartBlock<T>>::set(now);
}
/// Add a parathread claim to the queue. If there is a competing claim in the queue or currently
@@ -375,7 +366,10 @@ impl<T: Config> Module<T> {
/// Schedule all unassigned cores, where possible. Provide a list of cores that should be considered
/// newly-freed along with the reason for them being freed. The list is assumed to be sorted in
/// ascending order by core index.
pub(crate) fn schedule(just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>) {
pub(crate) fn schedule(
just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
now: T::BlockNumber,
) {
let mut cores = AvailabilityCores::get();
let config = <configuration::Module<T>>::config();
@@ -411,7 +405,6 @@ impl<T: Config> Module<T> {
let parachains = <paras::Module<T>>::parachains();
let mut scheduled = Scheduled::get();
let mut parathread_queue = ParathreadQueue::get();
let now = <frame_system::Module<T>>::block_number();
if ValidatorGroups::get().is_empty() { return }
@@ -638,9 +631,8 @@ impl<T: Config> Module<T> {
}
/// Returns a helper for determining group rotation.
pub(crate) fn group_rotation_info() -> GroupRotationInfo<T::BlockNumber> {
pub(crate) fn group_rotation_info(now: T::BlockNumber) -> GroupRotationInfo<T::BlockNumber> {
let session_start_block = Self::session_start_block();
let now = <frame_system::Module<T>>::block_number();
let group_rotation_frequency = <configuration::Module<T>>::config()
.group_rotation_frequency;
@@ -716,6 +708,27 @@ impl<T: Config> Module<T> {
})
}
}
// Free all scheduled cores and return parathread claims to queue, with retries incremented.
pub(crate) fn clear() {
let config = <configuration::Module<T>>::config();
ParathreadQueue::mutate(|queue| {
for core_assignment in Scheduled::take() {
if let AssignmentKind::Parathread(collator, retries) = core_assignment.kind {
if !<paras::Module<T>>::is_parathread(core_assignment.para_id) { continue }
let entry = ParathreadEntry {
claim: ParathreadClaim(core_assignment.para_id, collator),
retries: retries + 1,
};
if entry.retries <= config.parathread_retries {
queue.enqueue_entry(entry, config.parathread_cores);
}
}
}
});
}
}
#[cfg(test)]
@@ -741,21 +754,42 @@ mod tests {
Scheduler::initializer_finalize();
Paras::initializer_finalize();
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
if let Some(notification) = new_session(b + 1) {
Paras::initializer_on_new_session(&notification);
Scheduler::initializer_on_new_session(&notification);
}
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
Paras::initializer_initialize(b + 1);
Scheduler::initializer_initialize(b + 1);
// In the real runt;me this is expected to be called by the `InclusionInherent` module.
Scheduler::clear();
Scheduler::schedule(Vec::new(), b + 1);
}
}
fn run_to_end_of_block(
to: BlockNumber,
new_session: impl Fn(BlockNumber) -> Option<SessionChangeNotification<BlockNumber>>,
) {
run_to_block(to, &new_session);
Scheduler::initializer_finalize();
Paras::initializer_finalize();
if let Some(notification) = new_session(to + 1) {
Paras::initializer_on_new_session(&notification);
Scheduler::initializer_on_new_session(&notification);
}
System::on_finalize(to);
}
fn default_config() -> HostConfiguration<BlockNumber> {
HostConfiguration {
parathread_cores: 3,
@@ -1334,11 +1368,14 @@ mod tests {
}
// now note that cores 0, 2, and 3 were freed.
Scheduler::schedule(vec![
(CoreIndex(0), FreedReason::Concluded),
(CoreIndex(2), FreedReason::Concluded),
(CoreIndex(3), FreedReason::TimedOut), // should go back on queue.
]);
Scheduler::schedule(
vec![
(CoreIndex(0), FreedReason::Concluded),
(CoreIndex(2), FreedReason::Concluded),
(CoreIndex(3), FreedReason::TimedOut), // should go back on queue.
],
3
);
{
let scheduled = Scheduler::scheduled();
@@ -1455,10 +1492,13 @@ mod tests {
run_to_block(3, |_| None);
// now note that cores 0 and 2 were freed.
Scheduler::schedule(vec![
(CoreIndex(0), FreedReason::Concluded),
(CoreIndex(2), FreedReason::Concluded),
]);
Scheduler::schedule(
vec![
(CoreIndex(0), FreedReason::Concluded),
(CoreIndex(2), FreedReason::Concluded),
],
3,
);
{
let scheduled = Scheduler::scheduled();
@@ -1557,8 +1597,6 @@ mod tests {
// one block before first rotation.
run_to_block(rotation_frequency, |_| None);
let rotations_since_session_start = (rotation_frequency - session_start_block) / rotation_frequency;
assert_eq!(rotations_since_session_start, 0);
assert_groups_rotated(0);
// first rotation.
@@ -2038,4 +2076,157 @@ mod tests {
}
});
}
#[test]
fn session_change_requires_reschedule_dropping_removed_paras() {
let genesis_config = MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: default_config(),
..Default::default()
},
..Default::default()
};
assert_eq!(default_config().parathread_cores, 3);
new_test_ext(genesis_config).execute_with(|| {
let chain_a = ParaId::from(1);
let chain_b = ParaId::from(2);
// ensure that we have 5 groups by registering 2 parachains.
Paras::schedule_para_initialize(chain_a, ParaGenesisArgs {
genesis_head: Vec::new().into(),
validation_code: Vec::new().into(),
parachain: true,
});
Paras::schedule_para_initialize(chain_b, ParaGenesisArgs {
genesis_head: Vec::new().into(),
validation_code: Vec::new().into(),
parachain: true,
});
run_to_block(1, |number| match number {
1 => Some(SessionChangeNotification {
new_config: default_config(),
validators: vec![
ValidatorId::from(Sr25519Keyring::Alice.public()),
ValidatorId::from(Sr25519Keyring::Bob.public()),
ValidatorId::from(Sr25519Keyring::Charlie.public()),
ValidatorId::from(Sr25519Keyring::Dave.public()),
ValidatorId::from(Sr25519Keyring::Eve.public()),
ValidatorId::from(Sr25519Keyring::Ferdie.public()),
ValidatorId::from(Sr25519Keyring::One.public()),
],
random_seed: [99; 32],
..Default::default()
}),
_ => None,
});
assert_eq!(Scheduler::scheduled().len(), 2);
let groups = ValidatorGroups::get();
assert_eq!(groups.len(), 5);
Paras::schedule_para_cleanup(chain_b);
run_to_end_of_block(2, |number| match number {
2 => Some(SessionChangeNotification {
new_config: default_config(),
validators: vec![
ValidatorId::from(Sr25519Keyring::Alice.public()),
ValidatorId::from(Sr25519Keyring::Bob.public()),
ValidatorId::from(Sr25519Keyring::Charlie.public()),
ValidatorId::from(Sr25519Keyring::Dave.public()),
ValidatorId::from(Sr25519Keyring::Eve.public()),
ValidatorId::from(Sr25519Keyring::Ferdie.public()),
ValidatorId::from(Sr25519Keyring::One.public()),
],
random_seed: [99; 32],
..Default::default()
}),
_ => None,
});
Scheduler::clear();
Scheduler::schedule(Vec::new(), 3);
assert_eq!(
Scheduler::scheduled(),
vec![
CoreAssignment {
core: CoreIndex(0),
para_id: chain_a,
kind: AssignmentKind::Parachain,
group_idx: GroupIndex(0),
}
],
);
});
}
#[test]
fn parathread_claims_are_pruned_after_deregistration() {
let genesis_config = MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: default_config(),
..Default::default()
},
..Default::default()
};
let thread_a = ParaId::from(1);
let thread_b = ParaId::from(2);
let collator = CollatorId::from(Sr25519Keyring::Alice.public());
let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs {
genesis_head: Vec::new().into(),
validation_code: Vec::new().into(),
parachain: is_chain,
});
new_test_ext(genesis_config).execute_with(|| {
assert_eq!(default_config().parathread_cores, 3);
schedule_blank_para(thread_a, false);
schedule_blank_para(thread_b, false);
// start a new session to activate, 5 validators for 5 cores.
run_to_block(1, |number| match number {
1 => Some(SessionChangeNotification {
new_config: default_config(),
validators: vec![
ValidatorId::from(Sr25519Keyring::Alice.public()),
ValidatorId::from(Sr25519Keyring::Eve.public()),
],
..Default::default()
}),
_ => None,
});
Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone()));
Scheduler::add_parathread_claim(ParathreadClaim(thread_b, collator.clone()));
run_to_block(2, |_| None);
assert_eq!(Scheduler::scheduled().len(), 2);
Paras::schedule_para_cleanup(thread_a);
// start a new session to activate, 5 validators for 5 cores.
run_to_block(3, |number| match number {
3 => Some(SessionChangeNotification {
new_config: default_config(),
validators: vec![
ValidatorId::from(Sr25519Keyring::Alice.public()),
ValidatorId::from(Sr25519Keyring::Eve.public()),
],
..Default::default()
}),
_ => None,
});
assert_eq!(Scheduler::scheduled().len(), 1);
});
}
}