mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 14:01:02 +00:00
Implement Runtime APIs (#1411)
* create a README on Runtime APIs * add ParaId type * write up runtime APIs * more preamble * rename * rejig runtime APIs * add occupied_since to `BlockNumber` * skeleton crate for runtime API subsystem * improve group_for_core * improve docs on availability cores runtime API * guide: freed -> free * add primitives for runtime APIs * create a v1 ParachainHost API trait * guide: make validation code return `Option`al. * skeleton runtime API helpers * make parachain-host runtime-generic * skeleton for most runtime API implementation functions * guide: add runtime API helper methods * implement new helpers of the inclusion module * guide: remove retries check, as it is unneeded * implement helpers for scheduler module for Runtime APIs * clean up `validator_groups` implementation * implement next_rotation_at and last_rotation_at * guide: more helpers on GroupRotationInfo * almost finish implementing runtime APIs * add explicit block parameter to runtime API fns * guide: generalize number parameter * guide: add group_responsible to occupied-core * update primitives due to guide changes * finishing touches on runtime API implementation; squash warnings * break out runtime API impl to separate file * add tests for next_up logic * test group rotation info * point to filed TODO * remove unused TODO [now] * indentation * guide: para -> para_id * rename para field to para_id for core meta * remove reference to outdated AvailabilityCores type * add an event in `inclusion` for candidates being included or timing out * guide: candidate events * guide: adjust language * Candidate events type from guide and adjust inclusion event * implement `candidate_events` runtime API * fix runtime test compilation * max -> min * fix typos * guide: add `RuntimeAPIRequest::CandidateEvents`
This commit is contained in:
committed by
GitHub
parent
5624bd8bf4
commit
dddde219a2
@@ -39,7 +39,7 @@ use sp_std::prelude::*;
|
||||
use sp_std::convert::TryInto;
|
||||
use primitives::v1::{
|
||||
Id as ParaId, ValidatorIndex, CoreAssignment, CoreOccupied, CoreIndex, AssignmentKind,
|
||||
GroupIndex, ParathreadClaim, ParathreadEntry,
|
||||
GroupIndex, ParathreadClaim, ParathreadEntry, GroupRotationInfo, ScheduledCore,
|
||||
};
|
||||
use frame_support::{
|
||||
decl_storage, decl_module, decl_error,
|
||||
@@ -84,11 +84,17 @@ impl ParathreadClaimQueue {
|
||||
})
|
||||
}
|
||||
|
||||
// Take next queued entry with given core offset, if any.
|
||||
/// Take next queued entry with given core offset, if any.
|
||||
fn take_next_on_core(&mut self, core_offset: u32) -> Option<ParathreadEntry> {
|
||||
let pos = self.queue.iter().position(|queued| queued.core_offset == core_offset);
|
||||
pos.map(|i| self.queue.remove(i).claim)
|
||||
}
|
||||
|
||||
/// Get the next queued entry with given core offset, if any.
|
||||
fn get_next_on_core(&self, core_offset: u32) -> Option<&ParathreadEntry> {
|
||||
let pos = self.queue.iter().position(|queued| queued.core_offset == core_offset);
|
||||
pos.map(|i| &self.queue[i].claim)
|
||||
}
|
||||
}
|
||||
|
||||
/// Reasons a core might be freed
|
||||
@@ -107,7 +113,7 @@ decl_storage! {
|
||||
///
|
||||
/// Bound: The number of cores is the sum of the numbers of parachains and parathread multiplexers.
|
||||
/// Reasonably, 100-1000. The dominant factor is the number of validators: safe upper bound at 10k.
|
||||
ValidatorGroups: Vec<Vec<ValidatorIndex>>;
|
||||
ValidatorGroups get(fn validator_groups): Vec<Vec<ValidatorIndex>>;
|
||||
|
||||
/// A queue of upcoming claims and which core they should be mapped onto.
|
||||
///
|
||||
@@ -120,14 +126,14 @@ decl_storage! {
|
||||
/// parathread-multiplexers.
|
||||
///
|
||||
/// Bounded by the number of cores: one for each parachain and parathread multiplexer.
|
||||
AvailabilityCores: Vec<Option<CoreOccupied>>;
|
||||
AvailabilityCores get(fn availability_cores): Vec<Option<CoreOccupied>>;
|
||||
/// An index used to ensure that only one claim on a parathread exists in the queue or is
|
||||
/// currently being handled by an occupied core.
|
||||
///
|
||||
/// Bounded by the number of parathread cores and scheduling lookahead. Reasonably, 10 * 50 = 500.
|
||||
ParathreadClaimIndex: Vec<ParaId>;
|
||||
/// The block number where the session start occurred. Used to track how many group rotations have occurred.
|
||||
SessionStartBlock: T::BlockNumber;
|
||||
SessionStartBlock get(fn session_start_block): T::BlockNumber;
|
||||
/// Currently scheduled cores - free but up to be occupied. Ephemeral storage item that's wiped on finalization.
|
||||
///
|
||||
/// Bounded by the number of cores: one for each parachain and parathread multiplexer.
|
||||
@@ -578,6 +584,86 @@ impl<T: Trait> Module<T> {
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a helper for determining group rotation.
|
||||
pub(crate) fn group_rotation_info() -> GroupRotationInfo<T::BlockNumber> {
|
||||
let session_start_block = Self::session_start_block();
|
||||
let now = <system::Module<T>>::block_number();
|
||||
let group_rotation_frequency = <configuration::Module<T>>::config()
|
||||
.parachain_rotation_frequency;
|
||||
|
||||
GroupRotationInfo {
|
||||
session_start_block,
|
||||
now,
|
||||
group_rotation_frequency,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the next thing that will be scheduled on this core assuming it is currently
|
||||
/// occupied and the candidate occupying it became available.
|
||||
///
|
||||
/// For parachains, this is always the ID of the parachain and no specified collator.
|
||||
/// For parathreads, this is based on the next item in the ParathreadQueue assigned to that
|
||||
/// core, and is None if there isn't one.
|
||||
pub(crate) fn next_up_on_available(core: CoreIndex) -> Option<ScheduledCore> {
|
||||
let parachains = <paras::Module<T>>::parachains();
|
||||
if (core.0 as usize) < parachains.len() {
|
||||
Some(ScheduledCore {
|
||||
para_id: parachains[core.0 as usize],
|
||||
collator: None,
|
||||
})
|
||||
} else {
|
||||
let queue = ParathreadQueue::get();
|
||||
let core_offset = (core.0 as usize - parachains.len()) as u32;
|
||||
queue.get_next_on_core(core_offset).map(|entry| ScheduledCore {
|
||||
para_id: entry.claim.0,
|
||||
collator: Some(entry.claim.1.clone()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the next thing that will be scheduled on this core assuming it is currently
|
||||
/// occupied and the candidate occupying it became available.
|
||||
///
|
||||
/// For parachains, this is always the ID of the parachain and no specified collator.
|
||||
/// For parathreads, this is based on the next item in the ParathreadQueue assigned to that
|
||||
/// core, or if there isn't one, the claim that is currently occupying the core, as long
|
||||
/// as the claim's retries would not exceed the limit. Otherwise None.
|
||||
pub(crate) fn next_up_on_time_out(core: CoreIndex) -> Option<ScheduledCore> {
|
||||
let parachains = <paras::Module<T>>::parachains();
|
||||
if (core.0 as usize) < parachains.len() {
|
||||
Some(ScheduledCore {
|
||||
para_id: parachains[core.0 as usize],
|
||||
collator: None,
|
||||
})
|
||||
} else {
|
||||
let queue = ParathreadQueue::get();
|
||||
|
||||
// This is the next scheduled para on this core.
|
||||
let core_offset = (core.0 as usize - parachains.len()) as u32;
|
||||
queue.get_next_on_core(core_offset)
|
||||
.map(|entry| ScheduledCore {
|
||||
para_id: entry.claim.0,
|
||||
collator: Some(entry.claim.1.clone()),
|
||||
})
|
||||
.or_else(|| {
|
||||
// Or, if none, the claim currently occupying the core,
|
||||
// as it would be put back on the queue after timing out.
|
||||
let cores = AvailabilityCores::get();
|
||||
cores.get(core.0 as usize).and_then(|c| c.as_ref()).and_then(|o| {
|
||||
match o {
|
||||
CoreOccupied::Parathread(entry) => {
|
||||
Some(ScheduledCore {
|
||||
para_id: entry.claim.0,
|
||||
collator: Some(entry.claim.1.clone()),
|
||||
})
|
||||
}
|
||||
CoreOccupied::Parachain => None, // defensive; not possible.
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -1440,4 +1526,301 @@ mod tests {
|
||||
assert!(Scheduler::availability_timeout_predicate().is_none());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn next_up_on_available_uses_next_scheduled_or_none_for_thread() {
|
||||
let mut config = default_config();
|
||||
config.parathread_cores = 1;
|
||||
|
||||
let genesis_config = MockGenesisConfig {
|
||||
configuration: crate::configuration::GenesisConfig {
|
||||
config: config.clone(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let thread_a = ParaId::from(1);
|
||||
let thread_b = ParaId::from(2);
|
||||
|
||||
let collator = CollatorId::from(Sr25519Keyring::Alice.public());
|
||||
|
||||
let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs {
|
||||
genesis_head: Vec::new().into(),
|
||||
validation_code: Vec::new().into(),
|
||||
parachain: is_chain,
|
||||
});
|
||||
|
||||
new_test_ext(genesis_config).execute_with(|| {
|
||||
schedule_blank_para(thread_a, false);
|
||||
schedule_blank_para(thread_b, false);
|
||||
|
||||
// start a new session to activate, 5 validators for 5 cores.
|
||||
run_to_block(1, |number| match number {
|
||||
1 => Some(SessionChangeNotification {
|
||||
new_config: config.clone(),
|
||||
validators: vec![
|
||||
ValidatorId::from(Sr25519Keyring::Alice.public()),
|
||||
ValidatorId::from(Sr25519Keyring::Eve.public()),
|
||||
],
|
||||
..Default::default()
|
||||
}),
|
||||
_ => None,
|
||||
});
|
||||
|
||||
let thread_claim_a = ParathreadClaim(thread_a, collator.clone());
|
||||
let thread_claim_b = ParathreadClaim(thread_b, collator.clone());
|
||||
|
||||
Scheduler::add_parathread_claim(thread_claim_a.clone());
|
||||
|
||||
run_to_block(2, |_| None);
|
||||
|
||||
{
|
||||
assert_eq!(Scheduler::scheduled().len(), 1);
|
||||
assert_eq!(Scheduler::availability_cores().len(), 1);
|
||||
|
||||
Scheduler::occupied(&[CoreIndex(0)]);
|
||||
|
||||
let cores = Scheduler::availability_cores();
|
||||
match cores[0].as_ref().unwrap() {
|
||||
CoreOccupied::Parathread(entry) => assert_eq!(entry.claim, thread_claim_a),
|
||||
_ => panic!("with no chains, only core should be a thread core"),
|
||||
}
|
||||
|
||||
assert!(Scheduler::next_up_on_available(CoreIndex(0)).is_none());
|
||||
|
||||
Scheduler::add_parathread_claim(thread_claim_b);
|
||||
|
||||
let queue = ParathreadQueue::get();
|
||||
assert_eq!(
|
||||
queue.get_next_on_core(0).unwrap().claim,
|
||||
ParathreadClaim(thread_b, collator.clone()),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
Scheduler::next_up_on_available(CoreIndex(0)).unwrap(),
|
||||
ScheduledCore {
|
||||
para_id: thread_b,
|
||||
collator: Some(collator.clone()),
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn next_up_on_time_out_reuses_claim_if_nothing_queued() {
|
||||
let mut config = default_config();
|
||||
config.parathread_cores = 1;
|
||||
|
||||
let genesis_config = MockGenesisConfig {
|
||||
configuration: crate::configuration::GenesisConfig {
|
||||
config: config.clone(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let thread_a = ParaId::from(1);
|
||||
let thread_b = ParaId::from(2);
|
||||
|
||||
let collator = CollatorId::from(Sr25519Keyring::Alice.public());
|
||||
|
||||
let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs {
|
||||
genesis_head: Vec::new().into(),
|
||||
validation_code: Vec::new().into(),
|
||||
parachain: is_chain,
|
||||
});
|
||||
|
||||
new_test_ext(genesis_config).execute_with(|| {
|
||||
schedule_blank_para(thread_a, false);
|
||||
schedule_blank_para(thread_b, false);
|
||||
|
||||
// start a new session to activate, 5 validators for 5 cores.
|
||||
run_to_block(1, |number| match number {
|
||||
1 => Some(SessionChangeNotification {
|
||||
new_config: config.clone(),
|
||||
validators: vec![
|
||||
ValidatorId::from(Sr25519Keyring::Alice.public()),
|
||||
ValidatorId::from(Sr25519Keyring::Eve.public()),
|
||||
],
|
||||
..Default::default()
|
||||
}),
|
||||
_ => None,
|
||||
});
|
||||
|
||||
let thread_claim_a = ParathreadClaim(thread_a, collator.clone());
|
||||
let thread_claim_b = ParathreadClaim(thread_b, collator.clone());
|
||||
|
||||
Scheduler::add_parathread_claim(thread_claim_a.clone());
|
||||
|
||||
run_to_block(2, |_| None);
|
||||
|
||||
{
|
||||
assert_eq!(Scheduler::scheduled().len(), 1);
|
||||
assert_eq!(Scheduler::availability_cores().len(), 1);
|
||||
|
||||
Scheduler::occupied(&[CoreIndex(0)]);
|
||||
|
||||
let cores = Scheduler::availability_cores();
|
||||
match cores[0].as_ref().unwrap() {
|
||||
CoreOccupied::Parathread(entry) => assert_eq!(entry.claim, thread_claim_a),
|
||||
_ => panic!("with no chains, only core should be a thread core"),
|
||||
}
|
||||
|
||||
let queue = ParathreadQueue::get();
|
||||
assert!(queue.get_next_on_core(0).is_none());
|
||||
assert_eq!(
|
||||
Scheduler::next_up_on_time_out(CoreIndex(0)).unwrap(),
|
||||
ScheduledCore {
|
||||
para_id: thread_a,
|
||||
collator: Some(collator.clone()),
|
||||
}
|
||||
);
|
||||
|
||||
Scheduler::add_parathread_claim(thread_claim_b);
|
||||
|
||||
let queue = ParathreadQueue::get();
|
||||
assert_eq!(
|
||||
queue.get_next_on_core(0).unwrap().claim,
|
||||
ParathreadClaim(thread_b, collator.clone()),
|
||||
);
|
||||
|
||||
// Now that there is an earlier next-up, we use that.
|
||||
assert_eq!(
|
||||
Scheduler::next_up_on_available(CoreIndex(0)).unwrap(),
|
||||
ScheduledCore {
|
||||
para_id: thread_b,
|
||||
collator: Some(collator.clone()),
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn next_up_on_available_is_parachain_always() {
|
||||
let mut config = default_config();
|
||||
config.parathread_cores = 0;
|
||||
|
||||
let genesis_config = MockGenesisConfig {
|
||||
configuration: crate::configuration::GenesisConfig {
|
||||
config: config.clone(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let chain_a = ParaId::from(1);
|
||||
|
||||
let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs {
|
||||
genesis_head: Vec::new().into(),
|
||||
validation_code: Vec::new().into(),
|
||||
parachain: is_chain,
|
||||
});
|
||||
|
||||
new_test_ext(genesis_config).execute_with(|| {
|
||||
schedule_blank_para(chain_a, true);
|
||||
|
||||
// start a new session to activate, 5 validators for 5 cores.
|
||||
run_to_block(1, |number| match number {
|
||||
1 => Some(SessionChangeNotification {
|
||||
new_config: config.clone(),
|
||||
validators: vec![
|
||||
ValidatorId::from(Sr25519Keyring::Alice.public()),
|
||||
ValidatorId::from(Sr25519Keyring::Eve.public()),
|
||||
],
|
||||
..Default::default()
|
||||
}),
|
||||
_ => None,
|
||||
});
|
||||
|
||||
run_to_block(2, |_| None);
|
||||
|
||||
{
|
||||
assert_eq!(Scheduler::scheduled().len(), 1);
|
||||
assert_eq!(Scheduler::availability_cores().len(), 1);
|
||||
|
||||
Scheduler::occupied(&[CoreIndex(0)]);
|
||||
|
||||
let cores = Scheduler::availability_cores();
|
||||
match cores[0].as_ref().unwrap() {
|
||||
CoreOccupied::Parachain => {},
|
||||
_ => panic!("with no threads, only core should be a chain core"),
|
||||
}
|
||||
|
||||
// Now that there is an earlier next-up, we use that.
|
||||
assert_eq!(
|
||||
Scheduler::next_up_on_available(CoreIndex(0)).unwrap(),
|
||||
ScheduledCore {
|
||||
para_id: chain_a,
|
||||
collator: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn next_up_on_time_out_is_parachain_always() {
|
||||
let mut config = default_config();
|
||||
config.parathread_cores = 0;
|
||||
|
||||
let genesis_config = MockGenesisConfig {
|
||||
configuration: crate::configuration::GenesisConfig {
|
||||
config: config.clone(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let chain_a = ParaId::from(1);
|
||||
|
||||
let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs {
|
||||
genesis_head: Vec::new().into(),
|
||||
validation_code: Vec::new().into(),
|
||||
parachain: is_chain,
|
||||
});
|
||||
|
||||
new_test_ext(genesis_config).execute_with(|| {
|
||||
schedule_blank_para(chain_a, true);
|
||||
|
||||
// start a new session to activate, 5 validators for 5 cores.
|
||||
run_to_block(1, |number| match number {
|
||||
1 => Some(SessionChangeNotification {
|
||||
new_config: config.clone(),
|
||||
validators: vec![
|
||||
ValidatorId::from(Sr25519Keyring::Alice.public()),
|
||||
ValidatorId::from(Sr25519Keyring::Eve.public()),
|
||||
],
|
||||
..Default::default()
|
||||
}),
|
||||
_ => None,
|
||||
});
|
||||
|
||||
run_to_block(2, |_| None);
|
||||
|
||||
{
|
||||
assert_eq!(Scheduler::scheduled().len(), 1);
|
||||
assert_eq!(Scheduler::availability_cores().len(), 1);
|
||||
|
||||
Scheduler::occupied(&[CoreIndex(0)]);
|
||||
|
||||
let cores = Scheduler::availability_cores();
|
||||
match cores[0].as_ref().unwrap() {
|
||||
CoreOccupied::Parachain => {},
|
||||
_ => panic!("with no threads, only core should be a chain core"),
|
||||
}
|
||||
|
||||
// Now that there is an earlier next-up, we use that.
|
||||
assert_eq!(
|
||||
Scheduler::next_up_on_available(CoreIndex(0)).unwrap(),
|
||||
ScheduledCore {
|
||||
para_id: chain_a,
|
||||
collator: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user