diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 2ca11b4459..7bfada5757 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -26,9 +26,8 @@ use polkadot_node_subsystem::{ errors::{RuntimeApiError, SubsystemError}, - messages::{BoundToRelayParent, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender}, - overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, - SubsystemContext, SubsystemSender, + messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender}, + overseer, SubsystemSender, }; pub use overseer::{ @@ -38,14 +37,8 @@ pub use overseer::{ pub use polkadot_node_metrics::{metrics, Metronome}; -use futures::{ - channel::{mpsc, oneshot}, - prelude::*, - select, - stream::{SelectAll, Stream}, -}; +use futures::channel::{mpsc, oneshot}; use parity_scale_codec::Encode; -use pin_project::pin_project; use polkadot_primitives::v2::{ AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, @@ -58,14 +51,7 @@ pub use rand; use sp_application_crypto::AppKey; use sp_core::ByteArray; use sp_keystore::{CryptoStore, Error as KeystoreError, SyncCryptoStorePtr}; -use std::{ - collections::{hash_map::Entry, HashMap}, - fmt, - marker::Unpin, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; +use std::time::Duration; use thiserror::Error; pub use metered; @@ -388,396 +374,3 @@ impl Validator { Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key).await } } - -struct AbortOnDrop(future::AbortHandle); - -impl Drop for AbortOnDrop { - fn drop(&mut self) { - self.0.abort(); - } -} - -/// A `JobHandle` manages a particular job for a subsystem. -struct JobHandle { - _abort_handle: AbortOnDrop, - to_job: mpsc::Sender, -} - -impl JobHandle { - /// Send a message to the job. - async fn send_msg(&mut self, msg: Consumes) -> Result<(), Error> { - self.to_job.send(msg).await.map_err(Into::into) - } -} - -/// Commands from a job to the broader subsystem. -pub enum FromJobCommand { - /// Spawn a child task on the executor. - Spawn(&'static str, Pin + Send>>), - /// Spawn a blocking child task on the executor's dedicated thread pool. - SpawnBlocking(&'static str, Pin + Send>>), -} - -/// A sender for messages from jobs, as well as commands to the overseer. -pub struct JobSender { - sender: S, - from_job: mpsc::Sender, -} - -// A custom clone impl, since M does not need to impl `Clone` -// which `#[derive(Clone)]` requires. -impl Clone for JobSender { - fn clone(&self) -> Self { - Self { sender: self.sender.clone(), from_job: self.from_job.clone() } - } -} - -impl JobSender { - /// Get access to the underlying subsystem sender. - pub fn subsystem_sender(&mut self) -> &mut S { - &mut self.sender - } - - /// Send a command to the subsystem, to be relayed onwards to the overseer. - pub async fn send_command(&mut self, msg: FromJobCommand) -> Result<(), mpsc::SendError> { - self.from_job.send(msg).await - } -} - -#[async_trait::async_trait] -impl overseer::SubsystemSender for JobSender -where - M: Send + 'static, - S: SubsystemSender + Clone, -{ - async fn send_message(&mut self, msg: M) { - self.sender.send_message(msg).await - } - - async fn send_messages(&mut self, msgs: I) - where - I: IntoIterator + Send, - I::IntoIter: Send, - { - self.sender.send_messages(msgs).await - } - - fn send_unbounded_message(&mut self, msg: M) { - self.sender.send_unbounded_message(msg) - } -} - -impl fmt::Debug for FromJobCommand { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::Spawn(name, _) => write!(fmt, "FromJobCommand::Spawn({})", name), - Self::SpawnBlocking(name, _) => write!(fmt, "FromJobCommand::SpawnBlocking({})", name), - } - } -} - -/// This trait governs jobs. -/// -/// Jobs are instantiated and killed automatically on appropriate overseer messages. -/// Other messages are passed along to and from the job via the overseer to other subsystems. -pub trait JobTrait: Unpin + Sized { - /// Message type used to send messages to the job. - type ToJob: 'static + BoundToRelayParent + Send; - - /// The set of outgoing messages to be accumulated into. - type OutgoingMessages: 'static + Send; - - /// The sender to send outgoing messages. - // The trait bounds are rather minimal. - type Sender: 'static + Send + Clone; - - /// Job runtime error. - type Error: 'static + std::error::Error + Send; - /// Extra arguments this job needs to run properly. - /// - /// If no extra information is needed, it is perfectly acceptable to set it to `()`. - type RunArgs: 'static + Send; - /// Subsystem-specific Prometheus metrics. - /// - /// Jobs spawned by one subsystem should share the same - /// instance of metrics (use `.clone()`). - /// The `delegate_subsystem!` macro should take care of this. - type Metrics: 'static + metrics::Metrics + Send; - - /// Name of the job, i.e. `candidate-backing-job` - const NAME: &'static str; - - /// Run a job for the given relay `parent`. - /// - /// The job should be ended when `receiver` returns `None`. - fn run( - leaf: ActivatedLeaf, - run_args: Self::RunArgs, - metrics: Self::Metrics, - receiver: mpsc::Receiver, - sender: JobSender, - ) -> Pin> + Send>>; -} - -/// Error which can be returned by the jobs manager -/// -/// Wraps the utility error type and the job-specific error -#[derive(Debug, Error)] -pub enum JobsError { - /// utility error - #[error("Utility")] - Utility(#[source] Error), - /// internal job error - #[error("Internal")] - Job(#[source] JobError), -} - -/// Jobs manager for a subsystem -/// -/// - Spawns new jobs for a given relay-parent on demand. -/// - Closes old jobs for a given relay-parent on demand. -/// - Dispatches messages to the appropriate job for a given relay-parent. -/// - When dropped, aborts all remaining jobs. -/// - implements `Stream`, collecting all messages from subordinate jobs. -#[pin_project] -struct Jobs { - spawner: Spawner, - running: HashMap>, - outgoing_msgs: SelectAll>, -} - -impl Jobs -where - Spawner: overseer::gen::Spawner + Clone, - ToJob: Send + 'static, -{ - /// Create a new Jobs manager which handles spawning appropriate jobs. - pub fn new(spawner: Spawner) -> Self { - Self { spawner, running: HashMap::new(), outgoing_msgs: SelectAll::new() } - } - - /// Spawn a new job for this `parent_hash`, with whatever args are appropriate. - fn spawn_job( - &mut self, - leaf: ActivatedLeaf, - run_args: Job::RunArgs, - metrics: Job::Metrics, - sender: Job::Sender, - ) where - Job: JobTrait, - { - let hash = leaf.hash; - let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); - let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); - - let (future, abort_handle) = future::abortable(async move { - if let Err(e) = Job::run( - leaf, - run_args, - metrics, - to_job_rx, - JobSender { sender, from_job: from_job_tx }, - ) - .await - { - gum::error!( - job = Job::NAME, - parent_hash = %hash, - err = ?e, - "job finished with an error", - ); - - return Err(e) - } - - Ok(()) - }); - - self.spawner.spawn( - Job::NAME, - Some(Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME)), - future.map(drop).boxed(), - ); - self.outgoing_msgs.push(from_job_rx); - - let handle = JobHandle { _abort_handle: AbortOnDrop(abort_handle), to_job: to_job_tx }; - - self.running.insert(hash, handle); - } - - /// Stop the job associated with this `parent_hash`. - pub async fn stop_job(&mut self, parent_hash: Hash) { - self.running.remove(&parent_hash); - } - - /// Send a message to the appropriate job for this `parent_hash`. - async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) { - if let Entry::Occupied(mut job) = self.running.entry(parent_hash) { - if job.get_mut().send_msg(msg).await.is_err() { - job.remove(); - } - } - } -} - -impl Stream for Jobs -where - Spawner: overseer::gen::Spawner + Clone, -{ - type Item = FromJobCommand; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match futures::ready!(Pin::new(&mut self.outgoing_msgs).poll_next(cx)) { - Some(msg) => Poll::Ready(Some(msg)), - // Don't end if there are no jobs running - None => Poll::Pending, - } - } -} - -impl stream::FusedStream for Jobs -where - Spawner: overseer::gen::Spawner + Clone, -{ - fn is_terminated(&self) -> bool { - false - } -} - -/// Parameters to a job subsystem. -pub struct JobSubsystemParams { - /// A spawner for sub-tasks. - spawner: Spawner, - /// Arguments to each job. - run_args: RunArgs, - /// Metrics for the subsystem. - pub metrics: Metrics, -} - -/// A subsystem which wraps jobs. -/// -/// Conceptually, this is very simple: it just loops forever. -/// -/// - On incoming overseer messages, it starts or stops jobs as appropriate. -/// - On other incoming messages, if they can be converted into `Job::ToJob` and -/// include a hash, then they're forwarded to the appropriate individual job. -/// - On outgoing messages from the jobs, it forwards them to the overseer. -pub struct JobSubsystem { - #[allow(missing_docs)] - pub params: JobSubsystemParams, - _marker: std::marker::PhantomData, -} - -impl JobSubsystem { - /// Create a new `JobSubsystem`. - pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self { - JobSubsystem { - params: JobSubsystemParams { spawner, run_args, metrics }, - _marker: std::marker::PhantomData, - } - } - - /// Run the subsystem to completion. - pub async fn run(self, mut ctx: Context) - where - Spawner: overseer::gen::Spawner + Clone + Unpin + 'static, - Context: SubsystemContext< - Message = ::ToJob, - OutgoingMessages = ::OutgoingMessages, - Sender = ::Sender, - Signal = OverseerSignal, - >, - Job: 'static + JobTrait + Send, - ::RunArgs: Clone + Sync, - ::ToJob: - Sync + From<::Message>, - ::Metrics: Sync, - { - let JobSubsystem { params: JobSubsystemParams { spawner, run_args, metrics }, .. } = self; - - let mut jobs = Jobs::::new(spawner); - - loop { - select! { - incoming = ctx.recv().fuse() => { - match incoming { - Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated, - deactivated, - }))) => { - for activated in activated { - let sender = ctx.sender().clone(); - jobs.spawn_job::( - activated, - run_args.clone(), - metrics.clone(), - sender, - ) - } - - for hash in deactivated { - jobs.stop_job(hash).await; - } - } - Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => { - jobs.running.clear(); - break; - } - Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {} - Ok(FromOrchestra::Communication { msg }) => { - if let Ok(to_job) = <::Message>::try_from(msg) { - jobs.send_msg(to_job.relay_parent(), to_job).await; - } - } - Err(err) => { - gum::error!( - job = Job::NAME, - err = ?err, - "error receiving message from subsystem context for job", - ); - break; - } - } - } - outgoing = jobs.next() => { - // TODO verify the introduced .await here is not a problem - // TODO it should only wait for the spawn to complete - // TODO but not for anything beyond that - let res = match outgoing.expect("the Jobs stream never ends; qed") { - FromJobCommand::Spawn(name, task) => ctx.spawn(name, task), - FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task), - }; - - if let Err(e) = res { - gum::warn!(err = ?e, "failed to handle command from job"); - } - } - complete => break, - } - } - } -} - -impl Subsystem for JobSubsystem -where - Spawner: overseer::gen::Spawner + Clone + Unpin + 'static, - Context: SubsystemContext< - Message = Job::ToJob, - Signal = OverseerSignal, - OutgoingMessages = ::OutgoingMessages, - Sender = ::Sender, - >, - Job: 'static + JobTrait + Send, - Job::RunArgs: Clone + Sync, - ::ToJob: Sync + From<::Message>, - Job::Metrics: Sync, -{ - fn start(self, ctx: Context) -> SpawnedSubsystem { - let future = Box::pin(async move { - self.run(ctx).await; - Ok(()) - }); - - SpawnedSubsystem { name: Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME), future } - } -} diff --git a/polkadot/node/subsystem-util/src/tests.rs b/polkadot/node/subsystem-util/src/tests.rs index fde6d375d6..e3bae035d7 100644 --- a/polkadot/node/subsystem-util/src/tests.rs +++ b/polkadot/node/subsystem-util/src/tests.rs @@ -17,209 +17,16 @@ #![cfg(test)] use super::*; -use assert_matches::assert_matches; use executor::block_on; -use futures::{channel::mpsc, executor, future, Future, FutureExt, SinkExt, StreamExt}; -use polkadot_node_jaeger as jaeger; -use polkadot_node_subsystem::{ - messages::{AllMessages, CollatorProtocolMessage}, - ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal, SpawnedSubsystem, -}; -use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context}; -use polkadot_primitives::v2::Hash; -use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash, AlwaysZeroRng}; +use futures::{channel::mpsc, executor, FutureExt, SinkExt, StreamExt}; +use polkadot_primitives_test_helpers::AlwaysZeroRng; use std::{ - pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, time::Duration, }; -use thiserror::Error; - -// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do; -// you can leave the subsystem itself to the job manager. - -// for purposes of demonstration, we're going to whip up a fake subsystem. -// this will 'select' candidates which are pre-loaded in the job - -// job structs are constructed within JobTrait::run -// most will want to retain the sender and receiver, as well as whatever other data they like -struct FakeCollatorProtocolJob { - receiver: mpsc::Receiver, - _phantom: std::marker::PhantomData, -} - -// Error will mostly be a wrapper to make the try operator more convenient; -// deriving From implementations for most variants is recommended. -// It must implement Debug for logging. -#[derive(Debug, Error)] -enum Error { - #[error(transparent)] - Sending(#[from] mpsc::SendError), -} - -impl JobTrait for FakeCollatorProtocolJob -where - Sender: overseer::CollatorProtocolSenderTrait - + std::marker::Unpin - + overseer::SubsystemSender, - JobSender: overseer::CollatorProtocolSenderTrait - + std::marker::Unpin - + overseer::SubsystemSender, -{ - type ToJob = CollatorProtocolMessage; - type OutgoingMessages = overseer::CollatorProtocolOutgoingMessages; - type Sender = Sender; - type Error = Error; - type RunArgs = bool; - type Metrics = (); - - const NAME: &'static str = "fake-collator-protocol-job"; - - /// Run a job for the parent block indicated - // - // this function is in charge of creating and executing the job's main loop - fn run( - _: ActivatedLeaf, - run_args: Self::RunArgs, - _metrics: Self::Metrics, - receiver: mpsc::Receiver, - mut sender: JobSender, - ) -> Pin> + Send>> { - async move { - let job = - FakeCollatorProtocolJob { receiver, _phantom: std::marker::PhantomData:: }; - - if run_args { - sender - .send_message(CollatorProtocolMessage::Invalid( - dummy_hash(), - dummy_candidate_receipt(dummy_hash()), - )) - .await; - } - - // it isn't necessary to break run_loop into its own function, - // but it's convenient to separate the concerns in this way - job.run_loop().await - } - .boxed() - } -} - -impl FakeCollatorProtocolJob -where - Sender: overseer::CollatorProtocolSenderTrait, -{ - async fn run_loop(mut self) -> Result<(), Error> { - loop { - match self.receiver.next().await { - Some(_csm) => { - unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet"); - }, - None => break, - } - } - - Ok(()) - } -} - -// with the job defined, it's straightforward to get a subsystem implementation. -type FakeCollatorProtocolSubsystem = - JobSubsystem, Spawner>; - -// this type lets us pretend to be the overseer -type OverseerHandle = test_helpers::TestSubsystemContextHandle; - -fn test_harness>(run_args: bool, test: impl FnOnce(OverseerHandle) -> T) { - let _ = env_logger::builder() - .is_test(true) - .filter(None, log::LevelFilter::Trace) - .try_init(); - - let pool = sp_core::testing::TaskExecutor::new(); - let (context, overseer_handle) = make_subsystem_context(pool.clone()); - - let subsystem = - FakeCollatorProtocolSubsystem::new(overseer::SpawnGlue(pool), run_args, ()).run(context); - let test_future = test(overseer_handle); - - futures::pin_mut!(subsystem, test_future); - - executor::block_on(async move { - future::join(subsystem, test_future) - .timeout(Duration::from_secs(2)) - .await - .expect("test timed out instead of completing") - }); -} - -#[test] -fn starting_and_stopping_job_works() { - let relay_parent: Hash = [0; 32].into(); - - test_harness(true, |mut overseer_handle| async move { - overseer_handle - .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: relay_parent, - number: 1, - status: LeafStatus::Fresh, - span: Arc::new(jaeger::Span::Disabled), - }), - ))) - .await; - assert_matches!(overseer_handle.recv().await, AllMessages::CollatorProtocol(_)); - overseer_handle - .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::stop_work(relay_parent), - ))) - .await; - - overseer_handle.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; - }); -} - -#[test] -fn sending_to_a_non_running_job_do_not_stop_the_subsystem() { - let relay_parent = Hash::repeat_byte(0x01); - - test_harness(true, |mut overseer_handle| async move { - overseer_handle - .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(ActivatedLeaf { - hash: relay_parent, - number: 1, - status: LeafStatus::Fresh, - span: Arc::new(jaeger::Span::Disabled), - }), - ))) - .await; - - // send to a non running job - overseer_handle - .send(FromOrchestra::Communication { msg: Default::default() }) - .await; - - // the subsystem is still alive - assert_matches!(overseer_handle.recv().await, AllMessages::CollatorProtocol(_)); - - overseer_handle.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; - }); -} - -#[test] -fn test_subsystem_impl_and_name_derivation() { - let pool = sp_core::testing::TaskExecutor::new(); - let (context, _) = make_subsystem_context::(pool.clone()); - - let SpawnedSubsystem { name, .. } = - FakeCollatorProtocolSubsystem::new(overseer::SpawnGlue(pool), false, ()).start(context); - assert_eq!(name, "fake-collator-protocol"); -} #[test] fn tick_tack_metronome() {