remove AllSubsystems and AllSubsystemsGen types (#3874)

* introduce the OverseerConnector, use it

* introduce is_relay_chain to RelayChainSelection

* Update node/service/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* avoid the deferred setting of `is_relay_chain` in `RelayChainSelection`

* positive assertion is not mandated, only the negative one, to avoid a stall

* cleanup: overseer residue

* spellcheck

* fixin

* groundwork to obsolete Overseer::new and AllSubsystemsGen proc-macro

* Now all malus & tests can be ported to the builder pattern.

Obsoletes `Overseer::new`, `AllSubsystemsGen` derive macro, `AllSubsystems`.

* spellcheck

* adjust tests, minor fixes

* remove derive macro AllSubsystemsGen

* add forgotten file dummy.rs

* remove residue

* good news everyone!

* spellcheck

* address review comments

* fixup imports

* make it conditional

* fixup docs

* reduce import

* chore: fmt

* chore: fmt

* chore: spellcheck / nlprules

* fixup malus variant-a

* fmt

* fix

* fixins

* pfmt

* fixins

* chore: fmt

* remove expanded overseer generation

* tracing version

* Update node/network/statement-distribution/src/lib.rs

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>

* use future::ready instead

* silence warning

* chore: fmt

Co-authored-by: Andronik Ordian <write@reusable.software>
Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
This commit is contained in:
Bernhard Schuster
2021-09-29 16:24:56 +02:00
committed by GitHub
parent f9de0040c9
commit c57a1e7934
42 changed files with 600 additions and 1389 deletions
-11
View File
@@ -6553,7 +6553,6 @@ dependencies = [
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem-types",
"polkadot-overseer-all-subsystems-gen",
"polkadot-overseer-gen",
"polkadot-primitives",
"sc-client-api",
@@ -6562,16 +6561,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "polkadot-overseer-all-subsystems-gen"
version = "0.9.9"
dependencies = [
"proc-macro2",
"quote",
"syn",
"trybuild",
]
[[package]]
name = "polkadot-overseer-gen"
version = "0.9.9"
-1
View File
@@ -73,7 +73,6 @@ members = [
"node/overseer",
"node/overseer/overseer-gen",
"node/overseer/overseer-gen/proc-macro",
"node/overseer/all-subsystems-gen",
"node/malus",
"node/primitives",
"node/service",
+1 -1
View File
@@ -29,7 +29,7 @@ pub use service::RuntimeApiCollection;
pub use service::{self, Block, CoreApi, IdentifyVariant, ProvideRuntimeApi, TFullClient};
#[cfg(feature = "malus")]
pub use service::create_default_subsystems;
pub use service::overseer::prepared_overseer_builder;
#[cfg(feature = "cli")]
pub use cli::*;
+2 -2
View File
@@ -150,7 +150,7 @@ impl Artifacts {
/// Inform the table about the artifact with the given ID. The state will be set to "preparing".
///
/// This function must be used only for brand new artifacts and should never be used for
/// This function must be used only for brand-new artifacts and should never be used for
/// replacing existing ones.
pub fn insert_preparing(&mut self, artifact_id: ArtifactId) {
// See the precondition.
@@ -159,7 +159,7 @@ impl Artifacts {
/// Insert an artifact with the given ID as "prepared".
///
/// This function must be used only for brand new artifacts and should never be used for
/// This function must be used only for brand-new artifacts and should never be used for
/// replacing existing ones.
#[cfg(test)]
pub fn insert_prepared(&mut self, artifact_id: ArtifactId, last_time_needed: SystemTime) {
+9 -16
View File
@@ -24,10 +24,10 @@
use color_eyre::eyre;
use polkadot_cli::{
create_default_subsystems,
prepared_overseer_builder,
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer,
OverseerGen, OverseerGenArgs, ParachainHost, ProvideRuntimeApi, SpawnNamed,
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, OverseerGen,
OverseerGenArgs, ParachainHost, ProvideRuntimeApi, SpawnNamed,
},
Cli,
};
@@ -37,7 +37,7 @@ use polkadot_cli::{
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateValidationMessage},
overseer::{self, OverseerConnector, OverseerHandle},
overseer::{self, Overseer, OverseerConnector, OverseerHandle},
FromOverseer,
};
@@ -94,15 +94,10 @@ impl OverseerGen for BehaveMaleficient {
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
let spawner = args.spawner.clone();
let leaves = args.leaves.clone();
let runtime_client = args.runtime_client.clone();
let registry = args.registry.clone();
let candidate_validation_config = args.candidate_validation_config.clone();
// modify the subsystem(s) as needed:
let all_subsystems = create_default_subsystems(args)?.replace_candidate_validation(
// create the filtered subsystem
|orig: CandidateValidationSubsystem| {
prepared_overseer_builder(args)?
.replace_candidate_validation(|orig: CandidateValidationSubsystem| {
InterceptedSubsystem::new(
CandidateValidationSubsystem::with_config(
candidate_validation_config,
@@ -111,10 +106,8 @@ impl OverseerGen for BehaveMaleficient {
),
Skippy::default(),
)
},
);
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
})
.build_with_connector(connector)
.map_err(|e| e.into())
}
}
@@ -56,7 +56,7 @@ pub struct SessionInfo {
/// validators.
pub validator_groups: Vec<Vec<AuthorityDiscoveryId>>,
/// Information about ourself:
/// Information about ourselves:
pub our_index: ValidatorIndex,
/// Remember to which group we belong, so we won't start fetching chunks for candidates with
@@ -18,7 +18,7 @@
//! futures will still get polled, but will not count towards length. So length will only count
//! futures, which are still considered live.
//!
//! Usecase: If futures take longer than we would like them too, we maybe able to request the data
//! Usecase: If futures take longer than we would like them too, we may be able to request the data
//! from somewhere else as well. We don't really want to cancel the old future, because maybe it
//! was almost done, thus we would have wasted time with our impatience. By simply making them
//! not count towards length, we can make sure to have enough "live" requests ongoing, while at the
@@ -29,7 +29,7 @@ use crate::{sender, LOG_TARGET};
pub enum Error {
/// Fatal errors of dispute distribution.
Fatal(Fatal),
/// Non fatal errors of dispute distribution.
/// Non-fatal errors of dispute distribution.
NonFatal(NonFatal),
}
@@ -39,7 +39,7 @@ pub type FatalResult<T> = std::result::Result<T, Fatal>;
pub enum Error {
/// Fatal errors of dispute distribution.
Fatal(Fatal),
/// Non fatal errors of dispute distribution.
/// Non-fatal errors of dispute distribution.
NonFatal(NonFatal),
}
@@ -105,7 +105,7 @@ const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20;
/// The statement distribution subsystem.
pub struct StatementDistribution {
/// Pointer to a keystore, which is required for determining this nodes validator index.
/// Pointer to a keystore, which is required for determining this node's validator index.
keystore: SyncCryptoStorePtr,
/// Receiver for incoming large statement requests.
req_receiver: Option<IncomingRequestReceiver<request_v1::StatementFetchingRequest>>,
-1
View File
@@ -16,7 +16,6 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" }
polkadot-node-metrics = { path = "../metrics" }
polkadot-primitives = { path = "../../primitives" }
polkadot-overseer-gen = { path = "./overseer-gen" }
polkadot-overseer-all-subsystems-gen = { path = "./all-subsystems-gen" }
tracing = "0.1.28"
lru = "0.6"
parity-util-mem = { version = ">= 0.10.1", default-features = false }
@@ -1,17 +0,0 @@
[package]
name = "polkadot-overseer-all-subsystems-gen"
version = "0.9.9"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Small proc macro to create mocking level iface type helpers"
[lib]
proc-macro = true
[dependencies]
syn = { version = "1.0.77", features = ["full", "extra-traits"] }
quote = "1.0.9"
proc-macro2 = "1.0.24"
[dev-dependencies]
trybuild = "1.0.45"
@@ -1,222 +0,0 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashSet;
use proc_macro2::TokenStream;
use quote::quote;
use syn::{parse2, Error, GenericParam, Ident, Result, Type};
#[proc_macro_derive(AllSubsystemsGen)]
pub fn subsystems_gen(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
let item: TokenStream = item.into();
impl_subsystems_gen(item).unwrap_or_else(|err| err.to_compile_error()).into()
}
fn impl_subsystems_gen(item: TokenStream) -> Result<proc_macro2::TokenStream> {
let span = proc_macro2::Span::call_site();
let ds = parse2::<syn::ItemStruct>(item.clone())?;
match ds.fields {
syn::Fields::Named(named) => {
#[derive(Clone)]
struct NameTyTup {
field: Ident,
ty: Type,
}
let mut orig_generics = ds.generics;
// remove default types
orig_generics.params = orig_generics
.params
.into_iter()
.map(|mut generic| {
match generic {
GenericParam::Type(ref mut param) => {
param.eq_token = None;
param.default = None;
},
_ => {},
}
generic
})
.collect();
// prepare a hashmap of generic type to member that uses it
let generic_types = orig_generics
.params
.iter()
.filter_map(|generic| {
if let GenericParam::Type(param) = generic {
Some(param.ident.clone())
} else {
None
}
})
.collect::<HashSet<Ident>>();
let strukt_ty = ds.ident;
if generic_types.is_empty() {
return Err(Error::new(
strukt_ty.span(),
"struct must have at least one generic parameter.",
))
}
// collect all fields that exist, and all fields that are replaceable
let mut replacable_items = Vec::<NameTyTup>::with_capacity(64);
let mut all_fields = replacable_items.clone();
let mut duplicate_generic_detection = HashSet::<Ident>::with_capacity(64);
for field in named.named {
let field_ident = field
.ident
.clone()
.ok_or_else(|| Error::new(span, "Member field must have a name."))?;
let ty = field.ty.clone();
let ntt = NameTyTup { field: field_ident, ty };
replacable_items.push(ntt.clone());
// assure every generic is used exactly once
let ty_ident = match field.ty {
Type::Path(path) => path.path.get_ident().cloned().ok_or_else(|| {
Error::new(
proc_macro2::Span::call_site(),
"Expected an identifier, but got a path.",
)
}),
_ => return Err(Error::new(proc_macro2::Span::call_site(), "Must be path.")),
}?;
if generic_types.contains(&ty_ident) {
if let Some(previous) = duplicate_generic_detection.replace(ty_ident) {
return Err(Error::new(previous.span(), "Generic type parameters may only be used for exactly one field, but is used more than once."));
}
}
all_fields.push(ntt);
}
let msg = "Generated by #[derive(AllSubsystemsGen)] derive proc-macro.";
let mut additive = TokenStream::new();
// generate an impl of `fn replace_#name`
for NameTyTup { field: replacable_item, ty: replacable_item_ty } in replacable_items {
let keeper = &all_fields
.iter()
.filter(|ntt| ntt.field != replacable_item)
.map(|ntt| ntt.field.clone())
.collect::<Vec<_>>();
let strukt_ty = strukt_ty.clone();
let fname = Ident::new(&format!("replace_{}", replacable_item), span);
// adjust the generics such that the appropriate member type is replaced
let mut modified_generics = orig_generics.clone();
modified_generics.params = modified_generics
.params
.into_iter()
.map(|mut generic| {
match generic {
GenericParam::Type(ref mut param) => {
param.eq_token = None;
param.default = None;
if match &replacable_item_ty {
Type::Path(path) => path
.path
.get_ident()
.filter(|&ident| ident == &param.ident)
.is_some(),
_ => false,
} {
param.ident = Ident::new("NEW", span);
}
},
_ => {},
}
generic
})
.collect();
additive.extend(quote! {
impl #orig_generics #strukt_ty #orig_generics {
#[doc = #msg]
pub fn #fname < NEW, F > (self, gen_replacement_fn: F) -> #strukt_ty #modified_generics
where
F: FnOnce(#replacable_item_ty) -> NEW,
{
let Self {
// To be replaced field:
#replacable_item,
// Fields to keep:
#(
#keeper,
)*
} = self;
// Some cases require that parts of the original are copied
// over, since they include a one time initialization.
let replacement = gen_replacement_fn(#replacable_item);
#strukt_ty :: #modified_generics {
#replacable_item: replacement,
#(
#keeper,
)*
}
}
}
});
}
Ok(additive)
},
syn::Fields::Unit =>
Err(Error::new(span, "Must be a struct with named fields. Not an unit struct.")),
syn::Fields::Unnamed(_) => Err(Error::new(
span,
"Must be a struct with named fields. Not an unnamed fields struct.",
)),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic() {
let item = quote! {
pub struct AllSubsystems<A,B,CD> {
pub a: A,
pub beee: B,
pub dj: CD,
}
};
let output = impl_subsystems_gen(item).expect("Simple example always works. qed");
println!("//generated:");
println!("{}", output);
}
#[test]
fn ui() {
let t = trybuild::TestCases::new();
t.compile_fail("tests/ui/err-*.rs");
t.pass("tests/ui/ok-*.rs");
}
}
@@ -1,13 +0,0 @@
#![allow(dead_code)]
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
#[derive(Clone, AllSubsystemsGen)]
enum AllSubsystems<A,B> {
A(A),
B(B),
}
fn main() {
let all = AllSubsystems::<u8,u16>::A(0u8);
}
@@ -1,5 +0,0 @@
error: expected `struct`
--> $DIR/err-01-enum.rs:6:1
|
6 | enum AllSubsystems<A,B> {
| ^^^^
@@ -1,16 +0,0 @@
#![allow(dead_code)]
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
#[derive(Clone, AllSubsystemsGen)]
struct AllSubsystems<X> {
a: X,
b: X,
}
fn main() {
let all = AllSubsystems::<u16> {
a: 0_u16,
b: 1_u16,
};
let _all = all.replace_a(|_| 77u8);
}
@@ -1,14 +0,0 @@
error: Generic type parameters may only be used for exactly one field, but is used more than once.
--> $DIR/err-01-generic-used-twice.rs:6:5
|
6 | a: X,
| ^
error[E0599]: no method named `replace_a` found for struct `AllSubsystems` in the current scope
--> $DIR/err-01-generic-used-twice.rs:15:17
|
5 | struct AllSubsystems<X> {
| ----------------------- method `replace_a` not found for this
...
15 | let _all = all.replace_a(|_| 77u8);
| ^^^^^^^^^ method not found in `AllSubsystems<u16>`
@@ -1,17 +0,0 @@
#![allow(dead_code)]
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
#[derive(Clone, AllSubsystemsGen)]
struct AllSubsystems {
a: f32,
b: u16,
}
fn main() {
let all = AllSubsystems {
a: 0_f32,
b: 1_u16,
};
let _all = all.replace_a(|_| 77u8);
}
@@ -1,14 +0,0 @@
error: struct must have at least one generic parameter.
--> $DIR/err-01-no-generic.rs:6:8
|
6 | struct AllSubsystems {
| ^^^^^^^^^^^^^
error[E0599]: no method named `replace_a` found for struct `AllSubsystems` in the current scope
--> $DIR/err-01-no-generic.rs:16:17
|
6 | struct AllSubsystems {
| -------------------- method `replace_a` not found for this
...
16 | let _all = all.replace_a(|_| 77u8);
| ^^^^^^^^^ method not found in `AllSubsystems`
@@ -1,14 +0,0 @@
error: Generic type parameters may only be used once have at least one generic parameter.
--> $DIR/err-01-no-generics.rs:7:5
|
7 | a: X,
| ^
error[E0599]: no method named `replace_a` found for struct `AllSubsystems<u16>` in the current scope
--> $DIR/err-01-no-generics.rs:16:17
|
6 | struct AllSubsystems<X> {
| ----------------------- method `replace_a` not found for this
...
16 | let _all = all.replace_a(|_| 77u8);
| ^^^^^^^^^ method not found in `AllSubsystems<u16>`
@@ -1,17 +0,0 @@
#![allow(dead_code)]
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
#[derive(Clone, AllSubsystemsGen)]
struct AllSubsystems<A, B> {
a: A,
b: B,
}
fn main() {
let all = AllSubsystems::<u8, u16> {
a: 0u8,
b: 1u16,
};
let _all: AllSubsystems<_,_> = all.replace_a::<u32,_>(|_| 777_777u32);
}
@@ -28,9 +28,9 @@ use polkadot_node_subsystem_types::messages::{
};
use polkadot_overseer::{
self as overseer,
dummy::dummy_overseer_builder,
gen::{FromOverseer, SpawnedSubsystem},
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector,
OverseerSignal, SubsystemError,
AllMessages, HeadSupportsParachains, OverseerSignal, SubsystemError,
};
use polkadot_primitives::v1::Hash;
@@ -170,19 +170,13 @@ fn main() {
Delay::new(Duration::from_secs(1)).await;
});
let all_subsystems = AllSubsystems::<()>::dummy()
let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
.unwrap()
.replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig);
.replace_candidate_backing(|orig| orig)
.build()
.unwrap();
let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
AlwaysSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;
@@ -19,3 +19,9 @@ proc-macro-crate = "1.1.0"
[dev-dependencies]
assert_matches = "1.5.0"
[features]
default = []
# write the expanded version to a `overlord-expansion.rs`
# in the `cwd`
expansion = []
@@ -19,6 +19,39 @@ use syn::Ident;
use super::*;
/// Returns all combinations for a single replacement:
/// 1. generic args with `NEW` in place
/// 2. subsystem type to be replaced
/// 3. the subsystem name to be replaced by a new type and value
/// 4. all other subsystems that are supposed to be kept
fn derive_replacable_generic_lists(
info: &OverseerInfo,
) -> Vec<(TokenStream, Ident, Ident, Vec<Ident>)> {
// subsystem generic types
let builder_generic_ty = info.builder_generic_types();
let to_be_replaced_name = info.subsystem_names_without_wip();
let baggage_generic_ty = &info.baggage_generic_types();
builder_generic_ty
.iter()
.enumerate()
.map(|(idx, to_be_replaced_ty)| {
let mut to_keep_name = to_be_replaced_name.clone();
let to_be_replaced_name: Ident = to_keep_name.remove(idx);
let mut builder_generic_ty = builder_generic_ty.clone();
builder_generic_ty[idx] = format_ident!("NEW");
let generics_ts = quote! {
<S, #( #baggage_generic_ty, )* #( #builder_generic_ty, )* >
};
(generics_ts, to_be_replaced_ty.clone(), to_be_replaced_name, to_keep_name)
})
.collect::<Vec<(_, _, _, _)>>()
}
/// Implement a builder pattern for the `Overseer`-type,
/// which acts as the gateway to constructing the overseer.
///
@@ -35,6 +68,12 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
.iter()
.map(|subsystem_name| format_ident!("{}_with", subsystem_name))
.collect::<Vec<_>>();
let subsystem_name_replace_with = &info
.subsystem_names_without_wip()
.iter()
.map(|subsystem_name| format_ident!("replace_{}", subsystem_name))
.collect::<Vec<_>>();
let builder_generic_ty = &info.builder_generic_types();
let channel_name = &info.channel_names_without_wip("");
@@ -50,6 +89,8 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
let baggage_name = &info.baggage_names();
let baggage_ty = &info.baggage_types();
let subsystem_ctx_name = format_ident!("{}SubsystemContext", overseer_name);
let error_ty = &info.extern_error_ty;
let support_crate = info.support_crate_name();
@@ -155,7 +196,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
/// Convenience alias.
type SubsystemInitFn<T> = Box<dyn FnOnce(#handle) -> ::std::result::Result<T, #error_ty> >;
/// Init kind of a field of the overseer.
/// Initialization type to be used for a field of the overseer.
enum FieldInitMethod<T> {
/// Defer initialization to a point where the `handle` is available.
Fn(SubsystemInitFn<T>),
@@ -242,13 +283,13 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
)*
/// Complete the construction and create the overseer type.
pub fn build(mut self) -> ::std::result::Result<(#overseer_name #generics, #handle), #error_ty> {
pub fn build(self) -> ::std::result::Result<(#overseer_name #generics, #handle), #error_ty> {
let connector = #connector ::default();
self.build_with_connector(connector)
}
/// Complete the construction and create the overseer type based on an existing `connector`.
pub fn build_with_connector(mut self, connector: #connector) -> ::std::result::Result<(#overseer_name #generics, #handle), #error_ty>
pub fn build_with_connector(self, connector: #connector) -> ::std::result::Result<(#overseer_name #generics, #handle), #error_ty>
{
let #connector {
handle: events_tx,
@@ -321,7 +362,6 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#channel_name_tx,
signal_tx,
unbounded_meter,
channels_out.clone(),
ctx,
#subsystem_name,
&mut running_subsystems,
@@ -330,9 +370,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#(
let #baggage_name = self. #baggage_name .expect(
&format!("Baggage variable `{1}` of `{0}` ",
stringify!(#overseer_name),
stringify!( #baggage_name )
&format!("Baggage variable `{0}` of `{1}` must be set by the user!",
stringify!(#baggage_name),
stringify!(#overseer_name)
)
);
)*
@@ -359,6 +399,83 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
}
}
};
let mut acc = TokenStream::new();
for (
(
(
ref modified_generics,
ref to_be_replaced_ty,
ref to_be_replaced_name,
ref to_keep_name,
),
subsystem_name_replace_with,
),
consumes,
) in derive_replacable_generic_lists(info)
.into_iter()
.zip(subsystem_name_replace_with.iter())
.zip(consumes.iter())
{
let replace1 = quote! {
/// Replace a subsystem by another implementation for the
/// consumable message type.
pub fn #subsystem_name_replace_with < NEW, F >
(self, gen_replacement_fn: F) -> #builder #modified_generics
where
#to_be_replaced_ty: 'static,
F: 'static + FnOnce(#to_be_replaced_ty) -> NEW,
NEW: #support_crate ::Subsystem<#subsystem_ctx_name< #consumes >, #error_ty>,
{
let Self {
#to_be_replaced_name,
#(
#to_keep_name,
)*
#(
#baggage_name,
)*
spawner,
} = self;
// Some cases require that parts of the original are copied
// over, since they include a one time initialization.
let replacement: FieldInitMethod<NEW> = match #to_be_replaced_name {
FieldInitMethod::Fn(fx) => FieldInitMethod::Fn(
Box::new(move |handle: #handle| {
let orig = fx(handle)?;
Ok(gen_replacement_fn(orig))
})
),
FieldInitMethod::Value(val) => FieldInitMethod::Value(gen_replacement_fn(val)),
FieldInitMethod::Uninitialized => panic!("Must have a value before it can be replaced. qed"),
};
#builder :: #modified_generics {
#to_be_replaced_name: replacement,
#(
#to_keep_name,
)*
#(
#baggage_name,
)*
spawner,
}
}
};
acc.extend(replace1);
}
ts.extend(quote! {
impl #builder_generics #builder #builder_generics
#builder_where_clause
{
#acc
}
});
ts.extend(impl_task_kind(info));
ts
}
@@ -369,9 +486,6 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
let support_crate = info.support_crate_name();
let ts = quote! {
use #support_crate ::FutureExt as _;
/// Task kind to launch.
pub trait TaskKind {
/// Spawn a task, it depends on the implementer if this is blocking or not.
@@ -401,8 +515,6 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
signal_tx: #support_crate ::metered::MeteredSender< #signal >,
// meter for the unbounded channel
unbounded_meter: #support_crate ::metered::Meter,
// connection to the subsystems
channels_out: ChannelsOut,
ctx: Ctx,
s: SubSys,
futures: &mut #support_crate ::FuturesUnordered<BoxFuture<'static, ::std::result::Result<(), #error_ty> >>,
@@ -94,8 +94,6 @@ pub(crate) fn impl_channels_out_struct(info: &OverseerInfo) -> Result<proc_macro
signals_received: usize,
message: #message_wrapper,
) {
use ::std::sync::mpsc::TrySendError;
let res: ::std::result::Result<_, _> = match message {
#(
#message_wrapper :: #consumes_variant (inner) => {
@@ -56,8 +56,6 @@ pub(crate) fn impl_overseer_struct(info: &OverseerInfo) -> proc_macro2::TokenStr
syn::LitStr::new(overseer_name.to_string().to_lowercase().as_str(), overseer_name.span());
let ts = quote! {
const STOP_DELAY: ::std::time::Duration = ::std::time::Duration::from_secs(1);
/// Capacity of a bounded message channel between overseer and subsystem
/// but also for bounded channels between two subsystems.
const CHANNEL_CAPACITY: usize = #message_channel_capacity;
@@ -116,10 +114,9 @@ pub(crate) fn impl_overseer_struct(info: &OverseerInfo) -> proc_macro2::TokenStr
loop {
select! {
_ = self.running_subsystems.next() => {
if self.running_subsystems.is_empty() {
break;
}
_ = self.running_subsystems.next() =>
if self.running_subsystems.is_empty() {
break;
},
_ = timeout_fut => break,
complete => break,
@@ -187,9 +184,6 @@ pub(crate) fn impl_overseen_subsystem(info: &OverseerInfo) -> proc_macro2::Token
let support_crate = info.support_crate_name();
let ts = quote::quote! {
use #support_crate ::futures::SinkExt as _;
/// A subsystem that the overseer oversees.
///
/// Ties together the [`Subsystem`] itself and it's running instance
@@ -98,5 +98,28 @@ pub(crate) fn impl_overseer_gen(
additive.extend(impl_message_wrapper_enum(&info)?);
additive.extend(impl_dispatch(&info));
#[cfg(feature = "expansion")]
{
use std::io::Write;
let cwd = std::env::current_dir().unwrap();
let path: std::path::PathBuf = cwd.join("overlord-expansion.rs");
let mut f = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)
.expect("File exists. qed");
f.write_all(
&mut format!("// {:?} \n{}", std::time::SystemTime::now(), additive).as_bytes(),
)
.expect("Got permissions to write to file. qed");
std::process::Command::new("rustfmt")
.arg("--edition=2018")
.arg(&path)
.current_dir(cwd)
.spawn()
.expect("Running rustfmt works. qed");
}
Ok(additive)
}
+144 -2
View File
@@ -14,8 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{AllMessages, OverseerSignal};
use polkadot_node_subsystem_types::errors::SubsystemError;
use crate::{
prometheus::Registry, AllMessages, HeadSupportsParachains, MetricsTrait, Overseer,
OverseerBuilder, OverseerMetrics, OverseerSignal, OverseerSubsystemContext, SpawnNamed,
KNOWN_LEAVES_CACHE_SIZE,
};
use lru::LruCache;
use polkadot_node_subsystem_types::{errors::SubsystemError, messages::*};
use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};
/// A dummy subsystem that implements [`Subsystem`] for all
@@ -52,3 +57,140 @@ where
SpawnedSubsystem { name: "dummy-subsystem", future }
}
}
/// Create an overseer with all subsystem being `Sub`.
///
/// Preferred way of initializing a dummy overseer for subsystem tests.
pub fn dummy_overseer_builder<'a, Spawner, SupportsParachains>(
spawner: Spawner,
supports_parachains: SupportsParachains,
registry: Option<&'a Registry>,
) -> Result<
OverseerBuilder<
Spawner,
SupportsParachains,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
>,
SubsystemError,
>
where
Spawner: SpawnNamed + Send + Sync + 'static,
SupportsParachains: HeadSupportsParachains,
{
one_for_all_overseer_builder(spawner, supports_parachains, DummySubsystem, registry)
}
/// Create an overseer with all subsystem being `Sub`.
pub fn one_for_all_overseer_builder<'a, Spawner, SupportsParachains, Sub>(
spawner: Spawner,
supports_parachains: SupportsParachains,
subsystem: Sub,
registry: Option<&'a Registry>,
) -> Result<
OverseerBuilder<
Spawner,
SupportsParachains,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
Sub,
>,
SubsystemError,
>
where
Spawner: SpawnNamed + Send + Sync + 'static,
SupportsParachains: HeadSupportsParachains,
Sub: Clone
+ Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<AvailabilityRecoveryMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<CandidateBackingMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<ChainApiMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<CollationGenerationMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<ProvisionerMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<RuntimeApiMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<StatementDistributionMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<ApprovalDistributionMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<GossipSupportMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<DisputeCoordinatorMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<DisputeParticipationMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<DisputeDistributionMessage>, SubsystemError>
+ Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError>,
{
let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
let builder = Overseer::builder()
.availability_distribution(subsystem.clone())
.availability_recovery(subsystem.clone())
.availability_store(subsystem.clone())
.bitfield_distribution(subsystem.clone())
.bitfield_signing(subsystem.clone())
.candidate_backing(subsystem.clone())
.candidate_validation(subsystem.clone())
.chain_api(subsystem.clone())
.collation_generation(subsystem.clone())
.collator_protocol(subsystem.clone())
.network_bridge(subsystem.clone())
.provisioner(subsystem.clone())
.runtime_api(subsystem.clone())
.statement_distribution(subsystem.clone())
.approval_distribution(subsystem.clone())
.approval_voting(subsystem.clone())
.gossip_support(subsystem.clone())
.dispute_coordinator(subsystem.clone())
.dispute_participation(subsystem.clone())
.dispute_distribution(subsystem.clone())
.chain_selection(subsystem)
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.leaves(Default::default())
.spawner(spawner)
.metrics(metrics)
.supports_parachains(supports_parachains);
Ok(builder)
}
+174 -318
View File
@@ -62,7 +62,6 @@
use std::{
collections::{hash_map, HashMap},
fmt::{self, Debug},
iter::FromIterator,
pin::Pin,
sync::Arc,
time::Duration,
@@ -90,17 +89,13 @@ pub use polkadot_node_subsystem_types::{
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
};
/// Test helper supplements.
pub mod metrics;
pub use self::metrics::Metrics as OverseerMetrics;
/// A dummy subsystem, mostly useful for placeholders and tests.
pub mod dummy;
pub use self::dummy::DummySubsystem;
// TODO legacy, to be deleted, left for easier integration
// TODO https://github.com/paritytech/polkadot/issues/3427
mod subsystems;
pub use self::subsystems::AllSubsystems;
pub mod metrics;
pub use polkadot_node_metrics::{
metrics::{prometheus, Metrics as MetricsTrait},
Metronome,
@@ -292,7 +287,119 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
}
}
/// The `Overseer` itself.
/// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s.
///
/// This returns the overseer along with an [`OverseerHandle`] which can
/// be used to send messages from external parts of the codebase.
///
/// The [`OverseerHandle`] returned from this function is connected to
/// the returned [`Overseer`].
///
/// ```text
/// +------------------------------------+
/// | Overseer |
/// +------------------------------------+
/// / | | \
/// ................. subsystems...................................
/// . +-----------+ +-----------+ +----------+ +---------+ .
/// . | | | | | | | | .
/// . +-----------+ +-----------+ +----------+ +---------+ .
/// ...............................................................
/// |
/// probably `spawn`
/// a `job`
/// |
/// V
/// +-----------+
/// | |
/// +-----------+
///
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
///
/// # Example
///
/// The [`Subsystems`] may be any type as long as they implement an expected interface.
/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them.
/// For the sake of simplicity the termination of the example is done with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_primitives::v1::Hash;
/// # use polkadot_overseer::{
/// # self as overseer,
/// # OverseerSignal,
/// # SubsystemSender as _,
/// # AllMessages,
/// # HeadSupportsParachains,
/// # Overseer,
/// # SubsystemError,
/// # gen::{
/// # SubsystemContext,
/// # FromOverseer,
/// # SpawnedSubsystem,
/// # },
/// # };
/// # use polkadot_node_subsystem_types::messages::{
/// # CandidateValidationMessage, CandidateBackingMessage,
/// # NetworkBridgeMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
/// Ctx: overseer::SubsystemContext<
/// Message=CandidateValidationMessage,
/// AllMessages=AllMessages,
/// Signal=OverseerSignal,
/// Error=SubsystemError,
/// >,
/// {
/// fn start(
/// self,
/// mut ctx: Ctx,
/// ) -> SpawnedSubsystem<SubsystemError> {
/// SpawnedSubsystem {
/// name: "validation-subsystem",
/// future: Box::pin(async move {
/// loop {
/// Delay::new(Duration::from_secs(1)).await;
/// }
/// }),
/// }
/// }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
/// fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// .unwrap()
/// .replace_candidate_validation(|_| ValidationSubsystem)
/// .build()
/// .unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
/// _ = overseer_fut => (),
/// _ = timer => (),
/// }
/// #
/// # });
/// # }
/// ```
#[overlord(
gen=AllMessages,
event=Event,
@@ -385,7 +492,60 @@ pub struct Overseer<SupportsParachains> {
pub known_leaves: LruCache<Hash, ()>,
/// Various Prometheus metrics.
pub metrics: crate::metrics::Metrics,
pub metrics: OverseerMetrics,
}
/// Spawn the metrics metronome task.
pub fn spawn_metronome_metrics<S, SupportsParachains>(
overseer: &mut Overseer<S, SupportsParachains>,
metronome_metrics: OverseerMetrics,
) -> Result<(), SubsystemError>
where
S: SpawnNamed,
SupportsParachains: HeadSupportsParachains,
{
struct ExtractNameAndMeters;
impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem<T>> for ExtractNameAndMeters {
type Output = Option<(&'static str, SubsystemMeters)>;
fn map_subsystem(&self, subsystem: &'a OverseenSubsystem<T>) -> Self::Output {
subsystem
.instance
.as_ref()
.map(|instance| (instance.name, instance.meters.clone()))
}
}
let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
let memory_stats =
MemoryAllocationTracker::new().expect("Jemalloc is the default allocator. qed");
let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
match memory_stats.snapshot() {
Ok(memory_stats_snapshot) => {
tracing::trace!(target: LOG_TARGET, "memory_stats: {:?}", &memory_stats_snapshot);
metronome_metrics.memory_stats_snapshot(memory_stats_snapshot);
},
Err(e) => tracing::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
}
// We combine the amount of messages from subsystems to the overseer
// as well as the amount of messages from external sources to the overseer
// into one `to_overseer` value.
metronome_metrics.channel_fill_level_snapshot(
subsystem_meters
.iter()
.cloned()
.filter_map(|x| x)
.map(|(name, ref meters)| (name, meters.read())),
);
futures::future::ready(())
});
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
Ok(())
}
impl<S, SupportsParachains> Overseer<S, SupportsParachains>
@@ -393,313 +553,6 @@ where
SupportsParachains: HeadSupportsParachains,
S: SpawnNamed,
{
/// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s.
///
/// This returns the overseer along with an [`OverseerHandle`] which can
/// be used to send messages from external parts of the codebase.
///
/// The [`OverseerHandle`] returned from this function is connected to
/// the returned [`Overseer`].
///
/// ```text
/// +------------------------------------+
/// | Overseer |
/// +------------------------------------+
/// / | | \
/// ................. subsystems...................................
/// . +-----------+ +-----------+ +----------+ +---------+ .
/// . | | | | | | | | .
/// . +-----------+ +-----------+ +----------+ +---------+ .
/// ...............................................................
/// |
/// probably `spawn`
/// a `job`
/// |
/// V
/// +-----------+
/// | |
/// +-----------+
///
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
///
/// # Example
///
/// The [`Subsystems`] may be any type as long as they implement an expected interface.
/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them.
/// For the sake of simplicity the termination of the example is done with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_primitives::v1::Hash;
/// # use polkadot_overseer::{
/// # self as overseer,
/// # Overseer,
/// # OverseerSignal,
/// # OverseerConnector,
/// # SubsystemSender as _,
/// # AllMessages,
/// # AllSubsystems,
/// # HeadSupportsParachains,
/// # SubsystemError,
/// # gen::{
/// # SubsystemContext,
/// # FromOverseer,
/// # SpawnedSubsystem,
/// # },
/// # };
/// # use polkadot_node_subsystem_types::messages::{
/// # CandidateValidationMessage, CandidateBackingMessage,
/// # NetworkBridgeMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
/// Ctx: overseer::SubsystemContext<
/// Message=CandidateValidationMessage,
/// AllMessages=AllMessages,
/// Signal=OverseerSignal,
/// Error=SubsystemError,
/// >,
/// {
/// fn start(
/// self,
/// mut ctx: Ctx,
/// ) -> SpawnedSubsystem<SubsystemError> {
/// SpawnedSubsystem {
/// name: "validation-subsystem",
/// future: Box::pin(async move {
/// loop {
/// Delay::new(Duration::from_secs(1)).await;
/// }
/// }),
/// }
/// }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
/// fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let all_subsystems = AllSubsystems::<()>::dummy()
/// .replace_candidate_validation(|_| ValidationSubsystem);
/// let (overseer, _handle) = Overseer::new(
/// vec![],
/// all_subsystems,
/// None,
/// AlwaysSupportsParachains,
/// spawner,
/// OverseerConnector::default(),
/// ).unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
/// _ = overseer_fut => (),
/// _ = timer => (),
/// }
/// #
/// # });
/// # }
/// ```
pub fn new<
CV,
CB,
SD,
AD,
AR,
BS,
BD,
P,
RA,
AS,
NB,
CA,
CG,
CP,
ApD,
ApV,
GS,
DC,
DP,
DD,
CS,
>(
leaves: impl IntoIterator<Item = BlockInfo>,
all_subsystems: AllSubsystems<
CV,
CB,
SD,
AD,
AR,
BS,
BD,
P,
RA,
AS,
NB,
CA,
CG,
CP,
ApD,
ApV,
GS,
DC,
DP,
DD,
CS,
>,
prometheus_registry: Option<&prometheus::Registry>,
supports_parachains: SupportsParachains,
s: S,
connector: OverseerConnector,
) -> SubsystemResult<(Self, OverseerHandle)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>, SubsystemError> + Send,
SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>, SubsystemError>
+ Send,
AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>, SubsystemError>
+ Send,
AR: Subsystem<OverseerSubsystemContext<AvailabilityRecoveryMessage>, SubsystemError> + Send,
BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>, SubsystemError> + Send,
BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>, SubsystemError> + Send,
P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>, SubsystemError> + Send,
RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>, SubsystemError> + Send,
AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>, SubsystemError> + Send,
NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>, SubsystemError> + Send,
CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>, SubsystemError> + Send,
CG: Subsystem<OverseerSubsystemContext<CollationGenerationMessage>, SubsystemError> + Send,
CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>, SubsystemError> + Send,
ApD:
Subsystem<OverseerSubsystemContext<ApprovalDistributionMessage>, SubsystemError> + Send,
ApV: Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>, SubsystemError> + Send,
GS: Subsystem<OverseerSubsystemContext<GossipSupportMessage>, SubsystemError> + Send,
DC: Subsystem<OverseerSubsystemContext<DisputeCoordinatorMessage>, SubsystemError> + Send,
DP: Subsystem<OverseerSubsystemContext<DisputeParticipationMessage>, SubsystemError> + Send,
DD: Subsystem<OverseerSubsystemContext<DisputeDistributionMessage>, SubsystemError> + Send,
CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send,
S: SpawnNamed,
{
let metrics = <crate::metrics::Metrics as MetricsTrait>::register(prometheus_registry)?;
let (mut overseer, handle) = Self::builder()
.candidate_validation(all_subsystems.candidate_validation)
.candidate_backing(all_subsystems.candidate_backing)
.statement_distribution(all_subsystems.statement_distribution)
.availability_distribution(all_subsystems.availability_distribution)
.availability_recovery(all_subsystems.availability_recovery)
.bitfield_signing(all_subsystems.bitfield_signing)
.bitfield_distribution(all_subsystems.bitfield_distribution)
.provisioner(all_subsystems.provisioner)
.runtime_api(all_subsystems.runtime_api)
.availability_store(all_subsystems.availability_store)
.network_bridge(all_subsystems.network_bridge)
.chain_api(all_subsystems.chain_api)
.collation_generation(all_subsystems.collation_generation)
.collator_protocol(all_subsystems.collator_protocol)
.approval_distribution(all_subsystems.approval_distribution)
.approval_voting(all_subsystems.approval_voting)
.gossip_support(all_subsystems.gossip_support)
.dispute_coordinator(all_subsystems.dispute_coordinator)
.dispute_participation(all_subsystems.dispute_participation)
.dispute_distribution(all_subsystems.dispute_distribution)
.chain_selection(all_subsystems.chain_selection)
.leaves(Vec::from_iter(
leaves
.into_iter()
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
))
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.active_leaves(Default::default())
.span_per_active_leaf(Default::default())
.activation_external_listeners(Default::default())
.supports_parachains(supports_parachains)
.metrics(metrics.clone())
.spawner(s)
.build_with_connector(connector)?;
// spawn the metrics metronome task
{
struct ExtractNameAndMeters;
impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem<T>> for ExtractNameAndMeters {
type Output = Option<(&'static str, SubsystemMeters)>;
fn map_subsystem(&self, subsystem: &'a OverseenSubsystem<T>) -> Self::Output {
subsystem
.instance
.as_ref()
.map(|instance| (instance.name, instance.meters.clone()))
}
}
let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
let memory_stats = match MemoryAllocationTracker::new() {
Ok(memory_stats) => Some(memory_stats),
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
"Failed to initialize memory allocation tracker: {:?}",
error
);
None
},
};
let metronome_metrics = metrics.clone();
let metronome =
Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
if let Some(ref memory_stats) = memory_stats {
match memory_stats.snapshot() {
Ok(memory_stats_snapshot) => {
tracing::trace!(
target: LOG_TARGET,
"memory_stats: {:?}",
&memory_stats_snapshot
);
metronome_metrics.memory_stats_snapshot(memory_stats_snapshot);
},
Err(e) => tracing::debug!(
target: LOG_TARGET,
"Failed to obtain memory stats: {:?}",
e
),
}
}
// We combine the amount of messages from subsystems to the overseer
// as well as the amount of messages from external sources to the overseer
// into one `to_overseer` value.
metronome_metrics.channel_fill_level_snapshot(
subsystem_meters
.iter()
.cloned()
.filter_map(|x| x)
.map(|(name, ref meters)| (name, meters.read())),
);
async move { () }
});
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
}
Ok((overseer, handle))
}
/// Stop the overseer.
async fn stop(mut self) {
let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
@@ -707,6 +560,9 @@ where
/// Run the `Overseer`.
pub async fn run(mut self) -> SubsystemResult<()> {
let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;
// Notify about active leaves on startup before starting the loop
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
-344
View File
@@ -1,344 +0,0 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Legacy way of defining subsystems.
//!
//! In the future, everything should be set up using the generated
//! overseer builder pattern instead.
use crate::dummy::DummySubsystem;
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
use polkadot_overseer_gen::MapSubsystem;
/// This struct is passed as an argument to create a new instance of an [`Overseer`].
///
/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
/// mocking in the test code:
///
/// Each [`Subsystem`] is supposed to implement some interface that is generic over
/// message type that is specific to this [`Subsystem`]. At the moment not all
/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`].
#[derive(Debug, Clone, AllSubsystemsGen)]
pub struct AllSubsystems<
CV = (),
CB = (),
SD = (),
AD = (),
AR = (),
BS = (),
BD = (),
P = (),
RA = (),
AS = (),
NB = (),
CA = (),
CG = (),
CP = (),
ApD = (),
ApV = (),
GS = (),
DC = (),
DP = (),
DD = (),
CS = (),
> {
/// A candidate validation subsystem.
pub candidate_validation: CV,
/// A candidate backing subsystem.
pub candidate_backing: CB,
/// A statement distribution subsystem.
pub statement_distribution: SD,
/// An availability distribution subsystem.
pub availability_distribution: AD,
/// An availability recovery subsystem.
pub availability_recovery: AR,
/// A bitfield signing subsystem.
pub bitfield_signing: BS,
/// A bitfield distribution subsystem.
pub bitfield_distribution: BD,
/// A provisioner subsystem.
pub provisioner: P,
/// A runtime API subsystem.
pub runtime_api: RA,
/// An availability store subsystem.
pub availability_store: AS,
/// A network bridge subsystem.
pub network_bridge: NB,
/// A Chain API subsystem.
pub chain_api: CA,
/// A Collation Generation subsystem.
pub collation_generation: CG,
/// A Collator Protocol subsystem.
pub collator_protocol: CP,
/// An Approval Distribution subsystem.
pub approval_distribution: ApD,
/// An Approval Voting subsystem.
pub approval_voting: ApV,
/// A Connection Request Issuer subsystem.
pub gossip_support: GS,
/// A Dispute Coordinator subsystem.
pub dispute_coordinator: DC,
/// A Dispute Participation subsystem.
pub dispute_participation: DP,
/// A Dispute Distribution subsystem.
pub dispute_distribution: DD,
/// A Chain Selection subsystem.
pub chain_selection: CS,
}
impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>
AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>
{
/// Create a new instance of [`AllSubsystems`].
///
/// Each subsystem is set to [`DummySystem`].
///
///# Note
///
/// Because of a bug in rustc it is required that when calling this function,
/// you provide a "random" type for the first generic parameter:
///
/// ```
/// polkadot_overseer::AllSubsystems::<()>::dummy();
/// ```
pub fn dummy() -> AllSubsystems<
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
> {
AllSubsystems {
candidate_validation: DummySubsystem,
candidate_backing: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
availability_recovery: DummySubsystem,
bitfield_signing: DummySubsystem,
bitfield_distribution: DummySubsystem,
provisioner: DummySubsystem,
runtime_api: DummySubsystem,
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
chain_api: DummySubsystem,
collation_generation: DummySubsystem,
collator_protocol: DummySubsystem,
approval_distribution: DummySubsystem,
approval_voting: DummySubsystem,
gossip_support: DummySubsystem,
dispute_coordinator: DummySubsystem,
dispute_participation: DummySubsystem,
dispute_distribution: DummySubsystem,
chain_selection: DummySubsystem,
}
}
/// Reference every individual subsystem.
pub fn as_ref(
&self,
) -> AllSubsystems<
&'_ CV,
&'_ CB,
&'_ SD,
&'_ AD,
&'_ AR,
&'_ BS,
&'_ BD,
&'_ P,
&'_ RA,
&'_ AS,
&'_ NB,
&'_ CA,
&'_ CG,
&'_ CP,
&'_ ApD,
&'_ ApV,
&'_ GS,
&'_ DC,
&'_ DP,
&'_ DD,
&'_ CS,
> {
AllSubsystems {
candidate_validation: &self.candidate_validation,
candidate_backing: &self.candidate_backing,
statement_distribution: &self.statement_distribution,
availability_distribution: &self.availability_distribution,
availability_recovery: &self.availability_recovery,
bitfield_signing: &self.bitfield_signing,
bitfield_distribution: &self.bitfield_distribution,
provisioner: &self.provisioner,
runtime_api: &self.runtime_api,
availability_store: &self.availability_store,
network_bridge: &self.network_bridge,
chain_api: &self.chain_api,
collation_generation: &self.collation_generation,
collator_protocol: &self.collator_protocol,
approval_distribution: &self.approval_distribution,
approval_voting: &self.approval_voting,
gossip_support: &self.gossip_support,
dispute_coordinator: &self.dispute_coordinator,
dispute_participation: &self.dispute_participation,
dispute_distribution: &self.dispute_distribution,
chain_selection: &self.chain_selection,
}
}
/// Map each subsystem.
pub fn map_subsystems<Mapper>(
self,
mapper: Mapper,
) -> AllSubsystems<
<Mapper as MapSubsystem<CV>>::Output,
<Mapper as MapSubsystem<CB>>::Output,
<Mapper as MapSubsystem<SD>>::Output,
<Mapper as MapSubsystem<AD>>::Output,
<Mapper as MapSubsystem<AR>>::Output,
<Mapper as MapSubsystem<BS>>::Output,
<Mapper as MapSubsystem<BD>>::Output,
<Mapper as MapSubsystem<P>>::Output,
<Mapper as MapSubsystem<RA>>::Output,
<Mapper as MapSubsystem<AS>>::Output,
<Mapper as MapSubsystem<NB>>::Output,
<Mapper as MapSubsystem<CA>>::Output,
<Mapper as MapSubsystem<CG>>::Output,
<Mapper as MapSubsystem<CP>>::Output,
<Mapper as MapSubsystem<ApD>>::Output,
<Mapper as MapSubsystem<ApV>>::Output,
<Mapper as MapSubsystem<GS>>::Output,
<Mapper as MapSubsystem<DC>>::Output,
<Mapper as MapSubsystem<DP>>::Output,
<Mapper as MapSubsystem<DD>>::Output,
<Mapper as MapSubsystem<CS>>::Output,
>
where
Mapper: MapSubsystem<CV>,
Mapper: MapSubsystem<CB>,
Mapper: MapSubsystem<SD>,
Mapper: MapSubsystem<AD>,
Mapper: MapSubsystem<AR>,
Mapper: MapSubsystem<BS>,
Mapper: MapSubsystem<BD>,
Mapper: MapSubsystem<P>,
Mapper: MapSubsystem<RA>,
Mapper: MapSubsystem<AS>,
Mapper: MapSubsystem<NB>,
Mapper: MapSubsystem<CA>,
Mapper: MapSubsystem<CG>,
Mapper: MapSubsystem<CP>,
Mapper: MapSubsystem<ApD>,
Mapper: MapSubsystem<ApV>,
Mapper: MapSubsystem<GS>,
Mapper: MapSubsystem<DC>,
Mapper: MapSubsystem<DP>,
Mapper: MapSubsystem<DD>,
Mapper: MapSubsystem<CS>,
{
AllSubsystems {
candidate_validation: <Mapper as MapSubsystem<CV>>::map_subsystem(
&mapper,
self.candidate_validation,
),
candidate_backing: <Mapper as MapSubsystem<CB>>::map_subsystem(
&mapper,
self.candidate_backing,
),
statement_distribution: <Mapper as MapSubsystem<SD>>::map_subsystem(
&mapper,
self.statement_distribution,
),
availability_distribution: <Mapper as MapSubsystem<AD>>::map_subsystem(
&mapper,
self.availability_distribution,
),
availability_recovery: <Mapper as MapSubsystem<AR>>::map_subsystem(
&mapper,
self.availability_recovery,
),
bitfield_signing: <Mapper as MapSubsystem<BS>>::map_subsystem(
&mapper,
self.bitfield_signing,
),
bitfield_distribution: <Mapper as MapSubsystem<BD>>::map_subsystem(
&mapper,
self.bitfield_distribution,
),
provisioner: <Mapper as MapSubsystem<P>>::map_subsystem(&mapper, self.provisioner),
runtime_api: <Mapper as MapSubsystem<RA>>::map_subsystem(&mapper, self.runtime_api),
availability_store: <Mapper as MapSubsystem<AS>>::map_subsystem(
&mapper,
self.availability_store,
),
network_bridge: <Mapper as MapSubsystem<NB>>::map_subsystem(
&mapper,
self.network_bridge,
),
chain_api: <Mapper as MapSubsystem<CA>>::map_subsystem(&mapper, self.chain_api),
collation_generation: <Mapper as MapSubsystem<CG>>::map_subsystem(
&mapper,
self.collation_generation,
),
collator_protocol: <Mapper as MapSubsystem<CP>>::map_subsystem(
&mapper,
self.collator_protocol,
),
approval_distribution: <Mapper as MapSubsystem<ApD>>::map_subsystem(
&mapper,
self.approval_distribution,
),
approval_voting: <Mapper as MapSubsystem<ApV>>::map_subsystem(
&mapper,
self.approval_voting,
),
gossip_support: <Mapper as MapSubsystem<GS>>::map_subsystem(
&mapper,
self.gossip_support,
),
dispute_coordinator: <Mapper as MapSubsystem<DC>>::map_subsystem(
&mapper,
self.dispute_coordinator,
),
dispute_participation: <Mapper as MapSubsystem<DP>>::map_subsystem(
&mapper,
self.dispute_participation,
),
dispute_distribution: <Mapper as MapSubsystem<DD>>::map_subsystem(
&mapper,
self.dispute_distribution,
),
chain_selection: <Mapper as MapSubsystem<CS>>::map_subsystem(
&mapper,
self.chain_selection,
),
}
}
}
+71 -109
View File
@@ -32,7 +32,12 @@ use polkadot_primitives::v1::{
ValidatorIndex,
};
use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer, OverseerConnector};
use crate::{
self as overseer,
dummy::{dummy_overseer_builder, one_for_all_overseer_builder},
gen::Delay,
HeadSupportsParachains,
};
use metered_channel as metered;
use assert_matches::assert_matches;
@@ -40,6 +45,15 @@ use sp_core::crypto::Pair as _;
use super::*;
fn block_info_to_pair(blocks: impl IntoIterator<Item = BlockInfo>) -> Vec<(Hash, BlockNumber)> {
use std::iter::FromIterator;
Vec::from_iter(
blocks
.into_iter()
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
)
}
type SpawnedSubsystem = crate::gen::SpawnedSubsystem<SubsystemError>;
struct TestSubsystem1(metered::MeteredSender<usize>);
@@ -159,20 +173,12 @@ fn overseer_works() {
let mut s1_rx = s1_rx.fuse();
let mut s2_rx = s2_rx.fuse();
let all_subsystems = AllSubsystems::<()>::dummy()
let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None)
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem1(s1_tx))
.replace_candidate_backing(move |_| TestSubsystem2(s2_tx));
let (overseer, handle) = Overseer::new(
vec![],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
.replace_candidate_backing(move |_| TestSubsystem2(s2_tx))
.build()
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
@@ -226,17 +232,14 @@ fn overseer_metrics_work() {
let third_block =
BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 };
let all_subsystems = AllSubsystems::<()>::dummy();
let registry = prometheus::Registry::new();
let (overseer, handle) = Overseer::new(
vec![first_block],
all_subsystems,
Some(&registry),
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let (overseer, handle) =
dummy_overseer_builder(spawner, MockSupportsParachains, Some(&registry))
.unwrap()
.leaves(block_info_to_pair(vec![first_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
@@ -264,13 +267,20 @@ fn overseer_metrics_work() {
fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> {
let gather = registry.gather();
let gather = &gather[2..];
assert_eq!(gather[0].get_name(), "parachain_activated_heads_total");
assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total");
assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total");
let activated = gather[0].get_metric()[0].get_counter().get_value() as u64;
let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64;
let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64;
assert!(!gather.is_empty(), "Gathered metrics are not empty. qed");
let extract = |name: &str| -> u64 {
gather
.iter()
.find(|&mf| dbg!(mf.get_name()) == dbg!(name))
.expect(&format!("Must contain `{}` metric", name))
.get_metric()[0]
.get_counter()
.get_value() as u64
};
let activated = extract("parachain_activated_heads_total");
let deactivated = extract("parachain_deactivated_heads_total");
let relayed = extract("parachain_messages_relayed_total");
let mut result = HashMap::new();
result.insert("activated", activated);
result.insert("deactivated", deactivated);
@@ -286,17 +296,11 @@ fn overseer_ends_on_subsystem_exit() {
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let all_subsystems =
AllSubsystems::<()>::dummy().replace_candidate_backing(|_| ReturnOnStart);
let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let (overseer, _handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None)
.unwrap()
.replace_candidate_backing(|_| ReturnOnStart)
.build()
.unwrap();
overseer.run().await.unwrap();
})
@@ -394,18 +398,14 @@ fn overseer_start_stop_works() {
let (tx_5, mut rx_5) = metered::channel(64);
let (tx_6, mut rx_6) = metered::channel(64);
let all_subsystems = AllSubsystems::<()>::dummy()
let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None)
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6));
let (overseer, handle) = Overseer::new(
vec![first_block],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
.leaves(block_info_to_pair(vec![first_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
@@ -496,20 +496,15 @@ fn overseer_finalize_works() {
let (tx_5, mut rx_5) = metered::channel(64);
let (tx_6, mut rx_6) = metered::channel(64);
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6));
// start with two forks of different height.
let (overseer, handle) = Overseer::new(
vec![first_block, second_block],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None)
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
.leaves(block_info_to_pair(vec![first_block, second_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
@@ -592,18 +587,12 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
let (tx_5, mut rx_5) = metered::channel(64);
let all_subsystems =
AllSubsystems::<()>::dummy().replace_candidate_backing(move |_| TestSubsystem6(tx_5));
let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None)
.unwrap()
.replace_candidate_backing(move |_| TestSubsystem6(tx_5))
.build()
.unwrap();
let (overseer, handle) = Overseer::new(
Vec::new(),
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
@@ -854,39 +843,12 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
msgs_received.clone(),
);
let all_subsystems = AllSubsystems {
candidate_validation: subsystem.clone(),
candidate_backing: subsystem.clone(),
collation_generation: subsystem.clone(),
collator_protocol: subsystem.clone(),
statement_distribution: subsystem.clone(),
availability_distribution: subsystem.clone(),
availability_recovery: subsystem.clone(),
bitfield_signing: subsystem.clone(),
bitfield_distribution: subsystem.clone(),
provisioner: subsystem.clone(),
runtime_api: subsystem.clone(),
availability_store: subsystem.clone(),
network_bridge: subsystem.clone(),
chain_api: subsystem.clone(),
approval_distribution: subsystem.clone(),
approval_voting: subsystem.clone(),
gossip_support: subsystem.clone(),
dispute_coordinator: subsystem.clone(),
dispute_participation: subsystem.clone(),
dispute_distribution: subsystem.clone(),
chain_selection: subsystem.clone(),
};
let (overseer, handle) =
one_for_all_overseer_builder(spawner, MockSupportsParachains, subsystem, None)
.unwrap()
.build()
.unwrap();
let (overseer, handle) = Overseer::new(
vec![],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
+1 -1
View File
@@ -224,7 +224,7 @@ pub struct Collation<BlockNumber = polkadot_primitives::v1::BlockNumber> {
pub hrmp_watermark: BlockNumber,
}
/// Signal that is being returned back when a collation was seconded by a validator.
/// Signal that is being returned when a collation was seconded by a validator.
#[derive(Debug)]
pub struct CollationSecondedSignal {
/// The hash of the relay chain block that was used as context to sign [`Self::statement`].
+5 -7
View File
@@ -24,12 +24,10 @@ mod parachains_db;
mod relay_chain_selection;
#[cfg(feature = "full-node")]
mod overseer;
pub mod overseer;
#[cfg(feature = "full-node")]
pub use self::overseer::{
create_default_subsystems, OverseerGen, OverseerGenArgs, RealOverseerGen,
};
pub use self::overseer::{OverseerGen, OverseerGenArgs, RealOverseerGen};
#[cfg(all(test, feature = "disputes"))]
mod tests;
@@ -56,6 +54,7 @@ pub use sp_core::traits::SpawnNamed;
pub use {
polkadot_overseer::{Handle, Overseer, OverseerConnector, OverseerHandle},
polkadot_primitives::v1::ParachainHost,
relay_chain_selection::SelectRelayChain,
sc_client_api::AuxStore,
sp_authority_discovery::AuthorityDiscoveryApi,
sp_blockchain::HeaderBackend,
@@ -736,14 +735,13 @@ where
is_relay_chain &&
(role.is_authority() || is_collator.is_collator());
use relay_chain_selection::SelectRelayChain;
let select_chain = SelectRelayChain::new(
basics.backend.clone(),
overseer_handle.clone(),
requires_overseer_for_chain_sel,
polkadot_node_subsystem_util::metrics::Metrics::register(prometheus_registry.as_ref())?,
);
let service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> {
client,
backend,
@@ -1292,6 +1290,7 @@ where
Ok((task_manager, rpc_handlers))
}
#[cfg(feature = "full-node")]
macro_rules! chain_ops {
($config:expr, $jaeger_agent:expr, $telemetry_worker_handle:expr; $scope:ident, $executor:ident, $variant:ident) => {{
let telemetry_worker_handle = $telemetry_worker_handle;
@@ -1354,7 +1353,6 @@ pub fn new_chain_ops(
{
return chain_ops!(config, jaeger_agent, telemetry_worker_handle; polkadot_runtime, PolkadotExecutorDispatch, Polkadot)
}
#[cfg(not(feature = "polkadot-native"))]
Err(Error::NoRuntime)
}
+7 -163
View File
@@ -24,10 +24,13 @@ use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver};
#[cfg(any(feature = "malus", test))]
pub use polkadot_overseer::dummy::DummySubsystem;
pub use polkadot_overseer::{
metrics::Metrics, AllSubsystems, BlockInfo, HeadSupportsParachains, MetricsTrait, Overseer,
OverseerBuilder, OverseerConnector, OverseerHandle,
dummy::{dummy_overseer_builder, DummySubsystem},
HeadSupportsParachains,
};
use polkadot_overseer::{
metrics::Metrics as OverseerMetrics, BlockInfo, MetricsTrait, Overseer, OverseerBuilder,
OverseerConnector, OverseerHandle,
};
use polkadot_primitives::v1::ParachainHost;
@@ -106,165 +109,6 @@ where
pub dispute_coordinator_config: DisputeCoordinatorConfig,
}
/// Create a default, unaltered set of subsystems.
///
/// A convenience for usage with malus, to avoid
/// repetitive code across multiple behavior strain implementations.
pub fn create_default_subsystems<'a, Spawner, RuntimeClient>(
OverseerGenArgs {
keystore,
runtime_client,
parachains_db,
network_service,
authority_discovery_service,
pov_req_receiver,
chunk_req_receiver,
collation_req_receiver,
available_data_req_receiver,
statement_req_receiver,
dispute_req_receiver,
registry,
spawner,
is_collator,
approval_voting_config,
availability_config,
candidate_validation_config,
chain_selection_config,
dispute_coordinator_config,
..
}: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<
AllSubsystems<
CandidateValidationSubsystem,
CandidateBackingSubsystem<Spawner>,
StatementDistributionSubsystem,
AvailabilityDistributionSubsystem,
AvailabilityRecoverySubsystem,
BitfieldSigningSubsystem<Spawner>,
BitfieldDistributionSubsystem,
ProvisionerSubsystem<Spawner>,
RuntimeApiSubsystem<RuntimeClient>,
AvailabilityStoreSubsystem,
NetworkBridgeSubsystem<
Arc<sc_network::NetworkService<Block, Hash>>,
AuthorityDiscoveryService,
>,
ChainApiSubsystem<RuntimeClient>,
CollationGenerationSubsystem,
CollatorProtocolSubsystem,
ApprovalDistributionSubsystem,
ApprovalVotingSubsystem,
GossipSupportSubsystem<AuthorityDiscoveryService>,
DisputeCoordinatorSubsystem,
DisputeParticipationSubsystem,
DisputeDistributionSubsystem<AuthorityDiscoveryService>,
ChainSelectionSubsystem,
>,
Error,
>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
use polkadot_node_subsystem_util::metrics::Metrics;
let all_subsystems = AllSubsystems {
availability_distribution: AvailabilityDistributionSubsystem::new(
keystore.clone(),
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
Metrics::register(registry)?,
),
availability_recovery: AvailabilityRecoverySubsystem::with_chunks_only(
available_data_req_receiver,
Metrics::register(registry)?,
),
availability_store: AvailabilityStoreSubsystem::new(
parachains_db.clone(),
availability_config,
Metrics::register(registry)?,
),
bitfield_distribution: BitfieldDistributionSubsystem::new(Metrics::register(registry)?),
bitfield_signing: BitfieldSigningSubsystem::new(
spawner.clone(),
keystore.clone(),
Metrics::register(registry)?,
),
candidate_backing: CandidateBackingSubsystem::new(
spawner.clone(),
keystore.clone(),
Metrics::register(registry)?,
),
candidate_validation: CandidateValidationSubsystem::with_config(
candidate_validation_config,
Metrics::register(registry)?, // candidate-validation metrics
Metrics::register(registry)?, // validation host metrics
),
chain_api: ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?),
collation_generation: CollationGenerationSubsystem::new(Metrics::register(registry)?),
collator_protocol: {
let side = match is_collator {
IsCollator::Yes(collator_pair) => ProtocolSide::Collator(
network_service.local_peer_id().clone(),
collator_pair,
collation_req_receiver,
Metrics::register(registry)?,
),
IsCollator::No => ProtocolSide::Validator {
keystore: keystore.clone(),
eviction_policy: Default::default(),
metrics: Metrics::register(registry)?,
},
};
CollatorProtocolSubsystem::new(side)
},
network_bridge: NetworkBridgeSubsystem::new(
network_service.clone(),
authority_discovery_service.clone(),
Box::new(network_service.clone()),
Metrics::register(registry)?,
),
provisioner: ProvisionerSubsystem::new(spawner.clone(), (), Metrics::register(registry)?),
runtime_api: RuntimeApiSubsystem::new(
runtime_client.clone(),
Metrics::register(registry)?,
spawner.clone(),
),
statement_distribution: StatementDistributionSubsystem::new(
keystore.clone(),
statement_req_receiver,
Metrics::register(registry)?,
),
approval_distribution: ApprovalDistributionSubsystem::new(Metrics::register(registry)?),
approval_voting: ApprovalVotingSubsystem::with_config(
approval_voting_config,
parachains_db.clone(),
keystore.clone(),
Box::new(network_service.clone()),
Metrics::register(registry)?,
),
gossip_support: GossipSupportSubsystem::new(
keystore.clone(),
authority_discovery_service.clone(),
),
dispute_coordinator: DisputeCoordinatorSubsystem::new(
parachains_db.clone(),
dispute_coordinator_config,
keystore.clone(),
Metrics::register(registry)?,
),
dispute_participation: DisputeParticipationSubsystem::new(),
dispute_distribution: DisputeDistributionSubsystem::new(
keystore.clone(),
dispute_req_receiver,
authority_discovery_service.clone(),
Metrics::register(registry)?,
),
chain_selection: ChainSelectionSubsystem::new(chain_selection_config, parachains_db),
};
Ok(all_subsystems)
}
/// Obtain a prepared `OverseerBuilder`, that is initialized
/// with all default values.
pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
@@ -329,7 +173,7 @@ where
use polkadot_node_subsystem_util::metrics::Metrics;
use std::iter::FromIterator;
let metrics = <polkadot_overseer::metrics::Metrics as MetricsTrait>::register(registry)?;
let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
let builder = Overseer::builder()
.availability_distribution(AvailabilityDistributionSubsystem::new(
@@ -372,9 +372,7 @@ mod tests {
use super::*;
use futures::executor::block_on;
use polkadot_node_subsystem::messages::CollatorProtocolMessage;
use polkadot_overseer::{
AllSubsystems, Handle, HeadSupportsParachains, Overseer, OverseerConnector,
};
use polkadot_overseer::{dummy::dummy_overseer_builder, Handle, HeadSupportsParachains};
use polkadot_primitives::v1::Hash;
struct AlwaysSupportsParachains;
@@ -388,17 +386,14 @@ mod tests {
fn forward_subsystem_works() {
let spawner = sp_core::testing::TaskExecutor::new();
let (tx, rx) = mpsc::channel(2);
let all_subsystems =
AllSubsystems::<()>::dummy().replace_collator_protocol(|_| ForwardSubsystem(tx));
let (overseer, handle) = Overseer::new(
Vec::new(),
all_subsystems,
None,
AlwaysSupportsParachains,
spawner.clone(),
OverseerConnector::default(),
)
.unwrap();
let (overseer, handle) =
dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None)
.unwrap()
.replace_collator_protocol(|_| ForwardSubsystem(tx))
.leaves(vec![])
.build()
.unwrap();
let mut handle = Handle::new(handle);
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
@@ -523,7 +523,7 @@ pub enum ChainApiMessage {
/// Get the cumulative weight of the given block, by hash.
/// If the block or weight is unknown, this returns `None`.
///
/// Note: this the weight within the low-level fork-choice rule,
/// Note: this is the weight within the low-level fork-choice rule,
/// not the high-level one implemented in the chain-selection subsystem.
///
/// Weight is used for comparing blocks in a fork-choice rule.
+1 -1
View File
@@ -275,7 +275,7 @@ pub mod pallet {
FirstPeriodTooFarInFuture,
/// Last lease period must be greater than first lease period.
LastPeriodBeforeFirstPeriod,
/// The last lease period cannot be more then 3 periods after the first period.
/// The last lease period cannot be more than 3 periods after the first period.
LastPeriodTooFarInFuture,
/// The campaign ends before the current block number. The end must be in the future.
CannotEndInPast,
+1 -1
View File
@@ -34,7 +34,7 @@ use sp_std::prelude::*;
type BalanceOf<T> =
<<T as Config>::Currency as Currency<<T as frame_system::Config>::AccountId>>::Balance;
/// The kind of a statement an account needs to make for a claim to be valid.
/// The kind of statement an account needs to make for a claim to be valid.
#[derive(Encode, Decode, Clone, Copy, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum AccountValidity {
/// Account is not valid.
+1 -1
View File
@@ -127,7 +127,7 @@ pub trait Leaser {
) -> Result<(), LeaseError>;
/// Return the amount of balance currently held in reserve on `leaser`'s account for leasing `para`. This won't
/// go down outside of a lease period.
/// go down outside a lease period.
fn deposit_held(
para: ParaId,
leaser: &Self::AccountId,
+1 -1
View File
@@ -387,7 +387,7 @@ pub mod pallet {
/// parameters.
///
/// - `proposed_max_capacity` - specifies how many messages can be in the channel at once.
/// - `proposed_max_message_size` - specifies the maximum size of any of the messages.
/// - `proposed_max_message_size` - specifies the maximum size of the messages.
///
/// These numbers are a subject to the relay-chain configuration limits.
///
+2 -2
View File
@@ -79,7 +79,7 @@ pub mod pallet {
/// The overarching event type.
type Event: From<Event<Self>> + IsType<<Self as frame_system::Config>::Event>;
/// Required origin for sending XCM messages. If successful, the it resolves to `MultiLocation`
/// Required origin for sending XCM messages. If successful, it resolves to `MultiLocation`
/// which exists as an interior location within this chain's XCM context.
type SendXcmOrigin: EnsureOrigin<<Self as SysConfig>::Origin, Success = MultiLocation>;
@@ -328,7 +328,7 @@ pub mod pallet {
#[pallet::storage]
pub(super) type SafeXcmVersion<T: Config> = StorageValue<_, XcmVersion, OptionQuery>;
/// Latest versions that we know various locations support.
/// The Latest versions that we know various locations support.
#[pallet::storage]
pub(super) type SupportedVersion<T: Config> = StorageDoubleMap<
_,
+2 -2
View File
@@ -99,7 +99,7 @@ impl MultiLocation {
MultiLocation { parents, interior: Junctions::Here }
}
/// Whether or not the `MultiLocation` has no parents and has a `Here` interior.
/// Whether the `MultiLocation` has no parents and has a `Here` interior.
pub const fn is_here(&self) -> bool {
self.parents == 0 && self.interior.len() == 0
}
@@ -119,7 +119,7 @@ impl MultiLocation {
self.parents
}
/// Returns boolean indicating whether or not `self` contains only the specified amount of
/// Returns boolean indicating whether `self` contains only the specified amount of
/// parents and no interior junctions.
pub const fn contains_parents_only(&self, count: u8) -> bool {
matches!(self.interior, Junctions::Here) && self.parents == count