mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 15:51:12 +00:00
Remove Job trait (#5600)
This commit is contained in:
@@ -26,9 +26,8 @@
|
||||
|
||||
use polkadot_node_subsystem::{
|
||||
errors::{RuntimeApiError, SubsystemError},
|
||||
messages::{BoundToRelayParent, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
|
||||
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
|
||||
SubsystemContext, SubsystemSender,
|
||||
messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
|
||||
overseer, SubsystemSender,
|
||||
};
|
||||
|
||||
pub use overseer::{
|
||||
@@ -38,14 +37,8 @@ pub use overseer::{
|
||||
|
||||
pub use polkadot_node_metrics::{metrics, Metronome};
|
||||
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
prelude::*,
|
||||
select,
|
||||
stream::{SelectAll, Stream},
|
||||
};
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use parity_scale_codec::Encode;
|
||||
use pin_project::pin_project;
|
||||
|
||||
use polkadot_primitives::v2::{
|
||||
AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs,
|
||||
@@ -58,14 +51,7 @@ pub use rand;
|
||||
use sp_application_crypto::AppKey;
|
||||
use sp_core::ByteArray;
|
||||
use sp_keystore::{CryptoStore, Error as KeystoreError, SyncCryptoStorePtr};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
fmt,
|
||||
marker::Unpin,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
|
||||
pub use metered;
|
||||
@@ -388,396 +374,3 @@ impl Validator {
|
||||
Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key).await
|
||||
}
|
||||
}
|
||||
|
||||
struct AbortOnDrop(future::AbortHandle);
|
||||
|
||||
impl Drop for AbortOnDrop {
|
||||
fn drop(&mut self) {
|
||||
self.0.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// A `JobHandle` manages a particular job for a subsystem.
|
||||
struct JobHandle<Consumes> {
|
||||
_abort_handle: AbortOnDrop,
|
||||
to_job: mpsc::Sender<Consumes>,
|
||||
}
|
||||
|
||||
impl<Consumes> JobHandle<Consumes> {
|
||||
/// Send a message to the job.
|
||||
async fn send_msg(&mut self, msg: Consumes) -> Result<(), Error> {
|
||||
self.to_job.send(msg).await.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
/// Commands from a job to the broader subsystem.
|
||||
pub enum FromJobCommand {
|
||||
/// Spawn a child task on the executor.
|
||||
Spawn(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
|
||||
/// Spawn a blocking child task on the executor's dedicated thread pool.
|
||||
SpawnBlocking(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
|
||||
}
|
||||
|
||||
/// A sender for messages from jobs, as well as commands to the overseer.
|
||||
pub struct JobSender<S> {
|
||||
sender: S,
|
||||
from_job: mpsc::Sender<FromJobCommand>,
|
||||
}
|
||||
|
||||
// A custom clone impl, since M does not need to impl `Clone`
|
||||
// which `#[derive(Clone)]` requires.
|
||||
impl<S: Clone> Clone for JobSender<S> {
|
||||
fn clone(&self) -> Self {
|
||||
Self { sender: self.sender.clone(), from_job: self.from_job.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> JobSender<S> {
|
||||
/// Get access to the underlying subsystem sender.
|
||||
pub fn subsystem_sender(&mut self) -> &mut S {
|
||||
&mut self.sender
|
||||
}
|
||||
|
||||
/// Send a command to the subsystem, to be relayed onwards to the overseer.
|
||||
pub async fn send_command(&mut self, msg: FromJobCommand) -> Result<(), mpsc::SendError> {
|
||||
self.from_job.send(msg).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<S, M> overseer::SubsystemSender<M> for JobSender<S>
|
||||
where
|
||||
M: Send + 'static,
|
||||
S: SubsystemSender<M> + Clone,
|
||||
{
|
||||
async fn send_message(&mut self, msg: M) {
|
||||
self.sender.send_message(msg).await
|
||||
}
|
||||
|
||||
async fn send_messages<I>(&mut self, msgs: I)
|
||||
where
|
||||
I: IntoIterator<Item = M> + Send,
|
||||
I::IntoIter: Send,
|
||||
{
|
||||
self.sender.send_messages(msgs).await
|
||||
}
|
||||
|
||||
fn send_unbounded_message(&mut self, msg: M) {
|
||||
self.sender.send_unbounded_message(msg)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for FromJobCommand {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Self::Spawn(name, _) => write!(fmt, "FromJobCommand::Spawn({})", name),
|
||||
Self::SpawnBlocking(name, _) => write!(fmt, "FromJobCommand::SpawnBlocking({})", name),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This trait governs jobs.
|
||||
///
|
||||
/// Jobs are instantiated and killed automatically on appropriate overseer messages.
|
||||
/// Other messages are passed along to and from the job via the overseer to other subsystems.
|
||||
pub trait JobTrait: Unpin + Sized {
|
||||
/// Message type used to send messages to the job.
|
||||
type ToJob: 'static + BoundToRelayParent + Send;
|
||||
|
||||
/// The set of outgoing messages to be accumulated into.
|
||||
type OutgoingMessages: 'static + Send;
|
||||
|
||||
/// The sender to send outgoing messages.
|
||||
// The trait bounds are rather minimal.
|
||||
type Sender: 'static + Send + Clone;
|
||||
|
||||
/// Job runtime error.
|
||||
type Error: 'static + std::error::Error + Send;
|
||||
/// Extra arguments this job needs to run properly.
|
||||
///
|
||||
/// If no extra information is needed, it is perfectly acceptable to set it to `()`.
|
||||
type RunArgs: 'static + Send;
|
||||
/// Subsystem-specific Prometheus metrics.
|
||||
///
|
||||
/// Jobs spawned by one subsystem should share the same
|
||||
/// instance of metrics (use `.clone()`).
|
||||
/// The `delegate_subsystem!` macro should take care of this.
|
||||
type Metrics: 'static + metrics::Metrics + Send;
|
||||
|
||||
/// Name of the job, i.e. `candidate-backing-job`
|
||||
const NAME: &'static str;
|
||||
|
||||
/// Run a job for the given relay `parent`.
|
||||
///
|
||||
/// The job should be ended when `receiver` returns `None`.
|
||||
fn run(
|
||||
leaf: ActivatedLeaf,
|
||||
run_args: Self::RunArgs,
|
||||
metrics: Self::Metrics,
|
||||
receiver: mpsc::Receiver<Self::ToJob>,
|
||||
sender: JobSender<Self::Sender>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
|
||||
}
|
||||
|
||||
/// Error which can be returned by the jobs manager
|
||||
///
|
||||
/// Wraps the utility error type and the job-specific error
|
||||
#[derive(Debug, Error)]
|
||||
pub enum JobsError<JobError: std::fmt::Debug + std::error::Error + 'static> {
|
||||
/// utility error
|
||||
#[error("Utility")]
|
||||
Utility(#[source] Error),
|
||||
/// internal job error
|
||||
#[error("Internal")]
|
||||
Job(#[source] JobError),
|
||||
}
|
||||
|
||||
/// Jobs manager for a subsystem
|
||||
///
|
||||
/// - Spawns new jobs for a given relay-parent on demand.
|
||||
/// - Closes old jobs for a given relay-parent on demand.
|
||||
/// - Dispatches messages to the appropriate job for a given relay-parent.
|
||||
/// - When dropped, aborts all remaining jobs.
|
||||
/// - implements `Stream<Item=FromJobCommand>`, collecting all messages from subordinate jobs.
|
||||
#[pin_project]
|
||||
struct Jobs<Spawner, ToJob> {
|
||||
spawner: Spawner,
|
||||
running: HashMap<Hash, JobHandle<ToJob>>,
|
||||
outgoing_msgs: SelectAll<mpsc::Receiver<FromJobCommand>>,
|
||||
}
|
||||
|
||||
impl<Spawner, ToJob> Jobs<Spawner, ToJob>
|
||||
where
|
||||
Spawner: overseer::gen::Spawner + Clone,
|
||||
ToJob: Send + 'static,
|
||||
{
|
||||
/// Create a new Jobs manager which handles spawning appropriate jobs.
|
||||
pub fn new(spawner: Spawner) -> Self {
|
||||
Self { spawner, running: HashMap::new(), outgoing_msgs: SelectAll::new() }
|
||||
}
|
||||
|
||||
/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
|
||||
fn spawn_job<Job>(
|
||||
&mut self,
|
||||
leaf: ActivatedLeaf,
|
||||
run_args: Job::RunArgs,
|
||||
metrics: Job::Metrics,
|
||||
sender: Job::Sender,
|
||||
) where
|
||||
Job: JobTrait<ToJob = ToJob>,
|
||||
{
|
||||
let hash = leaf.hash;
|
||||
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
|
||||
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
|
||||
|
||||
let (future, abort_handle) = future::abortable(async move {
|
||||
if let Err(e) = Job::run(
|
||||
leaf,
|
||||
run_args,
|
||||
metrics,
|
||||
to_job_rx,
|
||||
JobSender { sender, from_job: from_job_tx },
|
||||
)
|
||||
.await
|
||||
{
|
||||
gum::error!(
|
||||
job = Job::NAME,
|
||||
parent_hash = %hash,
|
||||
err = ?e,
|
||||
"job finished with an error",
|
||||
);
|
||||
|
||||
return Err(e)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
self.spawner.spawn(
|
||||
Job::NAME,
|
||||
Some(Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME)),
|
||||
future.map(drop).boxed(),
|
||||
);
|
||||
self.outgoing_msgs.push(from_job_rx);
|
||||
|
||||
let handle = JobHandle { _abort_handle: AbortOnDrop(abort_handle), to_job: to_job_tx };
|
||||
|
||||
self.running.insert(hash, handle);
|
||||
}
|
||||
|
||||
/// Stop the job associated with this `parent_hash`.
|
||||
pub async fn stop_job(&mut self, parent_hash: Hash) {
|
||||
self.running.remove(&parent_hash);
|
||||
}
|
||||
|
||||
/// Send a message to the appropriate job for this `parent_hash`.
|
||||
async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) {
|
||||
if let Entry::Occupied(mut job) = self.running.entry(parent_hash) {
|
||||
if job.get_mut().send_msg(msg).await.is_err() {
|
||||
job.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Spawner, ToJob> Stream for Jobs<Spawner, ToJob>
|
||||
where
|
||||
Spawner: overseer::gen::Spawner + Clone,
|
||||
{
|
||||
type Item = FromJobCommand;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
match futures::ready!(Pin::new(&mut self.outgoing_msgs).poll_next(cx)) {
|
||||
Some(msg) => Poll::Ready(Some(msg)),
|
||||
// Don't end if there are no jobs running
|
||||
None => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Spawner, ToJob> stream::FusedStream for Jobs<Spawner, ToJob>
|
||||
where
|
||||
Spawner: overseer::gen::Spawner + Clone,
|
||||
{
|
||||
fn is_terminated(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Parameters to a job subsystem.
|
||||
pub struct JobSubsystemParams<Spawner, RunArgs, Metrics> {
|
||||
/// A spawner for sub-tasks.
|
||||
spawner: Spawner,
|
||||
/// Arguments to each job.
|
||||
run_args: RunArgs,
|
||||
/// Metrics for the subsystem.
|
||||
pub metrics: Metrics,
|
||||
}
|
||||
|
||||
/// A subsystem which wraps jobs.
|
||||
///
|
||||
/// Conceptually, this is very simple: it just loops forever.
|
||||
///
|
||||
/// - On incoming overseer messages, it starts or stops jobs as appropriate.
|
||||
/// - On other incoming messages, if they can be converted into `Job::ToJob` and
|
||||
/// include a hash, then they're forwarded to the appropriate individual job.
|
||||
/// - On outgoing messages from the jobs, it forwards them to the overseer.
|
||||
pub struct JobSubsystem<Job: JobTrait, Spawner> {
|
||||
#[allow(missing_docs)]
|
||||
pub params: JobSubsystemParams<Spawner, Job::RunArgs, Job::Metrics>,
|
||||
_marker: std::marker::PhantomData<Job>,
|
||||
}
|
||||
|
||||
impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
|
||||
/// Create a new `JobSubsystem`.
|
||||
pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self {
|
||||
JobSubsystem {
|
||||
params: JobSubsystemParams { spawner, run_args, metrics },
|
||||
_marker: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the subsystem to completion.
|
||||
pub async fn run<Context>(self, mut ctx: Context)
|
||||
where
|
||||
Spawner: overseer::gen::Spawner + Clone + Unpin + 'static,
|
||||
Context: SubsystemContext<
|
||||
Message = <Job as JobTrait>::ToJob,
|
||||
OutgoingMessages = <Job as JobTrait>::OutgoingMessages,
|
||||
Sender = <Job as JobTrait>::Sender,
|
||||
Signal = OverseerSignal,
|
||||
>,
|
||||
Job: 'static + JobTrait + Send,
|
||||
<Job as JobTrait>::RunArgs: Clone + Sync,
|
||||
<Job as JobTrait>::ToJob:
|
||||
Sync + From<<Context as polkadot_overseer::SubsystemContext>::Message>,
|
||||
<Job as JobTrait>::Metrics: Sync,
|
||||
{
|
||||
let JobSubsystem { params: JobSubsystemParams { spawner, run_args, metrics }, .. } = self;
|
||||
|
||||
let mut jobs = Jobs::<Spawner, Job::ToJob>::new(spawner);
|
||||
|
||||
loop {
|
||||
select! {
|
||||
incoming = ctx.recv().fuse() => {
|
||||
match incoming {
|
||||
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated,
|
||||
deactivated,
|
||||
}))) => {
|
||||
for activated in activated {
|
||||
let sender = ctx.sender().clone();
|
||||
jobs.spawn_job::<Job>(
|
||||
activated,
|
||||
run_args.clone(),
|
||||
metrics.clone(),
|
||||
sender,
|
||||
)
|
||||
}
|
||||
|
||||
for hash in deactivated {
|
||||
jobs.stop_job(hash).await;
|
||||
}
|
||||
}
|
||||
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => {
|
||||
jobs.running.clear();
|
||||
break;
|
||||
}
|
||||
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}
|
||||
Ok(FromOrchestra::Communication { msg }) => {
|
||||
if let Ok(to_job) = <<Context as SubsystemContext>::Message>::try_from(msg) {
|
||||
jobs.send_msg(to_job.relay_parent(), to_job).await;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
gum::error!(
|
||||
job = Job::NAME,
|
||||
err = ?err,
|
||||
"error receiving message from subsystem context for job",
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
outgoing = jobs.next() => {
|
||||
// TODO verify the introduced .await here is not a problem
|
||||
// TODO it should only wait for the spawn to complete
|
||||
// TODO but not for anything beyond that
|
||||
let res = match outgoing.expect("the Jobs stream never ends; qed") {
|
||||
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task),
|
||||
FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task),
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
gum::warn!(err = ?e, "failed to handle command from job");
|
||||
}
|
||||
}
|
||||
complete => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Context, Job, Spawner> Subsystem<Context, SubsystemError> for JobSubsystem<Job, Spawner>
|
||||
where
|
||||
Spawner: overseer::gen::Spawner + Clone + Unpin + 'static,
|
||||
Context: SubsystemContext<
|
||||
Message = Job::ToJob,
|
||||
Signal = OverseerSignal,
|
||||
OutgoingMessages = <Job as JobTrait>::OutgoingMessages,
|
||||
Sender = <Job as JobTrait>::Sender,
|
||||
>,
|
||||
Job: 'static + JobTrait + Send,
|
||||
Job::RunArgs: Clone + Sync,
|
||||
<Job as JobTrait>::ToJob: Sync + From<<Context as SubsystemContext>::Message>,
|
||||
Job::Metrics: Sync,
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let future = Box::pin(async move {
|
||||
self.run(ctx).await;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
SpawnedSubsystem { name: Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME), future }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,209 +17,16 @@
|
||||
#![cfg(test)]
|
||||
|
||||
use super::*;
|
||||
use assert_matches::assert_matches;
|
||||
use executor::block_on;
|
||||
use futures::{channel::mpsc, executor, future, Future, FutureExt, SinkExt, StreamExt};
|
||||
use polkadot_node_jaeger as jaeger;
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{AllMessages, CollatorProtocolMessage},
|
||||
ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal, SpawnedSubsystem,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context};
|
||||
use polkadot_primitives::v2::Hash;
|
||||
use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash, AlwaysZeroRng};
|
||||
use futures::{channel::mpsc, executor, FutureExt, SinkExt, StreamExt};
|
||||
use polkadot_primitives_test_helpers::AlwaysZeroRng;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use thiserror::Error;
|
||||
|
||||
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
|
||||
// you can leave the subsystem itself to the job manager.
|
||||
|
||||
// for purposes of demonstration, we're going to whip up a fake subsystem.
|
||||
// this will 'select' candidates which are pre-loaded in the job
|
||||
|
||||
// job structs are constructed within JobTrait::run
|
||||
// most will want to retain the sender and receiver, as well as whatever other data they like
|
||||
struct FakeCollatorProtocolJob<Sender> {
|
||||
receiver: mpsc::Receiver<CollatorProtocolMessage>,
|
||||
_phantom: std::marker::PhantomData<Sender>,
|
||||
}
|
||||
|
||||
// Error will mostly be a wrapper to make the try operator more convenient;
|
||||
// deriving From implementations for most variants is recommended.
|
||||
// It must implement Debug for logging.
|
||||
#[derive(Debug, Error)]
|
||||
enum Error {
|
||||
#[error(transparent)]
|
||||
Sending(#[from] mpsc::SendError),
|
||||
}
|
||||
|
||||
impl<Sender> JobTrait for FakeCollatorProtocolJob<Sender>
|
||||
where
|
||||
Sender: overseer::CollatorProtocolSenderTrait
|
||||
+ std::marker::Unpin
|
||||
+ overseer::SubsystemSender<CollatorProtocolMessage>,
|
||||
JobSender<Sender>: overseer::CollatorProtocolSenderTrait
|
||||
+ std::marker::Unpin
|
||||
+ overseer::SubsystemSender<CollatorProtocolMessage>,
|
||||
{
|
||||
type ToJob = CollatorProtocolMessage;
|
||||
type OutgoingMessages = overseer::CollatorProtocolOutgoingMessages;
|
||||
type Sender = Sender;
|
||||
type Error = Error;
|
||||
type RunArgs = bool;
|
||||
type Metrics = ();
|
||||
|
||||
const NAME: &'static str = "fake-collator-protocol-job";
|
||||
|
||||
/// Run a job for the parent block indicated
|
||||
//
|
||||
// this function is in charge of creating and executing the job's main loop
|
||||
fn run(
|
||||
_: ActivatedLeaf,
|
||||
run_args: Self::RunArgs,
|
||||
_metrics: Self::Metrics,
|
||||
receiver: mpsc::Receiver<CollatorProtocolMessage>,
|
||||
mut sender: JobSender<Sender>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
async move {
|
||||
let job =
|
||||
FakeCollatorProtocolJob { receiver, _phantom: std::marker::PhantomData::<Sender> };
|
||||
|
||||
if run_args {
|
||||
sender
|
||||
.send_message(CollatorProtocolMessage::Invalid(
|
||||
dummy_hash(),
|
||||
dummy_candidate_receipt(dummy_hash()),
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
// it isn't necessary to break run_loop into its own function,
|
||||
// but it's convenient to separate the concerns in this way
|
||||
job.run_loop().await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Sender> FakeCollatorProtocolJob<Sender>
|
||||
where
|
||||
Sender: overseer::CollatorProtocolSenderTrait,
|
||||
{
|
||||
async fn run_loop(mut self) -> Result<(), Error> {
|
||||
loop {
|
||||
match self.receiver.next().await {
|
||||
Some(_csm) => {
|
||||
unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
|
||||
},
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// with the job defined, it's straightforward to get a subsystem implementation.
|
||||
type FakeCollatorProtocolSubsystem<Spawner> =
|
||||
JobSubsystem<FakeCollatorProtocolJob<test_helpers::TestSubsystemSender>, Spawner>;
|
||||
|
||||
// this type lets us pretend to be the overseer
|
||||
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>;
|
||||
|
||||
fn test_harness<T: Future<Output = ()>>(run_args: bool, test: impl FnOnce(OverseerHandle) -> T) {
|
||||
let _ = env_logger::builder()
|
||||
.is_test(true)
|
||||
.filter(None, log::LevelFilter::Trace)
|
||||
.try_init();
|
||||
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, overseer_handle) = make_subsystem_context(pool.clone());
|
||||
|
||||
let subsystem =
|
||||
FakeCollatorProtocolSubsystem::new(overseer::SpawnGlue(pool), run_args, ()).run(context);
|
||||
let test_future = test(overseer_handle);
|
||||
|
||||
futures::pin_mut!(subsystem, test_future);
|
||||
|
||||
executor::block_on(async move {
|
||||
future::join(subsystem, test_future)
|
||||
.timeout(Duration::from_secs(2))
|
||||
.await
|
||||
.expect("test timed out instead of completing")
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn starting_and_stopping_job_works() {
|
||||
let relay_parent: Hash = [0; 32].into();
|
||||
|
||||
test_harness(true, |mut overseer_handle| async move {
|
||||
overseer_handle
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: relay_parent,
|
||||
number: 1,
|
||||
status: LeafStatus::Fresh,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
assert_matches!(overseer_handle.recv().await, AllMessages::CollatorProtocol(_));
|
||||
overseer_handle
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::stop_work(relay_parent),
|
||||
)))
|
||||
.await;
|
||||
|
||||
overseer_handle.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
|
||||
let relay_parent = Hash::repeat_byte(0x01);
|
||||
|
||||
test_harness(true, |mut overseer_handle| async move {
|
||||
overseer_handle
|
||||
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
|
||||
ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: relay_parent,
|
||||
number: 1,
|
||||
status: LeafStatus::Fresh,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
|
||||
// send to a non running job
|
||||
overseer_handle
|
||||
.send(FromOrchestra::Communication { msg: Default::default() })
|
||||
.await;
|
||||
|
||||
// the subsystem is still alive
|
||||
assert_matches!(overseer_handle.recv().await, AllMessages::CollatorProtocol(_));
|
||||
|
||||
overseer_handle.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_subsystem_impl_and_name_derivation() {
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, _) = make_subsystem_context::<CollatorProtocolMessage, _>(pool.clone());
|
||||
|
||||
let SpawnedSubsystem { name, .. } =
|
||||
FakeCollatorProtocolSubsystem::new(overseer::SpawnGlue(pool), false, ()).start(context);
|
||||
assert_eq!(name, "fake-collator-protocol");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_tack_metronome() {
|
||||
|
||||
Reference in New Issue
Block a user