mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-30 09:37:55 +00:00
implement provisioner (#1473)
* sketch out provisioner basics
* handle provisionable data
* stub out select_inherent_data
* split runtime APIs into sub-chapters to improve linkability
* explain SignedAvailabilityBitfield semantics
* add internal link to further documentation
* some more work figuring out how the provisioner can do its thing
* fix broken link
* don't import enum variants where it's one layer deep
* make request_availability_cores a free fn in util
* document more precisely what should happen on block production
* finish first-draft implementation of provisioner
* start working on the full and proper backed candidate selection rule
* Pass number of block under construction via RequestInherentData
* Revert "Pass number of block under construction via RequestInherentData"
This reverts commit 850fe62cc0dfb04252580c21a985962000e693c8.
That initially looked like the better approach--it spent the time
budget for fetching the block number in the proposer, instead of
the provisioner, and that felt more appropriate--but it turns out
not to be obvious how to get the block number of the block under
construction from within the proposer. The Chain API may be less
ideal, but it should be easier to implement.
* wip: get the block under production from the Chain API
* add ChainApiMessage to AllMessages
* don't break the run loop if a provisionable data channel closes
* clone only those backed candidates which are coherent
* propagate chain_api subsystem through various locations
* add delegated_subsystem! macro to ease delegating subsystems
Unfortunately, it doesn't work right:
```
error[E0446]: private type `CandidateBackingJob` in public interface
--> node/core/backing/src/lib.rs:775:1
|
86 | struct CandidateBackingJob {
| - `CandidateBackingJob` declared as private
...
775 | delegated_subsystem!(CandidateBackingJob as CandidateBackingSubsystem);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ can't leak private type
```
I'm not sure precisely what's going wrong, here; I suspect the problem is
the use of `$job as JobTrait>::RunArgs` and `::ToJob`; the failure would be
that it's not reifying the types to verify that the actual types are public,
but instead referring to them via `CandidateBackingJob`, which is in fact private;
that privacy is the point.
Going to see if I can generic my way out of this, but we may be headed for a
quick revert here.
* fix delegated_subsystem
The invocation is a bit more verbose than I'd prefer, but it's also
more explicit about what types need to be public. I'll take it as a win.
* add provisioning subsystem; reduce public interface of provisioner
* deny missing docs in provisioner
* refactor core selection per code review suggestion
This is twice as much code when measured by line, but IMO it is
in fact somewhat clearer to read, so overall a win.
Also adds an improved rule for selecting availability bitfields,
which (unlike the previous implementation) guarantees that the
appropriate postconditions hold there.
* fix bad merge double-declaration
* update guide with (hopefully) complete provisioner candidate selection procedure
* clarify candidate selection algorithm
* Revert "clarify candidate selection algorithm"
This reverts commit c68a02ac9cf42b3a4a28eb197d38633a40d0e3e6.
* clarify candidate selection algorithm
* update provisioner to implement candidate selection per the guide
* add test that no more than one bitfield is selected per validator
* add test that each selected bitfield corresponds to an occupied core
* add test that more set bits win conflicts
* add macro for specializing runtime requests; specailize all runtime requests
* add tests harness for select_candidates tests
* add first real select_candidates test, fix test_harness
* add mock overseer and test that success is possible
* add test that the candidate selection algorithm picks the right ones
* make candidate selection test somewhat more stringent
This commit is contained in:
committed by
GitHub
parent
877a5059aa
commit
21cec309a4
@@ -407,7 +407,8 @@ impl StatementDistributionMessage {
|
||||
}
|
||||
|
||||
/// This data becomes intrinsics or extrinsics which should be included in a future relay chain block.
|
||||
#[derive(Debug)]
|
||||
// It needs to be cloneable because multiple potential block authors can request copies.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProvisionableData {
|
||||
/// This bitfield indicates the availability of various candidate blocks.
|
||||
Bitfield(Hash, SignedAvailabilityBitfield),
|
||||
@@ -488,8 +489,6 @@ pub enum AllMessages {
|
||||
CandidateBacking(CandidateBackingMessage),
|
||||
/// Message for the candidate selection subsystem.
|
||||
CandidateSelection(CandidateSelectionMessage),
|
||||
/// Message for the Chain API subsystem.
|
||||
ChainApi(ChainApiMessage),
|
||||
/// Message for the statement distribution subsystem.
|
||||
StatementDistribution(StatementDistributionMessage),
|
||||
/// Message for the availability distribution subsystem.
|
||||
@@ -508,6 +507,8 @@ pub enum AllMessages {
|
||||
AvailabilityStore(AvailabilityStoreMessage),
|
||||
/// Message for the network bridge subsystem.
|
||||
NetworkBridge(NetworkBridgeMessage),
|
||||
/// Message for the Chain API subsystem
|
||||
ChainApi(ChainApiMessage),
|
||||
/// Test message
|
||||
///
|
||||
/// This variant is only valid while testing, but makes the process of testing the
|
||||
|
||||
+236
-101
@@ -21,10 +21,8 @@
|
||||
//! this module.
|
||||
|
||||
use crate::{
|
||||
messages::{
|
||||
AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender,
|
||||
},
|
||||
errors::{ChainApiError, RuntimeApiError},
|
||||
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
|
||||
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
|
||||
};
|
||||
use futures::{
|
||||
@@ -40,13 +38,12 @@ use keystore::KeyStorePtr;
|
||||
use parity_scale_codec::Encode;
|
||||
use pin_project::{pin_project, pinned_drop};
|
||||
use polkadot_primitives::v1::{
|
||||
EncodeAs, Hash, Signed, SigningContext, SessionIndex,
|
||||
ValidatorId, ValidatorIndex, ValidatorPair, GroupRotationInfo,
|
||||
};
|
||||
use sp_core::{
|
||||
Pair,
|
||||
traits::SpawnNamed,
|
||||
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, GlobalValidationData,
|
||||
GroupRotationInfo, Hash, Id as ParaId, LocalValidationData, OccupiedCoreAssumption,
|
||||
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
|
||||
ValidatorPair,
|
||||
};
|
||||
use sp_core::Pair;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
convert::{TryFrom, TryInto},
|
||||
@@ -56,6 +53,11 @@ use std::{
|
||||
};
|
||||
use streamunordered::{StreamUnordered, StreamYield};
|
||||
|
||||
/// This reexport is required so that external crates can use the `delegated_subsystem` macro properly.
|
||||
///
|
||||
/// Otherwise, downstream crates might have to modify their `Cargo.toml` to ensure `sp-core` appeared there.
|
||||
pub use sp_core::traits::SpawnNamed;
|
||||
|
||||
/// Duration a job will wait after sending a stop signal before hard-aborting.
|
||||
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
|
||||
/// Capacity of channels to and from individual jobs
|
||||
@@ -119,42 +121,67 @@ where
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
/// Request a validator set from the `RuntimeApi`.
|
||||
pub async fn request_validators<FromJob>(
|
||||
parent: Hash,
|
||||
s: &mut mpsc::Sender<FromJob>,
|
||||
) -> Result<RuntimeApiReceiver<Vec<ValidatorId>>, Error>
|
||||
where
|
||||
FromJob: TryFrom<AllMessages>,
|
||||
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
|
||||
{
|
||||
request_from_runtime(parent, s, |tx| RuntimeApiRequest::Validators(tx)).await
|
||||
/// Construct specialized request functions for the runtime.
|
||||
///
|
||||
/// These would otherwise get pretty repetitive.
|
||||
macro_rules! specialize_requests {
|
||||
// expand return type name for documentation purposes
|
||||
(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
|
||||
specialize_requests!{
|
||||
named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
|
||||
}
|
||||
};
|
||||
|
||||
// create a single specialized request function
|
||||
(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
|
||||
#[doc = "Request `"]
|
||||
#[doc = $doc_name]
|
||||
#[doc = "` from the runtime"]
|
||||
pub async fn $func_name<FromJob>(
|
||||
parent: Hash,
|
||||
$(
|
||||
$param_name: $param_ty,
|
||||
)*
|
||||
sender: &mut mpsc::Sender<FromJob>,
|
||||
) -> Result<RuntimeApiReceiver<$return_ty>, Error>
|
||||
where
|
||||
FromJob: TryFrom<AllMessages>,
|
||||
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
|
||||
{
|
||||
request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
|
||||
$( $param_name, )* tx
|
||||
)).await
|
||||
}
|
||||
};
|
||||
|
||||
// recursive decompose
|
||||
(
|
||||
fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
|
||||
$(
|
||||
fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
|
||||
)+
|
||||
) => {
|
||||
specialize_requests!{
|
||||
fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
|
||||
}
|
||||
specialize_requests!{
|
||||
$(
|
||||
fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
|
||||
)+
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Request the validator groups.
|
||||
pub async fn request_validator_groups<FromJob>(
|
||||
parent: Hash,
|
||||
s: &mut mpsc::Sender<FromJob>,
|
||||
) -> Result<RuntimeApiReceiver<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>, Error>
|
||||
where
|
||||
FromJob: TryFrom<AllMessages>,
|
||||
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
|
||||
{
|
||||
request_from_runtime(parent, s, |tx| RuntimeApiRequest::ValidatorGroups(tx)).await
|
||||
}
|
||||
|
||||
/// Request the session index of the child block.
|
||||
pub async fn request_session_index_for_child<FromJob>(
|
||||
parent: Hash,
|
||||
s: &mut mpsc::Sender<FromJob>,
|
||||
) -> Result<RuntimeApiReceiver<SessionIndex>, Error>
|
||||
where
|
||||
FromJob: TryFrom<AllMessages>,
|
||||
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
|
||||
{
|
||||
request_from_runtime(parent, s, |tx| {
|
||||
RuntimeApiRequest::SessionIndexForChild(tx)
|
||||
}).await
|
||||
specialize_requests! {
|
||||
fn request_validators() -> Vec<ValidatorId>; Validators;
|
||||
fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
|
||||
fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
|
||||
fn request_global_validation_data() -> GlobalValidationData; GlobalValidationData;
|
||||
fn request_local_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<LocalValidationData>; LocalValidationData;
|
||||
fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
|
||||
fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
|
||||
fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
|
||||
fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
|
||||
}
|
||||
|
||||
/// From the given set of validators, find the first key we can sign with, if any.
|
||||
@@ -405,8 +432,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
|
||||
/// the error is forwarded onto the provided channel.
|
||||
///
|
||||
/// Errors if the error channel already exists.
|
||||
pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>) -> Result<(), Error> {
|
||||
if self.errors.is_some() { return Err(Error::AlreadyForwarding) }
|
||||
pub fn forward_errors(
|
||||
&mut self,
|
||||
tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>,
|
||||
) -> Result<(), Error> {
|
||||
if self.errors.is_some() {
|
||||
return Err(Error::AlreadyForwarding);
|
||||
}
|
||||
self.errors = Some(tx);
|
||||
Ok(())
|
||||
}
|
||||
@@ -510,13 +542,12 @@ where
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
|
||||
// pin-project the outgoing messages
|
||||
self.project()
|
||||
.outgoing_msgs
|
||||
.poll_next(cx)
|
||||
.map(|opt| opt.and_then(|(stream_yield, _)| match stream_yield {
|
||||
self.project().outgoing_msgs.poll_next(cx).map(|opt| {
|
||||
opt.and_then(|(stream_yield, _)| match stream_yield {
|
||||
StreamYield::Item(msg) => Some(msg),
|
||||
StreamYield::Finished(_) => None,
|
||||
}))
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -559,8 +590,13 @@ where
|
||||
/// the error is forwarded onto the provided channel.
|
||||
///
|
||||
/// Errors if the error channel already exists.
|
||||
pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>) -> Result<(), Error> {
|
||||
if self.errors.is_some() { return Err(Error::AlreadyForwarding) }
|
||||
pub fn forward_errors(
|
||||
&mut self,
|
||||
tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>,
|
||||
) -> Result<(), Error> {
|
||||
if self.errors.is_some() {
|
||||
return Err(Error::AlreadyForwarding);
|
||||
}
|
||||
self.errors = Some(tx);
|
||||
Ok(())
|
||||
}
|
||||
@@ -576,10 +612,16 @@ where
|
||||
///
|
||||
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
|
||||
/// Otherwise, most are logged and then discarded.
|
||||
pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner, mut err_tx: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) {
|
||||
pub async fn run(
|
||||
mut ctx: Context,
|
||||
run_args: Job::RunArgs,
|
||||
spawner: Spawner,
|
||||
mut err_tx: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
|
||||
) {
|
||||
let mut jobs = Jobs::new(spawner.clone());
|
||||
if let Some(ref err_tx) = err_tx {
|
||||
jobs.forward_errors(err_tx.clone()).expect("we never call this twice in this context; qed");
|
||||
jobs.forward_errors(err_tx.clone())
|
||||
.expect("we never call this twice in this context; qed");
|
||||
}
|
||||
|
||||
loop {
|
||||
@@ -592,7 +634,11 @@ where
|
||||
}
|
||||
|
||||
// if we have a channel on which to forward errors, do so
|
||||
async fn fwd_err(hash: Option<Hash>, err: JobsError<Job::Error>, err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) {
|
||||
async fn fwd_err(
|
||||
hash: Option<Hash>,
|
||||
err: JobsError<Job::Error>,
|
||||
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
|
||||
) {
|
||||
if let Some(err_tx) = err_tx {
|
||||
// if we can't send on the error transmission channel, we can't do anything useful about it
|
||||
// still, we can at least log the failure
|
||||
@@ -607,14 +653,17 @@ where
|
||||
incoming: SubsystemResult<FromOverseer<Context::Message>>,
|
||||
jobs: &mut Jobs<Spawner, Job>,
|
||||
run_args: &Job::RunArgs,
|
||||
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>
|
||||
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
|
||||
) -> bool {
|
||||
use crate::FromOverseer::{Communication, Signal};
|
||||
use crate::ActiveLeavesUpdate;
|
||||
use crate::OverseerSignal::{BlockFinalized, Conclude, ActiveLeaves};
|
||||
use crate::FromOverseer::{Communication, Signal};
|
||||
use crate::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};
|
||||
|
||||
match incoming {
|
||||
Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }))) => {
|
||||
Ok(Signal(ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated,
|
||||
deactivated,
|
||||
}))) => {
|
||||
for hash in activated {
|
||||
if let Err(e) = jobs.spawn_job(hash, run_args.clone()) {
|
||||
log::error!("Failed to spawn a job: {:?}", e);
|
||||
@@ -638,10 +687,11 @@ where
|
||||
// Forwarding the stream to a drain means we wait until all of the items in the stream
|
||||
// have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`.
|
||||
use futures::sink::drain;
|
||||
use futures::stream::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::stream::StreamExt;
|
||||
|
||||
if let Err(e) = jobs.running
|
||||
if let Err(e) = jobs
|
||||
.running
|
||||
.drain()
|
||||
.map(|(_, handle)| handle.stop())
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
@@ -686,7 +736,11 @@ where
|
||||
}
|
||||
|
||||
// handle an outgoing message. return true if we should break afterwards.
|
||||
async fn handle_outgoing(outgoing: Option<Job::FromJob>, ctx: &mut Context, err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) -> bool {
|
||||
async fn handle_outgoing(
|
||||
outgoing: Option<Job::FromJob>,
|
||||
ctx: &mut Context,
|
||||
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
|
||||
) -> bool {
|
||||
match outgoing {
|
||||
Some(msg) => {
|
||||
if let Err(e) = ctx.send_message(msg.into()).await {
|
||||
@@ -713,7 +767,6 @@ where
|
||||
let run_args = self.run_args.clone();
|
||||
let errors = self.errors;
|
||||
|
||||
|
||||
let future = Box::pin(async move {
|
||||
Self::run(ctx, run_args, spawner, errors).await;
|
||||
});
|
||||
@@ -725,41 +778,107 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a delegated subsystem
|
||||
///
|
||||
/// It is possible to create a type which implements `Subsystem` by simply doing:
|
||||
///
|
||||
/// ```ignore
|
||||
/// pub type ExampleSubsystem<Spawner, Context> = util::JobManager<Spawner, Context, ExampleJob>;
|
||||
/// ```
|
||||
///
|
||||
/// However, doing this requires that job itself and all types which comprise it (i.e. `ToJob`, `FromJob`, `Error`, `RunArgs`)
|
||||
/// are public, to avoid exposing private types in public interfaces. It's possible to delegate instead, which
|
||||
/// can reduce the total number of public types exposed, i.e.
|
||||
///
|
||||
/// ```ignore
|
||||
/// type Manager<Spawner, Context> = util::JobManager<Spawner, Context, ExampleJob>;
|
||||
/// pub struct ExampleSubsystem {
|
||||
/// manager: Manager<Spawner, Context>,
|
||||
/// }
|
||||
///
|
||||
/// impl<Spawner, Context> Subsystem<Context> for ExampleSubsystem<Spawner, Context> { ... }
|
||||
/// ```
|
||||
///
|
||||
/// This dramatically reduces the number of public types in the crate; the only things which must be public are now
|
||||
///
|
||||
/// - `struct ExampleSubsystem` (defined by this macro)
|
||||
/// - `type ToJob` (because it appears in a trait bound)
|
||||
/// - `type RunArgs` (because it appears in a function signature)
|
||||
///
|
||||
/// Implementing this all manually is of course possible, but it's tedious; why bother? This macro exists for
|
||||
/// the purpose of doing it automatically:
|
||||
///
|
||||
/// ```ignore
|
||||
/// delegated_subsystem!(ExampleJob(ExampleRunArgs) <- ExampleToJob as ExampleSubsystem);
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! delegated_subsystem {
|
||||
($job:ident($run_args:ty) <- $to_job:ty as $subsystem:ident) => {
|
||||
delegated_subsystem!($job($run_args) <- $to_job as $subsystem; stringify!($subsystem));
|
||||
};
|
||||
|
||||
($job:ident($run_args:ty) <- $to_job:ty as $subsystem:ident; $subsystem_name:expr) => {
|
||||
#[doc = "Manager type for the "]
|
||||
#[doc = $subsystem_name]
|
||||
type Manager<Spawner, Context> = $crate::util::JobManager<Spawner, Context, $job>;
|
||||
|
||||
#[doc = "An implementation of the "]
|
||||
#[doc = $subsystem_name]
|
||||
pub struct $subsystem<Spawner, Context> {
|
||||
manager: Manager<Spawner, Context>,
|
||||
}
|
||||
|
||||
impl<Spawner, Context> $subsystem<Spawner, Context>
|
||||
where
|
||||
Spawner: Clone + $crate::util::SpawnNamed + Send + Unpin,
|
||||
Context: $crate::SubsystemContext,
|
||||
<Context as $crate::SubsystemContext>::Message: Into<$to_job>,
|
||||
{
|
||||
#[doc = "Creates a new "]
|
||||
#[doc = $subsystem_name]
|
||||
pub fn new(spawner: Spawner, run_args: $run_args) -> Self {
|
||||
$subsystem {
|
||||
manager: $crate::util::JobManager::new(spawner, run_args)
|
||||
}
|
||||
}
|
||||
|
||||
/// Run this subsystem
|
||||
pub async fn run(ctx: Context, run_args: $run_args, spawner: Spawner) {
|
||||
<Manager<Spawner, Context>>::run(ctx, run_args, spawner, None).await
|
||||
}
|
||||
}
|
||||
|
||||
impl<Spawner, Context> $crate::Subsystem<Context> for $subsystem<Spawner, Context>
|
||||
where
|
||||
Spawner: $crate::util::SpawnNamed + Send + Clone + Unpin + 'static,
|
||||
Context: $crate::SubsystemContext,
|
||||
<Context as $crate::SubsystemContext>::Message: Into<$to_job>,
|
||||
{
|
||||
fn start(self, ctx: Context) -> $crate::SpawnedSubsystem {
|
||||
self.manager.start(ctx)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use assert_matches::assert_matches;
|
||||
use crate::{
|
||||
messages::{AllMessages, CandidateSelectionMessage},
|
||||
test_helpers::{self, make_subsystem_context},
|
||||
util::{
|
||||
self,
|
||||
JobsError,
|
||||
JobManager,
|
||||
JobTrait,
|
||||
ToJobTrait,
|
||||
},
|
||||
ActiveLeavesUpdate,
|
||||
FromOverseer,
|
||||
OverseerSignal,
|
||||
SpawnedSubsystem,
|
||||
Subsystem,
|
||||
util::{self, JobManager, JobTrait, JobsError, ToJobTrait},
|
||||
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
|
||||
};
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{
|
||||
channel::mpsc,
|
||||
executor,
|
||||
Future,
|
||||
FutureExt,
|
||||
stream::{self, StreamExt},
|
||||
SinkExt,
|
||||
Future, FutureExt, SinkExt,
|
||||
};
|
||||
use futures_timer::Delay;
|
||||
use polkadot_primitives::v1::Hash;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
convert::TryFrom,
|
||||
pin::Pin,
|
||||
time::Duration,
|
||||
};
|
||||
use std::{collections::HashMap, convert::TryFrom, pin::Pin, time::Duration};
|
||||
|
||||
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
|
||||
// you can leave the subsystem itself to the job manager.
|
||||
@@ -803,7 +922,7 @@ mod tests {
|
||||
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
|
||||
match msg {
|
||||
AllMessages::CandidateSelection(csm) => Ok(ToJob::CandidateSelection(csm)),
|
||||
_ => Err(())
|
||||
_ => Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -839,7 +958,7 @@ mod tests {
|
||||
#[derive(Debug, derive_more::From)]
|
||||
enum Error {
|
||||
#[from]
|
||||
Sending(mpsc::SendError)
|
||||
Sending(mpsc::SendError),
|
||||
}
|
||||
|
||||
impl JobTrait for FakeCandidateSelectionJob {
|
||||
@@ -867,9 +986,7 @@ mod tests {
|
||||
mut sender: mpsc::Sender<FromJob>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
async move {
|
||||
let job = FakeCandidateSelectionJob {
|
||||
receiver,
|
||||
};
|
||||
let job = FakeCandidateSelectionJob { receiver };
|
||||
|
||||
// most jobs will have a request-response cycle at the heart of their run loop.
|
||||
// however, in this case, we never receive valid messages, so we may as well
|
||||
@@ -881,7 +998,8 @@ mod tests {
|
||||
// it isn't necessary to break run_loop into its own function,
|
||||
// but it's convenient to separate the concerns in this way
|
||||
job.run_loop().await
|
||||
}.boxed()
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -901,12 +1019,16 @@ mod tests {
|
||||
}
|
||||
|
||||
// with the job defined, it's straightforward to get a subsystem implementation.
|
||||
type FakeCandidateSelectionSubsystem<Spawner, Context> = JobManager<Spawner, Context, FakeCandidateSelectionJob>;
|
||||
type FakeCandidateSelectionSubsystem<Spawner, Context> =
|
||||
JobManager<Spawner, Context, FakeCandidateSelectionJob>;
|
||||
|
||||
// this type lets us pretend to be the overseer
|
||||
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;
|
||||
|
||||
fn test_harness<T: Future<Output=()>>(run_args: HashMap<Hash, Vec<FromJob>>, test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T) {
|
||||
fn test_harness<T: Future<Output = ()>>(
|
||||
run_args: HashMap<Hash, Vec<FromJob>>,
|
||||
test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
|
||||
) {
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, overseer_handle) = make_subsystem_context(pool.clone());
|
||||
let (err_tx, err_rx) = mpsc::channel(16);
|
||||
@@ -933,15 +1055,26 @@ mod tests {
|
||||
let relay_parent: Hash = [0; 32].into();
|
||||
let mut run_args = HashMap::new();
|
||||
let test_message = format!("greetings from {}", relay_parent);
|
||||
run_args.insert(relay_parent.clone(), vec![FromJob::Test(test_message.clone())]);
|
||||
run_args.insert(
|
||||
relay_parent.clone(),
|
||||
vec![FromJob::Test(test_message.clone())],
|
||||
);
|
||||
|
||||
test_harness(run_args, |mut overseer_handle, err_rx| async move {
|
||||
overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(relay_parent)))).await;
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::start_work(relay_parent),
|
||||
)))
|
||||
.await;
|
||||
assert_matches!(
|
||||
overseer_handle.recv().await,
|
||||
AllMessages::Test(msg) if msg == test_message
|
||||
);
|
||||
overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(relay_parent)))).await;
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(relay_parent),
|
||||
)))
|
||||
.await;
|
||||
|
||||
let errs: Vec<_> = err_rx.collect().await;
|
||||
assert_eq!(errs.len(), 0);
|
||||
@@ -954,7 +1087,11 @@ mod tests {
|
||||
let run_args = HashMap::new();
|
||||
|
||||
test_harness(run_args, |mut overseer_handle, err_rx| async move {
|
||||
overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(relay_parent)))).await;
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(relay_parent),
|
||||
)))
|
||||
.await;
|
||||
|
||||
let errs: Vec<_> = err_rx.collect().await;
|
||||
assert_eq!(errs.len(), 1);
|
||||
@@ -971,10 +1108,8 @@ mod tests {
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, _) = make_subsystem_context::<CandidateSelectionMessage, _>(pool.clone());
|
||||
|
||||
let SpawnedSubsystem { name, .. } = FakeCandidateSelectionSubsystem::new(
|
||||
pool,
|
||||
HashMap::new(),
|
||||
).start(context);
|
||||
let SpawnedSubsystem { name, .. } =
|
||||
FakeCandidateSelectionSubsystem::new(pool, HashMap::new()).start(context);
|
||||
assert_eq!(name, "FakeCandidateSelection");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user