* Initial commit

* Licenses, spaces, docs

* Add a spawner

* Watch spawned subsystems with a FuturesUnordered

* Move the types around a bit

* Suggested fixes by Max

* Add a handler to talk to the Overseer

* FromOverseer and ToOverseer msgs and stopping

* Docs and return errors

* Dont broadcast, have add a from field to messages

* Allow communication between subsystems and outside world

* A message with a oneshot to send result example

* Remove leftover can_recv_msg

* Remove from field from messages

* Dont be generic over stuff

* Gather messages with StreamUnordered

* Fix comments and formatting

* More docs fixes and an example

* Apply suggestions from code review

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>

* Fixes from review

Move function from impl block.
Do not panic but resolve with errors if spawner fails or subsystem
resolves.

* Dropping a handler results in a flaky test

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
This commit is contained in:
Fedor Sakharov
2020-06-02 15:36:20 +03:00
committed by GitHub
parent 03576b3707
commit fa46a92353
5 changed files with 962 additions and 0 deletions
+41
View File
@@ -1144,6 +1144,21 @@ dependencies = [
"libc",
]
[[package]]
name = "femme"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b6b21baebbed15551f2170010ca4101b9ed3fdc05822791c8bd4631840eab81"
dependencies = [
"cfg-if",
"js-sys",
"log 0.4.8",
"serde",
"serde_derive",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "file-per-thread-logger"
version = "0.1.3"
@@ -3284,6 +3299,18 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
[[package]]
name = "overseer"
version = "0.1.0"
dependencies = [
"femme",
"futures 0.3.5",
"futures-timer 3.0.2",
"kv-log-macro",
"log 0.4.8",
"streamunordered",
]
[[package]]
name = "owning_ref"
version = "0.4.1"
@@ -7183,6 +7210,18 @@ dependencies = [
"generic-array",
]
[[package]]
name = "streamunordered"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9394ee1338fee8370bee649f8a7170b3a56917903a0956467ad192dcf8699ca"
dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"slab",
]
[[package]]
name = "string"
version = "0.2.1"
@@ -8347,6 +8386,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c7d40d09cdbf0f4895ae58cf57d92e1e57a9dd8ed2e8390514b54a47cc5551"
dependencies = [
"cfg-if",
"serde",
"serde_json",
"wasm-bindgen-macro",
]
+1
View File
@@ -27,6 +27,7 @@ members = [
"erasure-coding",
"network",
"network/test",
"overseer",
"primitives",
"runtime/common",
"runtime/polkadot",
+18
View File
@@ -0,0 +1,18 @@
[package]
name = "overseer"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.0.1"
log = "0.4.8"
kv-log-macro = "1.0.6"
@@ -0,0 +1,134 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Shows a basic usage of the `Overseer`:
//! * Spawning subsystems and subsystem child jobs
//! * Establishing message passing
use std::time::Duration;
use futures::{
pending, pin_mut, executor, select, stream,
FutureExt, StreamExt,
};
use futures_timer::Delay;
use kv_log_macro as log;
use overseer::{
AllMessages, CandidateBackingSubsystemMessage, FromOverseer,
Overseer, Subsystem, SubsystemContext, SpawnedSubsystem, ValidationSubsystemMessage,
};
struct Subsystem1;
impl Subsystem1 {
async fn run(mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
if let FromOverseer::Communication { msg } = msg {
log::info!("msg {:?}", msg);
}
continue;
}
Ok(None) => (),
Err(_) => {
log::info!("exiting");
return;
}
}
Delay::new(Duration::from_secs(1)).await;
ctx.send_msg(AllMessages::Validation(
ValidationSubsystemMessage::ValidityAttestation
)).await.unwrap();
}
}
}
impl Subsystem<CandidateBackingSubsystemMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
}
}
struct Subsystem2;
impl Subsystem2 {
async fn run(mut ctx: SubsystemContext<ValidationSubsystemMessage>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
Delay::new(Duration::from_secs(1)).await;
}
})).await.unwrap();
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
log::info!("Subsystem2 received message {:?}", msg);
continue;
}
Ok(None) => { pending!(); }
Err(_) => {
log::info!("exiting");
return;
},
}
}
}
}
impl Subsystem<ValidationSubsystemMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
}
}
fn main() {
femme::with_level(femme::LevelFilter::Trace);
let spawner = executor::ThreadPool::new().unwrap();
futures::executor::block_on(async {
let timer_stream = stream::repeat(()).then(|_| async {
Delay::new(Duration::from_secs(1)).await;
});
let (overseer, _handler) = Overseer::new(
Box::new(Subsystem2),
Box::new(Subsystem1),
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;
pin_mut!(timer_stream);
pin_mut!(overseer_fut);
loop {
select! {
_ = overseer_fut => break,
_ = timer_stream.next() => {
log::info!("tick");
}
complete => break,
}
}
});
}
+768
View File
@@ -0,0 +1,768 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! # Overseer
//!
//! `overseer` implements the Overseer architecture described in the
//! [implementors-guide](https://github.com/paritytech/polkadot/blob/master/roadmap/implementors-guide/guide.md).
//! For the motivations behind implementing the overseer itself you should
//! check out that guide, documentation in this crate will be mostly discussing
//! technical stuff.
//!
//! An `Overseer` is something that allows spawning/stopping and overseing
//! asynchronous tasks as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. It is desired
//! that this protocol is the only way tasks communicate with each other, however
//! at this moment there are no foolproof guards against other ways of communication.
//!
//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
//! share the same behavior from `Overseer`'s point of view.
//!
//! ```text
//! +-----------------------------+
//! | Overseer |
//! +-----------------------------+
//!
//! ................| Overseer "holds" these and uses |..............
//! . them to (re)start things .
//! . .
//! . +-------------------+ +---------------------+ .
//! . | Subsystem1 | | Subsystem2 | .
//! . +-------------------+ +---------------------+ .
//! . | | .
//! ..................................................................
//! | |
//! start() start()
//! V V
//! ..................| Overseer "runs" these |.......................
//! . +--------------------+ +---------------------+ .
//! . | SubsystemInstance1 | | SubsystemInstance2 | .
//! . +--------------------+ +---------------------+ .
//! ..................................................................
//! ```
use std::fmt::Debug;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll, select,
future::{BoxFuture, RemoteHandle},
stream::FuturesUnordered,
task::{Spawn, SpawnError, SpawnExt},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use streamunordered::{StreamYield, StreamUnordered};
/// An error type that describes faults that may happen
///
/// These are:
/// * Channels being closed
/// * Subsystems dying when they are not expected to
/// * Subsystems not dying when they are told to die
/// * etc.
#[derive(Debug)]
pub struct SubsystemError;
impl From<mpsc::SendError> for SubsystemError {
fn from(_: mpsc::SendError) -> Self {
Self
}
}
impl From<oneshot::Canceled> for SubsystemError {
fn from(_: oneshot::Canceled) -> Self {
Self
}
}
impl From<SpawnError> for SubsystemError {
fn from(_: SpawnError) -> Self {
Self
}
}
/// A `Result` type that wraps [`SubsystemError`].
///
/// [`SubsystemError`]: struct.SubsystemError.html
pub type SubsystemResult<T> = Result<T, SubsystemError>;
/// An asynchronous subsystem task that runs inside and being overseen by the [`Overseer`].
///
/// In essence it's just a newtype wrapping a `BoxFuture`.
///
/// [`Overseer`]: struct.Overseer.html
pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
// A capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
// A graceful `Overseer` teardown time delay.
const STOP_DELAY: u64 = 1;
/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
///
/// It wraps a system-wide [`AllMessages`] type that represents all possible
/// messages in the system.
///
/// [`AllMessages`]: enum.AllMessages.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`Overseer`]: struct.Overseer.html
enum ToOverseer {
/// This is a message sent by a `Subsystem`.
SubsystemMessage(AllMessages),
/// A message that wraps something the `Subsystem` is desiring to
/// spawn on the overseer and a `oneshot::Sender` to signal the result
/// of the spawn.
SpawnJob {
s: BoxFuture<'static, ()>,
res: oneshot::Sender<SubsystemResult<()>>,
},
}
/// Some event from outer world.
enum Event {
BlockImport,
BlockFinalized,
MsgToSubsystem(AllMessages),
Stop,
}
/// Some message that is sent from one of the `Subsystem`s to the outside world.
pub enum OutboundMessage {
SubsystemMessage {
msg: AllMessages,
}
}
/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
pub struct OverseerHandler {
events_tx: mpsc::Sender<Event>,
}
impl OverseerHandler {
/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImport).await?;
Ok(())
}
/// Send some message to one of the `Subsystem`s.
pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.events_tx.send(Event::MsgToSubsystem(msg)).await?;
Ok(())
}
/// Inform the `Overseer` that that some block was finalized.
pub async fn block_finalized(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockFinalized).await?;
Ok(())
}
/// Tell `Overseer` to shutdown.
pub async fn stop(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::Stop).await?;
Ok(())
}
}
impl Debug for ToOverseer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ToOverseer::SubsystemMessage(msg) => {
write!(f, "OverseerMessage::SubsystemMessage({:?})", msg)
}
ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)")
}
}
}
/// A running instance of some [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M: Debug> {
tx: mpsc::Sender<FromOverseer<M>>,
}
/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or to spawn it's [`SubsystemJob`]s.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
pub struct SubsystemContext<M: Debug>{
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<ToOverseer>,
}
/// A signal used by [`Overseer`] to communicate with the [`Subsystem`]s.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug)]
pub enum OverseerSignal {
/// `Subsystem` should start working.
StartWork,
/// `Subsystem` should stop working.
StopWork,
/// Conclude the work of the `Overseer` and all `Subsystem`s.
Conclude,
}
#[derive(Debug)]
/// A message type used by the Validation [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
pub enum ValidationSubsystemMessage {
ValidityAttestation,
}
#[derive(Debug)]
/// A message type used by the CandidateBacking [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
pub enum CandidateBackingSubsystemMessage {
RegisterBackingWatcher,
Second,
}
/// A message type tying together all message types that are used across [`Subsystem`]s.
///
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug)]
pub enum AllMessages {
Validation(ValidationSubsystemMessage),
CandidateBacking(CandidateBackingSubsystemMessage),
}
/// A message type that a [`Subsystem`] receives from the [`Overseer`].
/// It wraps siglans from the [`Overseer`] and messages that are circulating
/// between subsystems.
///
/// It is generic over over the message type `M` that a particular `Subsystem` may use.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug)]
pub enum FromOverseer<M: Debug> {
/// Signal from the `Overseer`.
Signal(OverseerSignal),
/// Some other `Subsystem`'s message.
Communication {
msg: M,
},
}
impl<M: Debug> SubsystemContext<M> {
/// Try to asyncronously receive a message.
///
/// This has to be used with caution, if you loop over this without
/// using `pending!()` macro you will end up with a busy loop!
pub async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
match poll!(self.rx.next()) {
Poll::Ready(Some(msg)) => Ok(Some(msg)),
Poll::Ready(None) => Err(()),
Poll::Pending => Ok(None),
}
}
/// Receive a message.
pub async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
self.rx.next().await.ok_or(SubsystemError)
}
/// Spawn a child task on the executor.
pub async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ToOverseer::SpawnJob {
s,
res: tx,
}).await?;
rx.await?
}
/// Send a direct message to some other `Subsystem`, routed based on message type.
pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage(msg)).await?;
Ok(())
}
fn new(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<ToOverseer>) -> Self {
Self {
rx,
tx,
}
}
}
/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
///
/// It is generic over the message type circulating in the system.
/// The idea that we want some type contaning persistent state that
/// can spawn actually running subsystems when asked to.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
pub trait Subsystem<M: Debug> {
/// Start this `Subsystem` and return `SpawnedSubsystem`.
fn start(&mut self, ctx: SubsystemContext<M>) -> SpawnedSubsystem;
}
/// A subsystem that we oversee.
///
/// Ties together the [`Subsystem`] itself and it's running instance
/// (which may be missing if the [`Subsystem`] is not running at the moment
/// for whatever reason).
///
/// [`Subsystem`]: trait.Subsystem.html
#[allow(dead_code)]
struct OverseenSubsystem<M: Debug> {
subsystem: Box<dyn Subsystem<M> + Send>,
instance: Option<SubsystemInstance<M>>,
}
/// The `Overseer` itself.
pub struct Overseer<S: Spawn> {
/// A validation subsystem
validation_subsystem: OverseenSubsystem<ValidationSubsystemMessage>,
/// A candidate backing subsystem
candidate_backing_subsystem: OverseenSubsystem<CandidateBackingSubsystemMessage>,
/// Spawner to spawn tasks to.
s: S,
/// Here we keep handles to spawned subsystems to be notified when they terminate.
running_subsystems: FuturesUnordered<RemoteHandle<()>>,
/// Gather running subsystms' outbound streams into one.
running_subsystems_rx: StreamUnordered<mpsc::Receiver<ToOverseer>>,
/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,
}
impl<S> Overseer<S>
where
S: Spawn,
{
/// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s.
///
/// Each [`Subsystem`] is passed to this function as an explicit parameter
/// and is supposed to implement some interface that is generic over message type
/// that is specific to this [`Subsystem`]. At the moment there are only two
/// subsystems:
/// * Validation
/// * CandidateBacking
///
/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
/// mocking in the test code:
///
/// ```text
/// +------------------------------------+
/// | Overseer |
/// +------------------------------------+
/// / | | \
/// ................. subsystems...................................
/// . +-----------+ +-----------+ +----------+ +---------+ .
/// . | | | | | | | | .
/// . +-----------+ +-----------+ +----------+ +---------+ .
/// ...............................................................
/// |
/// probably `spawn`
/// a `job`
/// |
/// V
/// +-----------+
/// | |
/// +-----------+
///
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
///
/// # Example
///
/// The [`Subsystems`] may be any type as long as they implement an expected interface.
/// Here, we create two mock subsystems and start the `Overseer` with them. For the sake
/// of simplicity the termination of the example is done with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use overseer::{
/// # Overseer, Subsystem, SpawnedSubsystem, SubsystemContext,
/// # ValidationSubsystemMessage, CandidateBackingSubsystemMessage,
/// # };
///
/// struct ValidationSubsystem;
/// impl Subsystem<ValidationSubsystemMessage> for ValidationSubsystem {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<ValidationSubsystemMessage>,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
/// Delay::new(Duration::from_secs(1)).await;
/// }
/// }))
/// }
/// }
///
/// struct CandidateBackingSubsystem;
/// impl Subsystem<CandidateBackingSubsystemMessage> for CandidateBackingSubsystem {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
/// Delay::new(Duration::from_secs(1)).await;
/// }
/// }))
/// }
/// }
///
/// # fn main() { executor::block_on(async move {
/// let spawner = executor::ThreadPool::new().unwrap();
/// let (overseer, _handler) = Overseer::new(
/// Box::new(ValidationSubsystem),
/// Box::new(CandidateBackingSubsystem),
/// spawner,
/// ).unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
/// _ = overseer_fut => (),
/// _ = timer => (),
/// }
/// #
/// # }); }
/// ```
pub fn new(
validation: Box<dyn Subsystem<ValidationSubsystemMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingSubsystemMessage> + Send>,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
let handler = OverseerHandler {
events_tx: events_tx.clone(),
};
let mut running_subsystems_rx = StreamUnordered::new();
let mut running_subsystems = FuturesUnordered::new();
let validation_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
validation,
)?;
let candidate_backing_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
candidate_backing,
)?;
let this = Self {
validation_subsystem,
candidate_backing_subsystem,
s,
running_subsystems,
running_subsystems_rx,
events_rx,
};
Ok((this, handler))
}
// Stop the overseer.
async fn stop(mut self) {
if let Some(ref mut s) = self.validation_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
loop {
select! {
_ = self.running_subsystems.next() => {
if self.running_subsystems.is_empty() {
break;
}
},
_ = stop_delay => break,
complete => break,
}
}
}
/// Run the `Overseer`.
pub async fn run(mut self) -> SubsystemResult<()> {
loop {
while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg).await;
}
Event::Stop => {
self.stop().await;
return Ok(());
}
_ => ()
}
}
while let Poll::Ready(Some((StreamYield::Item(msg), _))) = poll!(
&mut self.running_subsystems_rx.next()
) {
match msg {
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
ToOverseer::SpawnJob { s, res } => {
let s = self.spawn_job(s);
let _ = res.send(s);
}
}
}
// Some subsystem exited? It's time to panic.
if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) {
log::error!("Subsystem finished unexpectedly {:?}", finished);
self.stop().await;
return Err(SubsystemError);
}
// Looks like nothing is left to be polled, let's take a break.
pending!();
}
}
async fn route_message(&mut self, msg: AllMessages) {
match msg {
AllMessages::Validation(msg) => {
if let Some(ref mut s) = self.validation_subsystem.instance {
let _= s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::CandidateBacking(msg) => {
if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
}
}
fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
self.s.spawn(j).map_err(|_| SubsystemError)
}
}
fn spawn<S: Spawn, M: Debug>(
spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
mut s: Box<dyn Subsystem<M> + Send>,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = SubsystemContext::new(to_rx, from_tx);
let f = s.start(ctx);
let handle = spawner.spawn_with_handle(f.0)?;
streams.push(from_rx);
futures.push(handle);
let instance = Some(SubsystemInstance {
tx: to_tx,
});
Ok(OverseenSubsystem {
subsystem: s,
instance,
})
}
#[cfg(test)]
mod tests {
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
use super::*;
struct TestSubsystem1(mpsc::Sender<usize>);
impl Subsystem<ValidationSubsystemMessage> for TestSubsystem1 {
fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
let mut i = 0;
loop {
match ctx.recv().await {
Ok(FromOverseer::Communication { .. }) => {
let _ = sender.send(i).await;
i += 1;
continue;
}
Ok(FromOverseer::Signal(OverseerSignal::StopWork)) => return,
Err(_) => return,
_ => (),
}
}
}))
}
}
struct TestSubsystem2(mpsc::Sender<usize>);
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem2 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
let mut c: usize = 0;
loop {
if c < 10 {
ctx.send_msg(
AllMessages::Validation(
ValidationSubsystemMessage::ValidityAttestation
)
).await.unwrap();
c += 1;
continue;
}
match ctx.try_recv().await {
Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => {
break;
}
Ok(Some(_)) => {
continue;
}
Err(_) => return,
_ => (),
}
pending!();
}
}))
}
}
struct TestSubsystem4;
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem4 {
fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
// Do nothing and exit.
}))
}
}
// Checks that a minimal configuration of two jobs can run and exchange messages.
#[test]
fn overseer_works() {
let spawner = executor::ThreadPool::new().unwrap();
executor::block_on(async move {
let (s1_tx, mut s1_rx) = mpsc::channel(64);
let (s2_tx, mut s2_rx) = mpsc::channel(64);
let (overseer, mut handler) = Overseer::new(
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem2(s2_tx)),
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
let mut s1_results = Vec::new();
let mut s2_results = Vec::new();
loop {
select! {
a = overseer_fut => break,
s1_next = s1_rx.next() => {
match s1_next {
Some(msg) => {
s1_results.push(msg);
if s1_results.len() == 10 {
handler.stop().await.unwrap();
}
}
None => break,
}
},
s2_next = s2_rx.next() => {
match s2_next {
Some(msg) => s2_results.push(s2_next),
None => break,
}
},
complete => break,
}
}
assert_eq!(s1_results, (0..10).collect::<Vec<_>>());
});
}
// Spawn a subsystem that immediately exits.
//
// Should immediately conclude the overseer itself with an error.
#[test]
fn overseer_panics_on_sybsystem_exit() {
let spawner = executor::ThreadPool::new().unwrap();
executor::block_on(async move {
let (s1_tx, _) = mpsc::channel(64);
let (overseer, _handle) = Overseer::new(
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem4),
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
select! {
res = overseer_fut => assert!(res.is_err()),
complete => (),
}
})
}
}