Integrate all (dummy) subsystems with the Overseer (#1374)

* overseer: introduce a utility typemap

* it's ugly but it compiles

* move DummySubsystem to subsystem crate

* fix tests fallout

* use a struct for all subsystems

* more tests fallout

* add missing pov_distribution subsystem

* remove unused imports and bounds

* fix minimal-example
This commit is contained in:
Andronik Ordian
2020-07-09 22:25:40 +02:00
committed by GitHub
parent c119627835
commit 6957847b6b
5 changed files with 425 additions and 106 deletions
@@ -28,11 +28,14 @@ use futures_timer::Delay;
use kv_log_macro as log; use kv_log_macro as log;
use polkadot_primitives::parachain::{BlockData, PoVBlock}; use polkadot_primitives::parachain::{BlockData, PoVBlock};
use polkadot_overseer::Overseer; use polkadot_overseer::{Overseer, AllSubsystems};
use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer}; use polkadot_subsystem::{
Subsystem, SubsystemContext, DummySubsystem,
SpawnedSubsystem, FromOverseer,
};
use polkadot_subsystem::messages::{ use polkadot_subsystem::messages::{
AllMessages, CandidateBackingMessage, CandidateValidationMessage CandidateValidationMessage, CandidateBackingMessage, AllMessages,
}; };
struct Subsystem1; struct Subsystem1;
@@ -128,10 +131,22 @@ fn main() {
Delay::new(Duration::from_secs(1)).await; Delay::new(Duration::from_secs(1)).await;
}); });
let all_subsystems = AllSubsystems {
candidate_validation: Subsystem2,
candidate_backing: Subsystem1,
candidate_selection: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_distribution: DummySubsystem,
provisioner: DummySubsystem,
pov_distribution: DummySubsystem,
runtime_api: DummySubsystem,
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
};
let (overseer, _handler) = Overseer::new( let (overseer, _handler) = Overseer::new(
vec![], vec![],
Subsystem2, all_subsystems,
Subsystem1,
spawner, spawner,
).unwrap(); ).unwrap();
let overseer_fut = overseer.run().fuse(); let overseer_fut = overseer.run().fuse();
+361 -62
View File
@@ -76,7 +76,11 @@ use polkadot_primitives::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_subsystem::messages::{ use polkadot_subsystem::messages::{
CandidateValidationMessage, CandidateBackingMessage, AllMessages CandidateValidationMessage, CandidateBackingMessage,
CandidateSelectionMessage, StatementDistributionMessage,
AvailabilityDistributionMessage, BitfieldDistributionMessage,
ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages,
}; };
pub use polkadot_subsystem::{ pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
@@ -319,12 +323,40 @@ struct OverseenSubsystem<M> {
/// The `Overseer` itself. /// The `Overseer` itself.
pub struct Overseer<S: Spawn> { pub struct Overseer<S: Spawn> {
/// A validation subsystem /// A candidate validation subsystem.
validation_subsystem: OverseenSubsystem<CandidateValidationMessage>, candidate_validation_subsystem: OverseenSubsystem<CandidateValidationMessage>,
/// A candidate backing subsystem /// A candidate backing subsystem.
candidate_backing_subsystem: OverseenSubsystem<CandidateBackingMessage>, candidate_backing_subsystem: OverseenSubsystem<CandidateBackingMessage>,
/// A candidate selection subsystem.
candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>,
/// A statement distribution subsystem.
statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>,
/// An availability distribution subsystem.
availability_distribution_subsystem: OverseenSubsystem<AvailabilityDistributionMessage>,
/// A bitfield distribution subsystem.
bitfield_distribution_subsystem: OverseenSubsystem<BitfieldDistributionMessage>,
/// A provisioner subsystem.
provisioner_subsystem: OverseenSubsystem<ProvisionerMessage>,
/// A PoV distribution subsystem.
pov_distribution_subsystem: OverseenSubsystem<PoVDistributionMessage>,
/// A runtime API subsystem.
runtime_api_subsystem: OverseenSubsystem<RuntimeApiMessage>,
/// An availability store subsystem.
availability_store_subsystem: OverseenSubsystem<AvailabilityStoreMessage>,
/// A network bridge subsystem.
network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,
/// Spawner to spawn tasks to. /// Spawner to spawn tasks to.
s: S, s: S,
@@ -346,22 +378,48 @@ pub struct Overseer<S: Spawn> {
active_leaves: HashSet<(Hash, BlockNumber)>, active_leaves: HashSet<(Hash, BlockNumber)>,
} }
/// This struct is passed as an argument to create a new instance of an [`Overseer`].
///
/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
/// mocking in the test code:
///
/// Each [`Subsystem`] is supposed to implement some interface that is generic over
/// message type that is specific to this [`Subsystem`]. At the moment not all
/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
/// [`DummySubsystem`]: struct.DummySubsystem.html
pub struct AllSubsystems<CV, CB, CS, SD, AD, BD, P, PoVD, RA, AS, NB> {
/// A candidate validation subsystem.
pub candidate_validation: CV,
/// A candidate backing subsystem.
pub candidate_backing: CB,
/// A candidate selection subsystem.
pub candidate_selection: CS,
/// A statement distribution subsystem.
pub statement_distribution: SD,
/// An availability distribution subsystem.
pub availability_distribution: AD,
/// A bitfield distribution subsystem.
pub bitfield_distribution: BD,
/// A provisioner subsystem.
pub provisioner: P,
/// A PoV distribution subsystem.
pub pov_distribution: PoVD,
/// A runtime API subsystem.
pub runtime_api: RA,
/// An availability store subsystem.
pub availability_store: AS,
/// A network bridge subsystem.
pub network_bridge: NB,
}
impl<S> Overseer<S> impl<S> Overseer<S>
where where
S: Spawn, S: Spawn,
{ {
/// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s. /// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s.
/// ///
/// Each [`Subsystem`] is passed to this function as an explicit parameter
/// and is supposed to implement some interface that is generic over message type
/// that is specific to this [`Subsystem`]. At the moment there are only two
/// subsystems:
/// * Validation
/// * CandidateBacking
///
/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
/// mocking in the test code:
///
/// ```text /// ```text
/// +------------------------------------+ /// +------------------------------------+
/// | Overseer | /// | Overseer |
@@ -388,16 +446,16 @@ where
/// # Example /// # Example
/// ///
/// The [`Subsystems`] may be any type as long as they implement an expected interface. /// The [`Subsystems`] may be any type as long as they implement an expected interface.
/// Here, we create two mock subsystems and start the `Overseer` with them. For the sake /// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them.
/// of simplicity the termination of the example is done with a timeout. /// For the sake of simplicity the termination of the example is done with a timeout.
/// ``` /// ```
/// # use std::time::Duration; /// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt}; /// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay; /// # use futures_timer::Delay;
/// # use polkadot_overseer::Overseer; /// # use polkadot_overseer::{Overseer, AllSubsystems};
/// # use polkadot_subsystem::{ /// # use polkadot_subsystem::{
/// # Subsystem, SpawnedSubsystem, SubsystemContext, /// # Subsystem, DummySubsystem, SpawnedSubsystem, SubsystemContext,
/// # messages::{CandidateValidationMessage, CandidateBackingMessage}, /// # messages::CandidateValidationMessage,
/// # }; /// # };
/// ///
/// struct ValidationSubsystem; /// struct ValidationSubsystem;
@@ -417,28 +475,24 @@ where
/// } /// }
/// } /// }
/// ///
/// struct CandidateBackingSubsystem;
/// impl<C> Subsystem<C> for CandidateBackingSubsystem
/// where C: SubsystemContext<Message=CandidateBackingMessage>
/// {
/// fn start(
/// self,
/// mut ctx: C,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
/// Delay::new(Duration::from_secs(1)).await;
/// }
/// }))
/// }
/// }
///
/// # fn main() { executor::block_on(async move { /// # fn main() { executor::block_on(async move {
/// let spawner = executor::ThreadPool::new().unwrap(); /// let spawner = executor::ThreadPool::new().unwrap();
/// let all_subsystems = AllSubsystems {
/// candidate_validation: ValidationSubsystem,
/// candidate_backing: DummySubsystem,
/// candidate_selection: DummySubsystem,
/// statement_distribution: DummySubsystem,
/// availability_distribution: DummySubsystem,
/// bitfield_distribution: DummySubsystem,
/// provisioner: DummySubsystem,
/// pov_distribution: DummySubsystem,
/// runtime_api: DummySubsystem,
/// availability_store: DummySubsystem,
/// network_bridge: DummySubsystem,
/// };
/// let (overseer, _handler) = Overseer::new( /// let (overseer, _handler) = Overseer::new(
/// vec![], /// vec![],
/// ValidationSubsystem, /// all_subsystems,
/// CandidateBackingSubsystem,
/// spawner, /// spawner,
/// ).unwrap(); /// ).unwrap();
/// ///
@@ -455,12 +509,24 @@ where
/// # /// #
/// # }); } /// # }); }
/// ``` /// ```
pub fn new( pub fn new<CV, CB, CS, SD, AD, BD, P, PoVD, RA, AS, NB>(
leaves: impl IntoIterator<Item = BlockInfo>, leaves: impl IntoIterator<Item = BlockInfo>,
validation: impl Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send, all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BD, P, PoVD, RA, AS, NB>,
candidate_backing: impl Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
mut s: S, mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> { ) -> SubsystemResult<(Self, OverseerHandler)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>> + Send,
P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>> + Send,
PoVD: Subsystem<OverseerSubsystemContext<PoVDistributionMessage>> + Send,
RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>> + Send,
AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send,
NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send,
{
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
let handler = OverseerHandler { let handler = OverseerHandler {
@@ -470,18 +536,81 @@ where
let mut running_subsystems_rx = StreamUnordered::new(); let mut running_subsystems_rx = StreamUnordered::new();
let mut running_subsystems = FuturesUnordered::new(); let mut running_subsystems = FuturesUnordered::new();
let validation_subsystem = spawn( let candidate_validation_subsystem = spawn(
&mut s, &mut s,
&mut running_subsystems, &mut running_subsystems,
&mut running_subsystems_rx, &mut running_subsystems_rx,
validation, all_subsystems.candidate_validation,
)?; )?;
let candidate_backing_subsystem = spawn( let candidate_backing_subsystem = spawn(
&mut s, &mut s,
&mut running_subsystems, &mut running_subsystems,
&mut running_subsystems_rx, &mut running_subsystems_rx,
candidate_backing, all_subsystems.candidate_backing,
)?;
let candidate_selection_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.candidate_selection,
)?;
let statement_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.statement_distribution,
)?;
let availability_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.availability_distribution,
)?;
let bitfield_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.bitfield_distribution,
)?;
let provisioner_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.provisioner,
)?;
let pov_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.pov_distribution,
)?;
let runtime_api_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.runtime_api,
)?;
let availability_store_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.availability_store,
)?;
let network_bridge_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.network_bridge,
)?; )?;
let active_leaves = HashSet::new(); let active_leaves = HashSet::new();
@@ -492,8 +621,17 @@ where
.collect(); .collect();
let this = Self { let this = Self {
validation_subsystem, candidate_validation_subsystem,
candidate_backing_subsystem, candidate_backing_subsystem,
candidate_selection_subsystem,
statement_distribution_subsystem,
availability_distribution_subsystem,
bitfield_distribution_subsystem,
provisioner_subsystem,
pov_distribution_subsystem,
runtime_api_subsystem,
availability_store_subsystem,
network_bridge_subsystem,
s, s,
running_subsystems, running_subsystems,
running_subsystems_rx, running_subsystems_rx,
@@ -507,7 +645,7 @@ where
// Stop the overseer. // Stop the overseer.
async fn stop(mut self) { async fn stop(mut self) {
if let Some(ref mut s) = self.validation_subsystem.instance { if let Some(ref mut s) = self.candidate_validation_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
} }
@@ -515,6 +653,42 @@ where
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
} }
if let Some(ref mut s) = self.candidate_selection_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.provisioner_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.pov_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.runtime_api_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.network_bridge_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
loop { loop {
@@ -616,11 +790,47 @@ where
} }
async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
if let Some(ref mut s) = self.validation_subsystem.instance { if let Some(ref mut s) = self.candidate_validation_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?; s.tx.send(FromOverseer::Signal(signal.clone())).await?;
} }
if let Some(ref mut s) = self.candidate_backing_subsystem.instance { if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.candidate_selection_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.provisioner_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.pov_distribution_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.runtime_api_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.availability_store_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.network_bridge_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal)).await?; s.tx.send(FromOverseer::Signal(signal)).await?;
} }
@@ -630,7 +840,7 @@ where
async fn route_message(&mut self, msg: AllMessages) { async fn route_message(&mut self, msg: AllMessages) {
match msg { match msg {
AllMessages::CandidateValidation(msg) => { AllMessages::CandidateValidation(msg) => {
if let Some(ref mut s) = self.validation_subsystem.instance { if let Some(ref mut s) = self.candidate_validation_subsystem.instance {
let _= s.tx.send(FromOverseer::Communication { msg }).await; let _= s.tx.send(FromOverseer::Communication { msg }).await;
} }
} }
@@ -639,11 +849,50 @@ where
let _ = s.tx.send(FromOverseer::Communication { msg }).await; let _ = s.tx.send(FromOverseer::Communication { msg }).await;
} }
} }
_ => { AllMessages::CandidateSelection(msg) => {
// TODO: temporary catch-all until all subsystems are integrated with overseer. if let Some(ref mut s) = self.candidate_selection_subsystem.instance {
// The overseer is not complete until this is an exhaustive match with all let _ = s.tx.send(FromOverseer::Communication { msg }).await;
// messages targeting an included subsystem. }
// https://github.com/paritytech/polkadot/issues/1317 }
AllMessages::StatementDistribution(msg) => {
if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::AvailabilityDistribution(msg) => {
if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::BitfieldDistribution(msg) => {
if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::Provisioner(msg) => {
if let Some(ref mut s) = self.provisioner_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::PoVDistribution(msg) => {
if let Some(ref mut s) = self.pov_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::RuntimeApi(msg) => {
if let Some(ref mut s) = self.runtime_api_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::AvailabilityStore(msg) => {
if let Some(ref mut s) = self.availability_store_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::NetworkBridge(msg) => {
if let Some(ref mut s) = self.network_bridge_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
} }
} }
} }
@@ -678,13 +927,16 @@ fn spawn<S: Spawn, M: Send + 'static>(
}) })
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
use polkadot_primitives::parachain::{BlockData, PoVBlock}; use polkadot_primitives::parachain::{BlockData, PoVBlock};
use polkadot_subsystem::DummySubsystem;
use super::*; use super::*;
struct TestSubsystem1(mpsc::Sender<usize>); struct TestSubsystem1(mpsc::Sender<usize>);
impl<C> Subsystem<C> for TestSubsystem1 impl<C> Subsystem<C> for TestSubsystem1
@@ -776,10 +1028,22 @@ mod tests {
let (s1_tx, mut s1_rx) = mpsc::channel(64); let (s1_tx, mut s1_rx) = mpsc::channel(64);
let (s2_tx, mut s2_rx) = mpsc::channel(64); let (s2_tx, mut s2_rx) = mpsc::channel(64);
let all_subsystems = AllSubsystems {
candidate_validation: TestSubsystem1(s1_tx),
candidate_backing: TestSubsystem2(s2_tx),
candidate_selection: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_distribution: DummySubsystem,
provisioner: DummySubsystem,
pov_distribution: DummySubsystem,
runtime_api: DummySubsystem,
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
};
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
vec![], vec![],
TestSubsystem1(s1_tx), all_subsystems,
TestSubsystem2(s2_tx),
spawner, spawner,
).unwrap(); ).unwrap();
let overseer_fut = overseer.run().fuse(); let overseer_fut = overseer.run().fuse();
@@ -826,10 +1090,22 @@ mod tests {
executor::block_on(async move { executor::block_on(async move {
let (s1_tx, _) = mpsc::channel(64); let (s1_tx, _) = mpsc::channel(64);
let all_subsystems = AllSubsystems {
candidate_validation: TestSubsystem1(s1_tx),
candidate_backing: TestSubsystem4,
candidate_selection: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_distribution: DummySubsystem,
provisioner: DummySubsystem,
pov_distribution: DummySubsystem,
runtime_api: DummySubsystem,
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
};
let (overseer, _handle) = Overseer::new( let (overseer, _handle) = Overseer::new(
vec![], vec![],
TestSubsystem1(s1_tx), all_subsystems,
TestSubsystem4,
spawner, spawner,
).unwrap(); ).unwrap();
let overseer_fut = overseer.run().fuse(); let overseer_fut = overseer.run().fuse();
@@ -923,11 +1199,22 @@ mod tests {
let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_5, mut rx_5) = mpsc::channel(64);
let (tx_6, mut rx_6) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64);
let all_subsystems = AllSubsystems {
candidate_validation: TestSubsystem5(tx_5),
candidate_backing: TestSubsystem6(tx_6),
candidate_selection: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_distribution: DummySubsystem,
provisioner: DummySubsystem,
pov_distribution: DummySubsystem,
runtime_api: DummySubsystem,
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
};
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
vec![first_block], vec![first_block],
TestSubsystem5(tx_5), all_subsystems,
TestSubsystem6(tx_6),
spawner, spawner,
).unwrap(); ).unwrap();
@@ -1008,11 +1295,23 @@ mod tests {
let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_5, mut rx_5) = mpsc::channel(64);
let (tx_6, mut rx_6) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64);
let all_subsystems = AllSubsystems {
candidate_validation: TestSubsystem5(tx_5),
candidate_backing: TestSubsystem6(tx_6),
candidate_selection: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_distribution: DummySubsystem,
provisioner: DummySubsystem,
pov_distribution: DummySubsystem,
runtime_api: DummySubsystem,
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
};
// start with two forks of different height. // start with two forks of different height.
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
vec![first_block, second_block], vec![first_block, second_block],
TestSubsystem5(tx_5), all_subsystems,
TestSubsystem6(tx_6),
spawner, spawner,
).unwrap(); ).unwrap();
+20 -33
View File
@@ -29,11 +29,8 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use sc_executor::native_executor_instance; use sc_executor::native_executor_instance;
use log::info; use log::info;
use sp_blockchain::HeaderBackend; use sp_blockchain::HeaderBackend;
use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler}; use polkadot_overseer::{self as overseer, AllSubsystems, BlockInfo, Overseer, OverseerHandler};
use polkadot_subsystem::{ use polkadot_subsystem::DummySubsystem;
Subsystem, SubsystemContext, SpawnedSubsystem,
messages::{CandidateValidationMessage, CandidateBackingMessage},
};
use polkadot_node_core_proposer::ProposerFactory; use polkadot_node_core_proposer::ProposerFactory;
use sp_trie::PrefixedMemoryDB; use sp_trie::PrefixedMemoryDB;
pub use service::{ pub use service::{
@@ -275,38 +272,28 @@ macro_rules! new_full_start {
}} }}
} }
struct CandidateValidationSubsystem;
impl<C> Subsystem<C> for CandidateValidationSubsystem
where C: SubsystemContext<Message = CandidateValidationMessage>
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {}
}))
}
}
struct CandidateBackingSubsystem;
impl<C> Subsystem<C> for CandidateBackingSubsystem
where C: SubsystemContext<Message = CandidateBackingMessage>
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {}
}))
}
}
fn real_overseer<S: futures::task::Spawn>( fn real_overseer<S: futures::task::Spawn>(
leaves: impl IntoIterator<Item = BlockInfo>, leaves: impl IntoIterator<Item = BlockInfo>,
s: S, s: S,
) -> Result<(Overseer<S>, OverseerHandler), ServiceError> { ) -> Result<(Overseer<S>, OverseerHandler), ServiceError> {
let validation = CandidateValidationSubsystem; let all_subsystems = AllSubsystems {
let candidate_backing = CandidateBackingSubsystem; candidate_validation: DummySubsystem,
Overseer::new(leaves, validation, candidate_backing, s) candidate_backing: DummySubsystem,
.map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) candidate_selection: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_distribution: DummySubsystem,
provisioner: DummySubsystem,
pov_distribution: DummySubsystem,
runtime_api: DummySubsystem,
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
};
Overseer::new(
leaves,
all_subsystems,
s,
).map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e)))
} }
/// Builds a new service for a full client. /// Builds a new service for a full client.
+18
View File
@@ -148,3 +148,21 @@ pub trait Subsystem<C: SubsystemContext> {
/// Start this `Subsystem` and return `SpawnedSubsystem`. /// Start this `Subsystem` and return `SpawnedSubsystem`.
fn start(self, ctx: C) -> SpawnedSubsystem; fn start(self, ctx: C) -> SpawnedSubsystem;
} }
/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
pub struct DummySubsystem;
impl<C: SubsystemContext> Subsystem<C> for DummySubsystem {
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
loop {
match ctx.recv().await {
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
Err(_) => return,
_ => continue,
}
}
}))
}
}
+6 -6
View File
@@ -246,12 +246,12 @@ pub enum PoVDistributionMessage {
/// ///
/// This `CandidateDescriptor` should correspond to a candidate seconded under the provided /// This `CandidateDescriptor` should correspond to a candidate seconded under the provided
/// relay-parent hash. /// relay-parent hash.
FetchPoV(Hash, CandidateDescriptor, oneshot::Sender<Arc<PoVBlock>>), FetchPoV(Hash, CandidateDescriptor, oneshot::Sender<Arc<PoVBlock>>),
/// Distribute a PoV for the given relay-parent and CandidateDescriptor. /// Distribute a PoV for the given relay-parent and CandidateDescriptor.
/// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor
DistributePoV(Hash, CandidateDescriptor, Arc<PoVBlock>), DistributePoV(Hash, CandidateDescriptor, Arc<PoVBlock>),
/// An update from the network bridge. /// An update from the network bridge.
NetworkBridgeUpdate(NetworkBridgeEvent), NetworkBridgeUpdate(NetworkBridgeEvent),
} }
/// A message type tying together all message types that are used across Subsystems. /// A message type tying together all message types that are used across Subsystems.