Add some message types from subsystem definitions (#1265)

* introduce polkadot-node-primitives

* guide: change statement distribution message types

* guide: remove variant from `CandidateSelectionMessage`

* add a few more message types

* add TODOs

* Almost all messages

* NewBackedCandidate notification

* Formatting

* Use AttestedCandidate as BackedCandidate

* Update node/primitives/src/lib.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Fix the tests

* Bring in types from #1242

* Adds network bridge messages

* More message types from doc

* use fn pointer type

* Fixes from the review

* Add missing Runtime subsystem message

* rename to CandidateValidationMessage and fix tests

Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
This commit is contained in:
Robert Habermeier
2020-06-17 19:49:17 -04:00
committed by GitHub
parent 87ae6e42f5
commit c226c4403d
13 changed files with 477 additions and 52 deletions
+33 -23
View File
@@ -76,7 +76,7 @@ use polkadot_primitives::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
pub use messages::{
OverseerSignal, ValidationSubsystemMessage, CandidateBackingSubsystemMessage, AllMessages,
OverseerSignal, CandidateValidationMessage, CandidateBackingMessage, AllMessages,
FromOverseer,
};
@@ -367,10 +367,10 @@ struct OverseenSubsystem<M: Debug> {
/// The `Overseer` itself.
pub struct Overseer<S: Spawn> {
/// A validation subsystem
validation_subsystem: OverseenSubsystem<ValidationSubsystemMessage>,
validation_subsystem: OverseenSubsystem<CandidateValidationMessage>,
/// A candidate backing subsystem
candidate_backing_subsystem: OverseenSubsystem<CandidateBackingSubsystemMessage>,
candidate_backing_subsystem: OverseenSubsystem<CandidateBackingMessage>,
/// Spawner to spawn tasks to.
s: S,
@@ -443,14 +443,14 @@ where
/// # use futures_timer::Delay;
/// # use polkadot_overseer::{
/// # Overseer, Subsystem, SpawnedSubsystem, SubsystemContext,
/// # ValidationSubsystemMessage, CandidateBackingSubsystemMessage,
/// # CandidateValidationMessage, CandidateBackingMessage,
/// # };
///
/// struct ValidationSubsystem;
/// impl Subsystem<ValidationSubsystemMessage> for ValidationSubsystem {
/// impl Subsystem<CandidateValidationMessage> for ValidationSubsystem {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<ValidationSubsystemMessage>,
/// mut ctx: SubsystemContext<CandidateValidationMessage>,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
@@ -461,10 +461,10 @@ where
/// }
///
/// struct CandidateBackingSubsystem;
/// impl Subsystem<CandidateBackingSubsystemMessage> for CandidateBackingSubsystem {
/// impl Subsystem<CandidateBackingMessage> for CandidateBackingSubsystem {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>,
/// mut ctx: SubsystemContext<CandidateBackingMessage>,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
@@ -498,8 +498,8 @@ where
/// ```
pub fn new(
leaves: impl IntoIterator<Item = BlockInfo>,
validation: Box<dyn Subsystem<ValidationSubsystemMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingSubsystemMessage> + Send>,
validation: Box<dyn Subsystem<CandidateValidationMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingMessage> + Send>,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
@@ -670,7 +670,7 @@ where
async fn route_message(&mut self, msg: AllMessages) {
match msg {
AllMessages::Validation(msg) => {
AllMessages::CandidateValidation(msg) => {
if let Some(ref mut s) = self.validation_subsystem.instance {
let _= s.tx.send(FromOverseer::Communication { msg }).await;
}
@@ -717,12 +717,14 @@ fn spawn<S: Spawn, M: Debug>(
#[cfg(test)]
mod tests {
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
use polkadot_primitives::parachain::{BlockData, PoVBlock};
use super::*;
struct TestSubsystem1(mpsc::Sender<usize>);
impl Subsystem<ValidationSubsystemMessage> for TestSubsystem1 {
fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateValidationMessage> for TestSubsystem1 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
let mut i = 0;
@@ -744,15 +746,23 @@ mod tests {
struct TestSubsystem2(mpsc::Sender<usize>);
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem2 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateBackingMessage> for TestSubsystem2 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
let mut c: usize = 0;
loop {
if c < 10 {
let (tx, _) = oneshot::channel();
ctx.send_msg(
AllMessages::Validation(
ValidationSubsystemMessage::ValidityAttestation
AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
Default::default(),
PoVBlock {
block_data: BlockData(Vec::new()),
},
tx,
)
)
).await.unwrap();
c += 1;
@@ -776,8 +786,8 @@ mod tests {
struct TestSubsystem4;
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem4 {
fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateBackingMessage> for TestSubsystem4 {
fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
// Do nothing and exit.
}))
@@ -861,8 +871,8 @@ mod tests {
struct TestSubsystem5(mpsc::Sender<OverseerSignal>);
impl Subsystem<ValidationSubsystemMessage> for TestSubsystem5 {
fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateValidationMessage> for TestSubsystem5 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
@@ -885,8 +895,8 @@ mod tests {
struct TestSubsystem6(mpsc::Sender<OverseerSignal>);
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem6 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateBackingMessage> for TestSubsystem6 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {