mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 11:41:02 +00:00
Parachains scheduler.rs to FrameV2 (#3529)
* migration * migrate runtimes * storage visability * Remove unused Call part * Remove Call part import from parachain mock runtime * Remove Call part import from test-runtime Co-authored-by: Keith Yeung <kungfukeith11@gmail.com>
This commit is contained in:
@@ -1477,7 +1477,7 @@ construct_runtime! {
|
||||
ParasShared: parachains_shared::{Pallet, Call, Storage} = 52,
|
||||
ParaInclusion: parachains_inclusion::{Pallet, Call, Storage, Event<T>} = 53,
|
||||
ParasInherent: parachains_paras_inherent::{Pallet, Call, Storage, Inherent} = 54,
|
||||
ParasScheduler: parachains_scheduler::{Pallet, Call, Storage} = 55,
|
||||
ParaScheduler: parachains_scheduler::{Pallet, Storage} = 55,
|
||||
Paras: parachains_paras::{Pallet, Call, Storage, Event, Config} = 56,
|
||||
Initializer: parachains_initializer::{Pallet, Call, Storage} = 57,
|
||||
Dmp: parachains_dmp::{Pallet, Call, Storage} = 58,
|
||||
|
||||
@@ -135,7 +135,7 @@ pub mod pallet {
|
||||
let total_weight = configuration::Pallet::<T>::initializer_initialize(now) +
|
||||
shared::Pallet::<T>::initializer_initialize(now) +
|
||||
paras::Pallet::<T>::initializer_initialize(now) +
|
||||
scheduler::Module::<T>::initializer_initialize(now) +
|
||||
scheduler::Pallet::<T>::initializer_initialize(now) +
|
||||
inclusion::Pallet::<T>::initializer_initialize(now) +
|
||||
session_info::Pallet::<T>::initializer_initialize(now) +
|
||||
T::DisputesHandler::initializer_initialize(now) +
|
||||
@@ -156,7 +156,7 @@ pub mod pallet {
|
||||
T::DisputesHandler::initializer_finalize();
|
||||
session_info::Pallet::<T>::initializer_finalize();
|
||||
inclusion::Pallet::<T>::initializer_finalize();
|
||||
scheduler::Module::<T>::initializer_finalize();
|
||||
scheduler::Pallet::<T>::initializer_finalize();
|
||||
paras::Pallet::<T>::initializer_finalize();
|
||||
shared::Pallet::<T>::initializer_finalize();
|
||||
configuration::Pallet::<T>::initializer_finalize();
|
||||
@@ -234,7 +234,7 @@ impl<T: Config> Pallet<T> {
|
||||
};
|
||||
|
||||
let outgoing_paras = paras::Pallet::<T>::initializer_on_new_session(¬ification);
|
||||
scheduler::Module::<T>::initializer_on_new_session(¬ification);
|
||||
scheduler::Pallet::<T>::initializer_on_new_session(¬ification);
|
||||
inclusion::Pallet::<T>::initializer_on_new_session(¬ification);
|
||||
session_info::Pallet::<T>::initializer_on_new_session(¬ification);
|
||||
T::DisputesHandler::initializer_on_new_session(¬ification);
|
||||
|
||||
@@ -49,7 +49,7 @@ frame_support::construct_runtime!(
|
||||
Configuration: configuration::{Pallet, Call, Storage, Config<T>},
|
||||
ParasShared: shared::{Pallet, Call, Storage},
|
||||
ParaInclusion: inclusion::{Pallet, Call, Storage, Event<T>},
|
||||
Scheduler: scheduler::{Pallet, Call, Storage},
|
||||
Scheduler: scheduler::{Pallet, Storage},
|
||||
Initializer: initializer::{Pallet, Call, Storage},
|
||||
Dmp: dmp::{Pallet, Call, Storage},
|
||||
Ump: ump::{Pallet, Call, Storage, Event},
|
||||
|
||||
@@ -148,11 +148,11 @@ decl_module! {
|
||||
|
||||
// Process new availability bitfields, yielding any availability cores whose
|
||||
// work has now concluded.
|
||||
let expected_bits = <scheduler::Module<T>>::availability_cores().len();
|
||||
let expected_bits = <scheduler::Pallet<T>>::availability_cores().len();
|
||||
let freed_concluded = <inclusion::Pallet<T>>::process_bitfields(
|
||||
expected_bits,
|
||||
signed_bitfields,
|
||||
<scheduler::Module<T>>::core_para,
|
||||
<scheduler::Pallet<T>>::core_para,
|
||||
)?;
|
||||
|
||||
// Inform the disputes module of all included candidates.
|
||||
@@ -162,7 +162,7 @@ decl_module! {
|
||||
}
|
||||
|
||||
// Handle timeouts for any availability core work.
|
||||
let availability_pred = <scheduler::Module<T>>::availability_timeout_predicate();
|
||||
let availability_pred = <scheduler::Pallet<T>>::availability_timeout_predicate();
|
||||
let freed_timeout = if let Some(pred) = availability_pred {
|
||||
<inclusion::Pallet<T>>::collect_pending(pred)
|
||||
} else {
|
||||
@@ -177,8 +177,8 @@ decl_module! {
|
||||
|
||||
freed.sort_unstable_by_key(|pair| pair.0); // sort by core index
|
||||
|
||||
<scheduler::Module<T>>::clear();
|
||||
<scheduler::Module<T>>::schedule(
|
||||
<scheduler::Pallet<T>>::clear();
|
||||
<scheduler::Pallet<T>>::schedule(
|
||||
freed,
|
||||
<frame_system::Pallet<T>>::block_number(),
|
||||
);
|
||||
@@ -202,12 +202,12 @@ decl_module! {
|
||||
let occupied = <inclusion::Pallet<T>>::process_candidates(
|
||||
parent_storage_root,
|
||||
backed_candidates,
|
||||
<scheduler::Module<T>>::scheduled(),
|
||||
<scheduler::Module<T>>::group_validators,
|
||||
<scheduler::Pallet<T>>::scheduled(),
|
||||
<scheduler::Pallet<T>>::group_validators,
|
||||
)?;
|
||||
|
||||
// Note which of the scheduled cores were actually occupied by a backed candidate.
|
||||
<scheduler::Module<T>>::occupied(&occupied);
|
||||
<scheduler::Pallet<T>>::occupied(&occupied);
|
||||
|
||||
// Give some time slice to dispatch pending upward messages.
|
||||
<ump::Pallet<T>>::process_pending_upward_messages();
|
||||
|
||||
@@ -42,23 +42,23 @@ pub fn validator_groups<T: initializer::Config>() -> (
|
||||
) {
|
||||
let now = <frame_system::Pallet<T>>::block_number() + One::one();
|
||||
|
||||
let groups = <scheduler::Module<T>>::validator_groups();
|
||||
let rotation_info = <scheduler::Module<T>>::group_rotation_info(now);
|
||||
let groups = <scheduler::Pallet<T>>::validator_groups();
|
||||
let rotation_info = <scheduler::Pallet<T>>::group_rotation_info(now);
|
||||
|
||||
(groups, rotation_info)
|
||||
}
|
||||
|
||||
/// Implementation for the `availability_cores` function of the runtime API.
|
||||
pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T::BlockNumber>> {
|
||||
let cores = <scheduler::Module<T>>::availability_cores();
|
||||
let cores = <scheduler::Pallet<T>>::availability_cores();
|
||||
let parachains = <paras::Pallet<T>>::parachains();
|
||||
let config = <configuration::Pallet<T>>::config();
|
||||
|
||||
let now = <frame_system::Pallet<T>>::block_number() + One::one();
|
||||
<scheduler::Module<T>>::clear();
|
||||
<scheduler::Module<T>>::schedule(Vec::new(), now);
|
||||
<scheduler::Pallet<T>>::clear();
|
||||
<scheduler::Pallet<T>>::schedule(Vec::new(), now);
|
||||
|
||||
let rotation_info = <scheduler::Module<T>>::group_rotation_info(now);
|
||||
let rotation_info = <scheduler::Pallet<T>>::group_rotation_info(now);
|
||||
|
||||
let time_out_at = |backed_in_number, availability_period| {
|
||||
let time_out_at = backed_in_number + availability_period;
|
||||
@@ -81,7 +81,7 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T:
|
||||
};
|
||||
|
||||
let group_responsible_for = |backed_in_number, core_index| {
|
||||
match <scheduler::Module<T>>::group_assigned_to_core(core_index, backed_in_number) {
|
||||
match <scheduler::Pallet<T>>::group_assigned_to_core(core_index, backed_in_number) {
|
||||
Some(g) => g,
|
||||
None => {
|
||||
log::warn!(
|
||||
@@ -106,7 +106,7 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T:
|
||||
|
||||
let backed_in_number = pending_availability.backed_in_number().clone();
|
||||
OccupiedCore {
|
||||
next_up_on_available: <scheduler::Module<T>>::next_up_on_available(
|
||||
next_up_on_available: <scheduler::Pallet<T>>::next_up_on_available(
|
||||
CoreIndex(i as u32)
|
||||
),
|
||||
occupied_since: backed_in_number,
|
||||
@@ -114,7 +114,7 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T:
|
||||
backed_in_number,
|
||||
config.chain_availability_period,
|
||||
),
|
||||
next_up_on_time_out: <scheduler::Module<T>>::next_up_on_time_out(
|
||||
next_up_on_time_out: <scheduler::Pallet<T>>::next_up_on_time_out(
|
||||
CoreIndex(i as u32)
|
||||
),
|
||||
availability: pending_availability.availability_votes().clone(),
|
||||
@@ -134,7 +134,7 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T:
|
||||
|
||||
let backed_in_number = pending_availability.backed_in_number().clone();
|
||||
OccupiedCore {
|
||||
next_up_on_available: <scheduler::Module<T>>::next_up_on_available(
|
||||
next_up_on_available: <scheduler::Pallet<T>>::next_up_on_available(
|
||||
CoreIndex(i as u32)
|
||||
),
|
||||
occupied_since: backed_in_number,
|
||||
@@ -142,7 +142,7 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T:
|
||||
backed_in_number,
|
||||
config.thread_availability_period,
|
||||
),
|
||||
next_up_on_time_out: <scheduler::Module<T>>::next_up_on_time_out(
|
||||
next_up_on_time_out: <scheduler::Pallet<T>>::next_up_on_time_out(
|
||||
CoreIndex(i as u32)
|
||||
),
|
||||
availability: pending_availability.availability_votes().clone(),
|
||||
@@ -160,7 +160,7 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T:
|
||||
}).collect();
|
||||
|
||||
// This will overwrite only `Free` cores if the scheduler module is working as intended.
|
||||
for scheduled in <scheduler::Module<T>>::scheduled() {
|
||||
for scheduled in <scheduler::Pallet<T>>::scheduled() {
|
||||
core_states[scheduled.core.0 as usize] = CoreState::Scheduled(ScheduledCore {
|
||||
para_id: scheduled.para_id,
|
||||
collator: scheduled.required_collator().map(|c| c.clone()),
|
||||
|
||||
@@ -41,15 +41,13 @@ use primitives::v1::{
|
||||
Id as ParaId, ValidatorIndex, CoreOccupied, CoreIndex, CollatorId,
|
||||
GroupIndex, ParathreadClaim, ParathreadEntry, GroupRotationInfo, ScheduledCore,
|
||||
};
|
||||
use frame_support::{
|
||||
decl_storage, decl_module, decl_error,
|
||||
weights::Weight,
|
||||
};
|
||||
use parity_scale_codec::{Encode, Decode};
|
||||
use frame_support::pallet_prelude::*;
|
||||
use sp_runtime::traits::{One, Saturating};
|
||||
|
||||
use crate::{configuration, paras, initializer::SessionChangeNotification};
|
||||
|
||||
pub use pallet::*;
|
||||
|
||||
/// A queued parathread entry, pre-assigned to a core.
|
||||
#[derive(Encode, Decode, Default)]
|
||||
#[cfg_attr(test, derive(PartialEq, Debug))]
|
||||
@@ -150,66 +148,77 @@ impl CoreAssignment {
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Config: frame_system::Config + configuration::Config + paras::Config { }
|
||||
|
||||
decl_storage! {
|
||||
trait Store for Module<T: Config> as ParaScheduler {
|
||||
/// All the validator groups. One for each core. Indices are into `ActiveValidators` - not the
|
||||
/// broader set of Polkadot validators, but instead just the subset used for parachains during
|
||||
/// this session.
|
||||
///
|
||||
/// Bound: The number of cores is the sum of the numbers of parachains and parathread multiplexers.
|
||||
/// Reasonably, 100-1000. The dominant factor is the number of validators: safe upper bound at 10k.
|
||||
ValidatorGroups get(fn validator_groups): Vec<Vec<ValidatorIndex>>;
|
||||
#[frame_support::pallet]
|
||||
pub mod pallet {
|
||||
use super::*;
|
||||
|
||||
#[pallet::pallet]
|
||||
#[pallet::generate_store(pub(super) trait Store)]
|
||||
pub struct Pallet<T>(_);
|
||||
|
||||
#[pallet::config]
|
||||
pub trait Config: frame_system::Config + configuration::Config + paras::Config {}
|
||||
|
||||
/// All the validator groups. One for each core. Indices are into `ActiveValidators` - not the
|
||||
/// broader set of Polkadot validators, but instead just the subset used for parachains during
|
||||
/// this session.
|
||||
///
|
||||
/// Bound: The number of cores is the sum of the numbers of parachains and parathread multiplexers.
|
||||
/// Reasonably, 100-1000. The dominant factor is the number of validators: safe upper bound at 10k.
|
||||
#[pallet::storage]
|
||||
#[pallet::getter(fn validator_groups)]
|
||||
pub(crate) type ValidatorGroups<T> = StorageValue<_, Vec<Vec<ValidatorIndex>>, ValueQuery>;
|
||||
|
||||
/// A queue of upcoming claims and which core they should be mapped onto.
|
||||
///
|
||||
/// The number of queued claims is bounded at the `scheduling_lookahead`
|
||||
/// multiplied by the number of parathread multiplexer cores. Reasonably, 10 * 50 = 500.
|
||||
#[pallet::storage]
|
||||
pub(crate) type ParathreadQueue<T> = StorageValue<_, ParathreadClaimQueue, ValueQuery>;
|
||||
|
||||
/// One entry for each availability core. Entries are `None` if the core is not currently occupied. Can be
|
||||
/// temporarily `Some` if scheduled but not occupied.
|
||||
/// The i'th parachain belongs to the i'th core, with the remaining cores all being
|
||||
/// parathread-multiplexers.
|
||||
///
|
||||
/// Bounded by the maximum of either of these two values:
|
||||
/// * The number of parachains and parathread multiplexers
|
||||
/// * The number of validators divided by `configuration.max_validators_per_core`.
|
||||
#[pallet::storage]
|
||||
#[pallet::getter(fn availability_cores)]
|
||||
pub(crate) type AvailabilityCores<T> = StorageValue<_, Vec<Option<CoreOccupied>>, ValueQuery>;
|
||||
|
||||
/// An index used to ensure that only one claim on a parathread exists in the queue or is
|
||||
/// currently being handled by an occupied core.
|
||||
///
|
||||
/// Bounded by the number of parathread cores and scheduling lookahead. Reasonably, 10 * 50 = 500.
|
||||
#[pallet::storage]
|
||||
pub(crate) type ParathreadClaimIndex<T> = StorageValue<_, Vec<ParaId>, ValueQuery>;
|
||||
|
||||
/// The block number where the session start occurred. Used to track how many group rotations have occurred.
|
||||
///
|
||||
/// Note that in the context of parachains modules the session change is signaled during
|
||||
/// the block and enacted at the end of the block (at the finalization stage, to be exact).
|
||||
/// Thus for all intents and purposes the effect of the session change is observed at the
|
||||
/// block following the session change, block number of which we save in this storage value.
|
||||
#[pallet::storage]
|
||||
#[pallet::getter(fn session_start_block)]
|
||||
pub(crate) type SessionStartBlock<T: Config> = StorageValue<_, T::BlockNumber, ValueQuery>;
|
||||
|
||||
/// Currently scheduled cores - free but up to be occupied.
|
||||
///
|
||||
/// Bounded by the number of cores: one for each parachain and parathread multiplexer.
|
||||
///
|
||||
/// The value contained here will not be valid after the end of a block. Runtime APIs should be used to determine scheduled cores/
|
||||
/// for the upcoming block.
|
||||
#[pallet::storage]
|
||||
#[pallet::getter(fn scheduled)]
|
||||
pub(crate) type Scheduled<T> = StorageValue<_, Vec<CoreAssignment>, ValueQuery>; // sorted ascending by CoreIndex.
|
||||
|
||||
/// A queue of upcoming claims and which core they should be mapped onto.
|
||||
///
|
||||
/// The number of queued claims is bounded at the `scheduling_lookahead`
|
||||
/// multiplied by the number of parathread multiplexer cores. Reasonably, 10 * 50 = 500.
|
||||
ParathreadQueue: ParathreadClaimQueue;
|
||||
/// One entry for each availability core. Entries are `None` if the core is not currently occupied. Can be
|
||||
/// temporarily `Some` if scheduled but not occupied.
|
||||
/// The i'th parachain belongs to the i'th core, with the remaining cores all being
|
||||
/// parathread-multiplexers.
|
||||
///
|
||||
/// Bounded by the maximum of either of these two values:
|
||||
/// * The number of parachains and parathread multiplexers
|
||||
/// * The number of validators divided by `configuration.max_validators_per_core`.
|
||||
AvailabilityCores get(fn availability_cores): Vec<Option<CoreOccupied>>;
|
||||
/// An index used to ensure that only one claim on a parathread exists in the queue or is
|
||||
/// currently being handled by an occupied core.
|
||||
///
|
||||
/// Bounded by the number of parathread cores and scheduling lookahead. Reasonably, 10 * 50 = 500.
|
||||
ParathreadClaimIndex: Vec<ParaId>;
|
||||
/// The block number where the session start occurred. Used to track how many group rotations have occurred.
|
||||
///
|
||||
/// Note that in the context of parachains modules the session change is signaled during
|
||||
/// the block and enacted at the end of the block (at the finalization stage, to be exact).
|
||||
/// Thus for all intents and purposes the effect of the session change is observed at the
|
||||
/// block following the session change, block number of which we save in this storage value.
|
||||
SessionStartBlock get(fn session_start_block): T::BlockNumber;
|
||||
/// Currently scheduled cores - free but up to be occupied.
|
||||
///
|
||||
/// Bounded by the number of cores: one for each parachain and parathread multiplexer.
|
||||
///
|
||||
/// The value contained here will not be valid after the end of a block. Runtime APIs should be used to determine scheduled cores/
|
||||
/// for the upcoming block.
|
||||
Scheduled get(fn scheduled): Vec<CoreAssignment>; // sorted ascending by CoreIndex.
|
||||
}
|
||||
}
|
||||
|
||||
decl_error! {
|
||||
pub enum Error for Module<T: Config> { }
|
||||
}
|
||||
|
||||
decl_module! {
|
||||
/// The scheduler module.
|
||||
pub struct Module<T: Config> for enum Call where origin: <T as frame_system::Config>::Origin {
|
||||
type Error = Error<T>;
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Config> Module<T> {
|
||||
impl<T: Config> Pallet<T> {
|
||||
/// Called by the initializer to initialize the scheduler module.
|
||||
pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight {
|
||||
0
|
||||
@@ -227,7 +236,7 @@ impl<T: Config> Module<T> {
|
||||
} = notification;
|
||||
let config = new_config;
|
||||
|
||||
let mut thread_queue = ParathreadQueue::get();
|
||||
let mut thread_queue = ParathreadQueue::<T>::get();
|
||||
let n_parachains = <paras::Pallet<T>>::parachains().len() as u32;
|
||||
let n_cores = core::cmp::max(
|
||||
n_parachains + config.parathread_cores,
|
||||
@@ -237,7 +246,7 @@ impl<T: Config> Module<T> {
|
||||
},
|
||||
);
|
||||
|
||||
AvailabilityCores::mutate(|cores| {
|
||||
AvailabilityCores::<T>::mutate(|cores| {
|
||||
// clear all occupied cores.
|
||||
for maybe_occupied in cores.iter_mut() {
|
||||
if let Some(CoreOccupied::Parathread(claim)) = maybe_occupied.take() {
|
||||
@@ -255,7 +264,7 @@ impl<T: Config> Module<T> {
|
||||
|
||||
// shuffle validators into groups.
|
||||
if n_cores == 0 || validators.is_empty() {
|
||||
ValidatorGroups::set(Vec::new());
|
||||
ValidatorGroups::<T>::set(Vec::new());
|
||||
} else {
|
||||
let group_base_size = validators.len() / n_cores as usize;
|
||||
let n_larger_groups = validators.len() % n_cores as usize;
|
||||
@@ -278,12 +287,12 @@ impl<T: Config> Module<T> {
|
||||
);
|
||||
}
|
||||
|
||||
ValidatorGroups::set(groups);
|
||||
ValidatorGroups::<T>::set(groups);
|
||||
}
|
||||
|
||||
// prune out all parathread claims with too many retries.
|
||||
// assign all non-pruned claims to new cores, if they've changed.
|
||||
ParathreadClaimIndex::mutate(|claim_index| {
|
||||
ParathreadClaimIndex::<T>::mutate(|claim_index| {
|
||||
// wipe all parathread metadata if no parathread cores are configured.
|
||||
if config.parathread_cores == 0 {
|
||||
thread_queue = ParathreadClaimQueue {
|
||||
@@ -321,7 +330,7 @@ impl<T: Config> Module<T> {
|
||||
((thread_queue.queue.len()) as u32) % config.parathread_cores;
|
||||
}
|
||||
});
|
||||
ParathreadQueue::set(thread_queue);
|
||||
ParathreadQueue::<T>::set(thread_queue);
|
||||
|
||||
let now = <frame_system::Pallet<T>>::block_number() + One::one();
|
||||
<SessionStartBlock<T>>::set(now);
|
||||
@@ -338,12 +347,12 @@ impl<T: Config> Module<T> {
|
||||
let config = <configuration::Pallet<T>>::config();
|
||||
let queue_max_size = config.parathread_cores * config.scheduling_lookahead;
|
||||
|
||||
ParathreadQueue::mutate(|queue| {
|
||||
ParathreadQueue::<T>::mutate(|queue| {
|
||||
if queue.queue.len() >= queue_max_size as usize { return }
|
||||
|
||||
let para_id = claim.0;
|
||||
|
||||
let competes_with_another = ParathreadClaimIndex::mutate(|index| {
|
||||
let competes_with_another = ParathreadClaimIndex::<T>::mutate(|index| {
|
||||
match index.binary_search(¶_id) {
|
||||
Ok(_) => true,
|
||||
Err(i) => {
|
||||
@@ -367,7 +376,7 @@ impl<T: Config> Module<T> {
|
||||
just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
|
||||
now: T::BlockNumber,
|
||||
) {
|
||||
let mut cores = AvailabilityCores::get();
|
||||
let mut cores = AvailabilityCores::<T>::get();
|
||||
let config = <configuration::Pallet<T>>::config();
|
||||
|
||||
for (freed_index, freed_reason) in just_freed_cores {
|
||||
@@ -380,7 +389,7 @@ impl<T: Config> Module<T> {
|
||||
FreedReason::Concluded => {
|
||||
// After a parathread candidate has successfully been included,
|
||||
// open it up for further claims!
|
||||
ParathreadClaimIndex::mutate(|index| {
|
||||
ParathreadClaimIndex::<T>::mutate(|index| {
|
||||
if let Ok(i) = index.binary_search(&entry.claim.0) {
|
||||
index.remove(i);
|
||||
}
|
||||
@@ -389,7 +398,7 @@ impl<T: Config> Module<T> {
|
||||
FreedReason::TimedOut => {
|
||||
// If a parathread candidate times out, it's not the collator's fault,
|
||||
// so we don't increment retries.
|
||||
ParathreadQueue::mutate(|queue| {
|
||||
ParathreadQueue::<T>::mutate(|queue| {
|
||||
queue.enqueue_entry(entry, config.parathread_cores);
|
||||
})
|
||||
}
|
||||
@@ -400,10 +409,10 @@ impl<T: Config> Module<T> {
|
||||
}
|
||||
|
||||
let parachains = <paras::Pallet<T>>::parachains();
|
||||
let mut scheduled = Scheduled::get();
|
||||
let mut parathread_queue = ParathreadQueue::get();
|
||||
let mut scheduled = Scheduled::<T>::get();
|
||||
let mut parathread_queue = ParathreadQueue::<T>::get();
|
||||
|
||||
if ValidatorGroups::get().is_empty() { return }
|
||||
if ValidatorGroups::<T>::get().is_empty() { return }
|
||||
|
||||
{
|
||||
let mut prev_scheduled_in_order = scheduled.iter().enumerate().peekable();
|
||||
@@ -491,9 +500,9 @@ impl<T: Config> Module<T> {
|
||||
// insertions.
|
||||
}
|
||||
|
||||
Scheduled::set(scheduled);
|
||||
ParathreadQueue::set(parathread_queue);
|
||||
AvailabilityCores::set(cores);
|
||||
Scheduled::<T>::set(scheduled);
|
||||
ParathreadQueue::<T>::set(parathread_queue);
|
||||
AvailabilityCores::<T>::set(cores);
|
||||
}
|
||||
|
||||
/// Note that the given cores have become occupied. Behavior undefined if any of the given cores were not scheduled
|
||||
@@ -504,8 +513,8 @@ impl<T: Config> Module<T> {
|
||||
pub(crate) fn occupied(now_occupied: &[CoreIndex]) {
|
||||
if now_occupied.is_empty() { return }
|
||||
|
||||
let mut availability_cores = AvailabilityCores::get();
|
||||
Scheduled::mutate(|scheduled| {
|
||||
let mut availability_cores = AvailabilityCores::<T>::get();
|
||||
Scheduled::<T>::mutate(|scheduled| {
|
||||
// The constraints on the function require that now_occupied is a sorted subset of the
|
||||
// `scheduled` cores, which are also sorted.
|
||||
|
||||
@@ -527,13 +536,13 @@ impl<T: Config> Module<T> {
|
||||
})
|
||||
});
|
||||
|
||||
AvailabilityCores::set(availability_cores);
|
||||
AvailabilityCores::<T>::set(availability_cores);
|
||||
}
|
||||
|
||||
/// Get the para (chain or thread) ID assigned to a particular core or index, if any. Core indices
|
||||
/// out of bounds will return `None`, as will indices of unassigned cores.
|
||||
pub(crate) fn core_para(core_index: CoreIndex) -> Option<ParaId> {
|
||||
let cores = AvailabilityCores::get();
|
||||
let cores = AvailabilityCores::<T>::get();
|
||||
match cores.get(core_index.0 as usize).and_then(|c| c.as_ref()) {
|
||||
None => None,
|
||||
Some(CoreOccupied::Parachain) => {
|
||||
@@ -546,7 +555,7 @@ impl<T: Config> Module<T> {
|
||||
|
||||
/// Get the validators in the given group, if the group index is valid for this session.
|
||||
pub(crate) fn group_validators(group_index: GroupIndex) -> Option<Vec<ValidatorIndex>> {
|
||||
ValidatorGroups::get().get(group_index.0 as usize).map(|g| g.clone())
|
||||
ValidatorGroups::<T>::get().get(group_index.0 as usize).map(|g| g.clone())
|
||||
}
|
||||
|
||||
/// Get the group assigned to a specific core by index at the current block number. Result undefined if the core index is unknown
|
||||
@@ -557,7 +566,7 @@ impl<T: Config> Module<T> {
|
||||
|
||||
if at < session_start_block { return None }
|
||||
|
||||
let validator_groups = ValidatorGroups::get();
|
||||
let validator_groups = ValidatorGroups::<T>::get();
|
||||
|
||||
if core.0 as usize >= validator_groups.len() { return None }
|
||||
|
||||
@@ -599,7 +608,7 @@ impl<T: Config> Module<T> {
|
||||
config.thread_availability_period,
|
||||
);
|
||||
|
||||
let availability_cores = AvailabilityCores::get();
|
||||
let availability_cores = AvailabilityCores::<T>::get();
|
||||
|
||||
if blocks_since_last_rotation >= absolute_cutoff {
|
||||
None
|
||||
@@ -654,7 +663,7 @@ impl<T: Config> Module<T> {
|
||||
collator: None,
|
||||
})
|
||||
} else {
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<T>::get();
|
||||
let core_offset = (core.0 as usize - parachains.len()) as u32;
|
||||
queue.get_next_on_core(core_offset).map(|entry| ScheduledCore {
|
||||
para_id: entry.claim.0,
|
||||
@@ -678,7 +687,7 @@ impl<T: Config> Module<T> {
|
||||
collator: None,
|
||||
})
|
||||
} else {
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<T>::get();
|
||||
|
||||
// This is the next scheduled para on this core.
|
||||
let core_offset = (core.0 as usize - parachains.len()) as u32;
|
||||
@@ -690,7 +699,7 @@ impl<T: Config> Module<T> {
|
||||
.or_else(|| {
|
||||
// Or, if none, the claim currently occupying the core,
|
||||
// as it would be put back on the queue after timing out.
|
||||
let cores = AvailabilityCores::get();
|
||||
let cores = AvailabilityCores::<T>::get();
|
||||
cores.get(core.0 as usize).and_then(|c| c.as_ref()).and_then(|o| {
|
||||
match o {
|
||||
CoreOccupied::Parathread(entry) => {
|
||||
@@ -708,9 +717,10 @@ impl<T: Config> Module<T> {
|
||||
|
||||
// Free all scheduled cores and return parathread claims to queue, with retries incremented.
|
||||
pub(crate) fn clear() {
|
||||
|
||||
let config = <configuration::Pallet<T>>::config();
|
||||
ParathreadQueue::mutate(|queue| {
|
||||
for core_assignment in Scheduled::take() {
|
||||
ParathreadQueue::<T>::mutate(|queue| {
|
||||
for core_assignment in Scheduled::<T>::take() {
|
||||
if let AssignmentKind::Parathread(collator, retries) = core_assignment.kind {
|
||||
if !<paras::Pallet<T>>::is_parathread(core_assignment.para_id) { continue }
|
||||
|
||||
@@ -734,13 +744,10 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
use primitives::v1::{BlockNumber, ValidatorId, CollatorId, SessionIndex};
|
||||
use frame_support::{
|
||||
assert_ok,
|
||||
traits::{OnFinalize, OnInitialize},
|
||||
};
|
||||
use frame_support::assert_ok;
|
||||
use keyring::Sr25519Keyring;
|
||||
|
||||
use crate::mock::{new_test_ext, Configuration, Paras, ParasShared, System, Scheduler, MockGenesisConfig};
|
||||
use crate::mock::{new_test_ext, Configuration, Paras, ParasShared, System, Scheduler, MockGenesisConfig, Test};
|
||||
use crate::initializer::SessionChangeNotification;
|
||||
use crate::configuration::HostConfiguration;
|
||||
use crate::paras::ParaGenesisArgs;
|
||||
@@ -840,7 +847,7 @@ mod tests {
|
||||
|
||||
{
|
||||
Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator.clone()));
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<Test>::get();
|
||||
assert_eq!(queue.next_core_offset, 1);
|
||||
assert_eq!(queue.queue.len(), 1);
|
||||
assert_eq!(queue.queue[0], QueuedParathread {
|
||||
@@ -856,7 +863,7 @@ mod tests {
|
||||
{
|
||||
let collator2 = CollatorId::from(Sr25519Keyring::Bob.public());
|
||||
Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator2.clone()));
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<Test>::get();
|
||||
assert_eq!(queue.next_core_offset, 1);
|
||||
assert_eq!(queue.queue.len(), 1);
|
||||
assert_eq!(queue.queue[0], QueuedParathread {
|
||||
@@ -872,7 +879,7 @@ mod tests {
|
||||
{
|
||||
let thread_id2 = ParaId::from(11);
|
||||
Scheduler::add_parathread_claim(ParathreadClaim(thread_id2, collator.clone()));
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<Test>::get();
|
||||
assert_eq!(queue.next_core_offset, 1);
|
||||
assert_eq!(queue.queue.len(), 1);
|
||||
assert_eq!(queue.queue[0], QueuedParathread {
|
||||
@@ -914,7 +921,7 @@ mod tests {
|
||||
assert!(Paras::is_parathread(thread_id));
|
||||
|
||||
Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator.clone()));
|
||||
assert_eq!(ParathreadQueue::get(), Default::default());
|
||||
assert_eq!(ParathreadQueue::<Test>::get(), Default::default());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -947,7 +954,7 @@ mod tests {
|
||||
}
|
||||
|
||||
// set up a queue as if n_cores was 4 and with some with many retries.
|
||||
ParathreadQueue::put({
|
||||
ParathreadQueue::<Test>::put({
|
||||
let mut queue = ParathreadClaimQueue::default();
|
||||
|
||||
// Will be pruned: too many retries.
|
||||
@@ -977,7 +984,7 @@ mod tests {
|
||||
queue
|
||||
});
|
||||
|
||||
ParathreadClaimIndex::put(vec![thread_a, thread_b, thread_c, thread_d]);
|
||||
ParathreadClaimIndex::<Test>::put(vec![thread_a, thread_b, thread_c, thread_d]);
|
||||
|
||||
run_to_block(
|
||||
10,
|
||||
@@ -991,7 +998,7 @@ mod tests {
|
||||
);
|
||||
assert_eq!(Configuration::config(), default_config());
|
||||
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<Test>::get();
|
||||
assert_eq!(
|
||||
queue.queue,
|
||||
vec![
|
||||
@@ -1013,7 +1020,7 @@ mod tests {
|
||||
);
|
||||
assert_eq!(queue.next_core_offset, 2);
|
||||
|
||||
assert_eq!(ParathreadClaimIndex::get(), vec![thread_b, thread_c]);
|
||||
assert_eq!(ParathreadClaimIndex::<Test>::get(), vec![thread_b, thread_c]);
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1055,7 +1062,7 @@ mod tests {
|
||||
_ => None,
|
||||
});
|
||||
|
||||
let groups = ValidatorGroups::get();
|
||||
let groups = ValidatorGroups::<Test>::get();
|
||||
assert_eq!(groups.len(), 5);
|
||||
|
||||
// first two groups have the overflow.
|
||||
@@ -1115,7 +1122,7 @@ mod tests {
|
||||
_ => None,
|
||||
});
|
||||
|
||||
let groups = ValidatorGroups::get();
|
||||
let groups = ValidatorGroups::<Test>::get();
|
||||
assert_eq!(groups.len(), 7);
|
||||
|
||||
// Every validator gets its own group, even though there are 2 paras.
|
||||
@@ -1295,7 +1302,7 @@ mod tests {
|
||||
Scheduler::occupied(&[CoreIndex(0), CoreIndex(1), CoreIndex(2), CoreIndex(3)]);
|
||||
|
||||
{
|
||||
let cores = AvailabilityCores::get();
|
||||
let cores = AvailabilityCores::<Test>::get();
|
||||
|
||||
assert!(cores[0].is_some());
|
||||
assert!(cores[1].is_some());
|
||||
@@ -1371,8 +1378,8 @@ mod tests {
|
||||
|
||||
// the prior claim on thread A concluded, but the claim on thread C was marked as
|
||||
// timed out.
|
||||
let index = ParathreadClaimIndex::get();
|
||||
let parathread_queue = ParathreadQueue::get();
|
||||
let index = ParathreadClaimIndex::<Test>::get();
|
||||
let parathread_queue = ParathreadQueue::<Test>::get();
|
||||
|
||||
// thread A claim should have been wiped, but thread C claim should remain.
|
||||
assert_eq!(index, vec![thread_b, thread_c, thread_d, thread_e]);
|
||||
@@ -1436,7 +1443,7 @@ mod tests {
|
||||
Scheduler::occupied(&[CoreIndex(0), CoreIndex(1), CoreIndex(2)]);
|
||||
|
||||
{
|
||||
let cores = AvailabilityCores::get();
|
||||
let cores = AvailabilityCores::<Test>::get();
|
||||
|
||||
assert!(cores[0].is_some());
|
||||
assert!(cores[1].is_some());
|
||||
@@ -1474,7 +1481,7 @@ mod tests {
|
||||
});
|
||||
|
||||
// The freed cores should be `None` in `AvailabilityCores`.
|
||||
let cores = AvailabilityCores::get();
|
||||
let cores = AvailabilityCores::<Test>::get();
|
||||
assert!(cores[0].is_none());
|
||||
assert!(cores[2].is_none());
|
||||
}
|
||||
@@ -1661,7 +1668,7 @@ mod tests {
|
||||
|
||||
// assign some availability cores.
|
||||
{
|
||||
AvailabilityCores::mutate(|cores| {
|
||||
AvailabilityCores::<Test>::mutate(|cores| {
|
||||
cores[0] = Some(CoreOccupied::Parachain);
|
||||
cores[1] = Some(CoreOccupied::Parathread(ParathreadEntry {
|
||||
claim: ParathreadClaim(thread_a, collator),
|
||||
@@ -1681,7 +1688,7 @@ mod tests {
|
||||
|
||||
let now = System::block_number();
|
||||
let would_be_timed_out = now - thread_availability_period;
|
||||
for i in 0..AvailabilityCores::get().len() {
|
||||
for i in 0..AvailabilityCores::<Test>::get().len() {
|
||||
// returns true for unoccupied cores.
|
||||
// And can time out both threads and chains at this stage.
|
||||
assert!(pred(CoreIndex(i as u32), would_be_timed_out));
|
||||
@@ -1776,7 +1783,7 @@ mod tests {
|
||||
|
||||
Scheduler::add_parathread_claim(thread_claim_b);
|
||||
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<Test>::get();
|
||||
assert_eq!(
|
||||
queue.get_next_on_core(0).unwrap().claim,
|
||||
ParathreadClaim(thread_b, collator.clone()),
|
||||
@@ -1847,7 +1854,7 @@ mod tests {
|
||||
_ => panic!("with no chains, only core should be a thread core"),
|
||||
}
|
||||
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<Test>::get();
|
||||
assert!(queue.get_next_on_core(0).is_none());
|
||||
assert_eq!(
|
||||
Scheduler::next_up_on_time_out(CoreIndex(0)).unwrap(),
|
||||
@@ -1859,7 +1866,7 @@ mod tests {
|
||||
|
||||
Scheduler::add_parathread_claim(thread_claim_b);
|
||||
|
||||
let queue = ParathreadQueue::get();
|
||||
let queue = ParathreadQueue::<Test>::get();
|
||||
assert_eq!(
|
||||
queue.get_next_on_core(0).unwrap().claim,
|
||||
ParathreadClaim(thread_b, collator.clone()),
|
||||
@@ -2031,7 +2038,7 @@ mod tests {
|
||||
|
||||
assert_eq!(Scheduler::scheduled().len(), 2);
|
||||
|
||||
let groups = ValidatorGroups::get();
|
||||
let groups = ValidatorGroups::<Test>::get();
|
||||
assert_eq!(groups.len(), 5);
|
||||
|
||||
assert_ok!(Paras::schedule_para_cleanup(chain_b));
|
||||
|
||||
@@ -93,8 +93,8 @@ impl<T: Config> Pallet<T> {
|
||||
let assignment_keys = AssignmentKeysUnsafe::<T>::get();
|
||||
let active_set = <shared::Pallet<T>>::active_validator_indices();
|
||||
|
||||
let validator_groups = <scheduler::Module<T>>::validator_groups();
|
||||
let n_cores = <scheduler::Module<T>>::availability_cores().len() as u32;
|
||||
let validator_groups = <scheduler::Pallet<T>>::validator_groups();
|
||||
let n_cores = <scheduler::Pallet<T>>::availability_cores().len() as u32;
|
||||
let zeroth_delay_tranche_width = config.zeroth_delay_tranche_width;
|
||||
let relay_vrf_modulo_samples = config.relay_vrf_modulo_samples;
|
||||
let n_delay_tranches = config.n_delay_tranches;
|
||||
|
||||
@@ -226,7 +226,7 @@ construct_runtime! {
|
||||
ParasShared: parachains_shared::{Pallet, Call, Storage},
|
||||
ParaInclusion: parachains_inclusion::{Pallet, Call, Storage, Event<T>},
|
||||
ParasInherent: parachains_paras_inherent::{Pallet, Call, Storage, Inherent},
|
||||
Scheduler: parachains_scheduler::{Pallet, Call, Storage},
|
||||
ParaScheduler: parachains_scheduler::{Pallet, Storage},
|
||||
Paras: parachains_paras::{Pallet, Call, Storage, Event, Config},
|
||||
Initializer: parachains_initializer::{Pallet, Call, Storage},
|
||||
Dmp: parachains_dmp::{Pallet, Call, Storage},
|
||||
|
||||
@@ -541,7 +541,7 @@ construct_runtime! {
|
||||
Initializer: parachains_initializer::{Pallet, Call, Storage},
|
||||
Paras: parachains_paras::{Pallet, Call, Storage, Origin, Event},
|
||||
ParasShared: parachains_shared::{Pallet, Call, Storage},
|
||||
Scheduler: parachains_scheduler::{Pallet, Call, Storage},
|
||||
Scheduler: parachains_scheduler::{Pallet, Storage},
|
||||
ParasSudoWrapper: paras_sudo_wrapper::{Pallet, Call},
|
||||
ParaSessionInfo: parachains_session_info::{Pallet, Storage},
|
||||
Hrmp: parachains_hrmp::{Pallet, Call, Storage, Event<T>},
|
||||
|
||||
@@ -1067,7 +1067,7 @@ construct_runtime! {
|
||||
ParasShared: parachains_shared::{Pallet, Call, Storage} = 43,
|
||||
ParaInclusion: parachains_inclusion::{Pallet, Call, Storage, Event<T>} = 44,
|
||||
ParasInherent: parachains_paras_inherent::{Pallet, Call, Storage, Inherent} = 45,
|
||||
ParasScheduler: parachains_scheduler::{Pallet, Call, Storage} = 46,
|
||||
ParaScheduler: parachains_scheduler::{Pallet, Storage} = 46,
|
||||
Paras: parachains_paras::{Pallet, Call, Storage, Event, Config} = 47,
|
||||
Initializer: parachains_initializer::{Pallet, Call, Storage} = 48,
|
||||
Dmp: parachains_dmp::{Pallet, Call, Storage} = 49,
|
||||
|
||||
Reference in New Issue
Block a user