start working on building the real overseer (#1795)

* start working on building the real overseer

Unfortunately, this fails to compile right now due to an upstream
failure to compile which is probably brought on by a recent upgrade
to rustc v1.47.

* fill in AllSubsystems internal constructors

* replace fn make_metrics with Metrics::attempt_to_register

* update to account for #1740

* remove Metrics::register, rename Metrics::attempt_to_register

* add 'static bounds to real_overseer type params

* pass authority_discovery and network_service to real_overseer

It's not straightforwardly obvious that this is the best way to handle
the case when there is no authority discovery service, but it seems
to be the best option available at the moment.

* select a proper database configuration for the availability store db

* use subdirectory for av-store database path

* apply Basti's patch which avoids needing to parameterize everything on Block

* simplify path extraction

* get all tests to compile

* Fix Prometheus double-registry error

for debugging purposes, added this to node/subsystem-util/src/lib.rs:472-476:

```rust
Some(registry) => Self::try_register(registry).map_err(|err| {
	eprintln!("PrometheusError calling {}::register: {:?}", std::any::type_name::<Self>(), err);
	err
}),
```

That pointed out where the registration was failing, which led to
this fix. The test still doesn't pass, but it now fails in a new
and different way!

* authorities must have authority discovery, but not necessarily overseer handlers

* fix broken SpawnedSubsystem impls

detailed logging determined that using the `Box::new` style of
future generation, the `self.run` method was never being called,
leading to dropped receivers / closed senders for those subsystems,
causing the overseer to shut down immediately.

This is not the final fix needed to get things working properly,
but it's a good start.

* use prometheus properly

Prometheus lets us register simple counters, which aren't very
interesting. It also allows us to register CounterVecs, which are.
With a CounterVec, you can provide a set of labels, which can
later be used to filter the counts.

We were using them wrong, though. This pattern was repeated in a
variety of places in the code:

```rust
// panics with an cardinality mismatch
let my_counter = register(CounterVec::new(opts, &["succeeded", "failed"])?, registry)?;
my_counter.with_label_values(&["succeeded"]).inc()
```

The problem is that the labels provided in the constructor are not
the set of legal values which can be annotated, but a set of individual
label names which can have individual, arbitrary values.

This commit fixes that.

* get av-store subsystem to actually run properly and not die on first signal

* typo fix: incomming -> incoming

* don't disable authority discovery in test nodes

* Fix rococo-v1 missing session keys

* Update node/core/av-store/Cargo.toml

* try dummying out av-store on non-full-nodes

* overseer and subsystems are required only for full nodes

* Reduce the amount of warnings on browser target

* Fix two more warnings

* InclusionInherent should actually have an Inherent module on rococo

* Ancestry: don't return genesis' parent hash

* Update Cargo.lock

* fix broken test

* update test script: specify chainspec as script argument

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Update node/service/src/lib.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* node/service/src/lib: Return error via ? operator

* post-merge blues

* add is_collator flag

* prevent occasional av-store test panic

* simplify fix; expand application

* run authority_discovery in Role::Discover when collating

* distinguish between proposer closed channel errors

* add IsCollator enum, remove is_collator CLI flag

* improve formatting

* remove nop loop

* Fix some stuff

Co-authored-by: Andronik Ordian <write@reusable.software>
Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
Co-authored-by: Robert Habermeier <robert@Roberts-MBP.lan1>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Peter Goodspeed-Niklaus
2020-10-28 11:26:50 +01:00
committed by GitHub
parent 5fbb8a381c
commit 1a25c41277
29 changed files with 420 additions and 180 deletions
+19 -1
View File
@@ -4922,6 +4922,7 @@ dependencies = [
"polkadot-node-subsystem-util", "polkadot-node-subsystem-util",
"polkadot-overseer", "polkadot-overseer",
"polkadot-primitives", "polkadot-primitives",
"sc-service",
"smallvec 1.4.2", "smallvec 1.4.2",
"sp-core", "sp-core",
"thiserror", "thiserror",
@@ -5108,6 +5109,7 @@ dependencies = [
"sc-network", "sc-network",
"smallvec 1.4.2", "smallvec 1.4.2",
"sp-core", "sp-core",
"substrate-prometheus-endpoint",
"thiserror", "thiserror",
] ]
@@ -5454,13 +5456,29 @@ dependencies = [
"pallet-transaction-payment-rpc-runtime-api", "pallet-transaction-payment-rpc-runtime-api",
"parity-scale-codec", "parity-scale-codec",
"parking_lot 0.9.0", "parking_lot 0.9.0",
"polkadot-availability-bitfield-distribution",
"polkadot-availability-distribution",
"polkadot-collator-protocol",
"polkadot-network-bridge",
"polkadot-node-collation-generation",
"polkadot-node-core-av-store",
"polkadot-node-core-backing",
"polkadot-node-core-bitfield-signing",
"polkadot-node-core-candidate-selection",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-chain-api",
"polkadot-node-core-proposer", "polkadot-node-core-proposer",
"polkadot-node-core-provisioner",
"polkadot-node-core-runtime-api",
"polkadot-node-subsystem", "polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-overseer", "polkadot-overseer",
"polkadot-parachain", "polkadot-parachain",
"polkadot-pov-distribution",
"polkadot-primitives", "polkadot-primitives",
"polkadot-rpc", "polkadot-rpc",
"polkadot-runtime", "polkadot-runtime",
"polkadot-statement-distribution",
"polkadot-test-client", "polkadot-test-client",
"rococo-v1-runtime", "rococo-v1-runtime",
"sc-authority-discovery", "sc-authority-discovery",
@@ -5472,7 +5490,6 @@ dependencies = [
"sc-consensus-babe", "sc-consensus-babe",
"sc-executor", "sc-executor",
"sc-finality-grandpa", "sc-finality-grandpa",
"sc-keystore",
"sc-network", "sc-network",
"sc-service", "sc-service",
"sc-telemetry", "sc-telemetry",
@@ -5489,6 +5506,7 @@ dependencies = [
"sp-finality-grandpa", "sp-finality-grandpa",
"sp-inherents", "sp-inherents",
"sp-io", "sp-io",
"sp-keystore",
"sp-offchain", "sp-offchain",
"sp-runtime", "sp-runtime",
"sp-session", "sp-session",
-1
View File
@@ -20,7 +20,6 @@ use browser_utils::{
Client, Client,
browser_configuration, set_console_error_panic_hook, init_console_log, browser_configuration, set_console_error_panic_hook, init_console_log,
}; };
use std::str::FromStr;
/// Starts the client. /// Starts the client.
#[wasm_bindgen] #[wasm_bindgen]
+1
View File
@@ -148,6 +148,7 @@ pub fn run() -> Result<()> {
_ => service::build_full( _ => service::build_full(
config, config,
authority_discovery_disabled, authority_discovery_disabled,
service::IsCollator::No,
grandpa_pause, grandpa_pause,
).map(|full| full.task_manager), ).map(|full| full.task_manager),
} }
+2
View File
@@ -19,6 +19,8 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-overseer = { path = "../../overseer" } polkadot-overseer = { path = "../../overseer" }
polkadot-primitives = { path = "../../../primitives" } polkadot-primitives = { path = "../../../primitives" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
[dev-dependencies] [dev-dependencies]
env_logger = "0.7.1" env_logger = "0.7.1"
assert_matches = "1.3.0" assert_matches = "1.3.0"
+31 -12
View File
@@ -397,6 +397,23 @@ pub struct Config {
pub path: PathBuf, pub path: PathBuf,
} }
impl std::convert::TryFrom<sc_service::config::DatabaseConfig> for Config {
type Error = &'static str;
fn try_from(config: sc_service::config::DatabaseConfig) -> Result<Self, Self::Error> {
let path = config.path().ok_or("custom databases are not supported")?;
Ok(Self {
// substrate cache size is improper here; just use the default
cache_size: None,
// DB path is a sub-directory of substrate db path to give two properties:
// 1: column numbers don't conflict with substrate
// 2: commands like purge-chain work without further changes
path: path.join("parachains").join("av-store"),
})
}
}
impl AvailabilityStoreSubsystem { impl AvailabilityStoreSubsystem {
/// Create a new `AvailabilityStoreSubsystem` with a given config on disk. /// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> { pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
@@ -449,7 +466,6 @@ async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Contex
where where
Context: SubsystemContext<Message=AvailabilityStoreMessage>, Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{ {
let ctx = &mut ctx;
loop { loop {
// Every time the following two methods are called a read from DB is performed. // Every time the following two methods are called a read from DB is performed.
// But given that these are very small values which are essentially a newtype // But given that these are very small values which are essentially a newtype
@@ -470,16 +486,19 @@ where
ActiveLeavesUpdate { activated, .. }) ActiveLeavesUpdate { activated, .. })
)) => { )) => {
for activated in activated.into_iter() { for activated in activated.into_iter() {
process_block_activated(ctx, &subsystem.inner, activated).await?; process_block_activated(&mut ctx, &subsystem.inner, activated).await?;
} }
} }
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(hash))) => { Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(hash))) => {
process_block_finalized(&subsystem, ctx, &subsystem.inner, hash).await?; process_block_finalized(&subsystem, &mut ctx, &subsystem.inner, hash).await?;
} }
Ok(FromOverseer::Communication { msg }) => { Ok(FromOverseer::Communication { msg }) => {
process_message(&mut subsystem, ctx, msg).await?; process_message(&mut subsystem, &mut ctx, msg).await?;
} }
Err(_) => break, Err(e) => {
log::error!("AvailabilityStoreSubsystem err: {:#?}", e);
break
},
} }
} }
pov_pruning_time = pov_pruning_time => { pov_pruning_time = pov_pruning_time => {
@@ -945,15 +964,15 @@ fn query_inner<D: Decode>(db: &Arc<dyn KeyValueDB>, column: u32, key: &[u8]) ->
} }
impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
where where
Context: SubsystemContext<Message=AvailabilityStoreMessage>, Context: SubsystemContext<Message = AvailabilityStoreMessage>,
{ {
fn start(self, ctx: Context) -> SpawnedSubsystem { fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(async move { let future = run(self, ctx)
if let Err(e) = run(self, ctx).await { .map(|r| if let Err(e) = r {
log::error!(target: LOG_TARGET, "Subsystem exited with an error {:?}", e); log::error!(target: "availabilitystore", "Subsystem exited with an error {:?}", e);
} })
}); .boxed();
SpawnedSubsystem { SpawnedSubsystem {
name: "availability-store-subsystem", name: "availability-store-subsystem",
@@ -420,10 +420,10 @@ impl metrics::Metrics for Metrics {
seconds: prometheus::register( seconds: prometheus::register(
prometheus::CounterVec::new( prometheus::CounterVec::new(
prometheus::Opts::new( prometheus::Opts::new(
"candidate_selection_invalid_selections_total", "candidate_selection_seconds_total",
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.", "Number of Candidate Selection subsystem seconding events.",
), ),
&["succeeded", "failed"], &["success"],
)?, )?,
registry, registry,
)?, )?,
@@ -433,7 +433,7 @@ impl metrics::Metrics for Metrics {
"candidate_selection_invalid_selections_total", "candidate_selection_invalid_selections_total",
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.", "Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
), ),
&["succeeded", "failed"], &["success"],
)?, )?,
registry, registry,
)?, )?,
@@ -82,7 +82,7 @@ impl Metrics {
metrics.validation_requests.with_label_values(&["invalid"]).inc(); metrics.validation_requests.with_label_values(&["invalid"]).inc();
}, },
Err(_) => { Err(_) => {
metrics.validation_requests.with_label_values(&["failed"]).inc(); metrics.validation_requests.with_label_values(&["validation failure"]).inc();
}, },
} }
} }
@@ -98,7 +98,7 @@ impl metrics::Metrics for Metrics {
"parachain_validation_requests_total", "parachain_validation_requests_total",
"Number of validation requests served.", "Number of validation requests served.",
), ),
&["valid", "invalid", "failed"], &["validity"],
)?, )?,
registry, registry,
)?, )?,
+11 -5
View File
@@ -40,18 +40,19 @@ use polkadot_node_subsystem_util::{
}; };
use polkadot_primitives::v1::{Block, BlockId}; use polkadot_primitives::v1::{Block, BlockId};
use sp_blockchain::HeaderBackend; use sp_blockchain::HeaderBackend;
use std::sync::Arc;
use futures::prelude::*; use futures::prelude::*;
/// The Chain API Subsystem implementation. /// The Chain API Subsystem implementation.
pub struct ChainApiSubsystem<Client> { pub struct ChainApiSubsystem<Client> {
client: Client, client: Arc<Client>,
metrics: Metrics, metrics: Metrics,
} }
impl<Client> ChainApiSubsystem<Client> { impl<Client> ChainApiSubsystem<Client> {
/// Create a new Chain API subsystem with the given client. /// Create a new Chain API subsystem with the given client.
pub fn new(client: Client, metrics: Metrics) -> Self { pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
ChainApiSubsystem { ChainApiSubsystem {
client, client,
metrics, metrics,
@@ -126,10 +127,15 @@ where
// fewer than `k` ancestors are available // fewer than `k` ancestors are available
Ok(None) => None, Ok(None) => None,
Ok(Some(header)) => { Ok(Some(header)) => {
// stop at the genesis header.
if header.number == 1 {
None
} else {
hash = header.parent_hash; hash = header.parent_hash;
Some(Ok(hash)) Some(Ok(hash))
} }
} }
}
}); });
let result = next_parent.take(k).collect::<Result<Vec<_>, _>>(); let result = next_parent.take(k).collect::<Result<Vec<_>, _>>();
@@ -171,7 +177,7 @@ impl metrics::Metrics for Metrics {
"parachain_chain_api_requests_total", "parachain_chain_api_requests_total",
"Number of Chain API requests served.", "Number of Chain API requests served.",
), ),
&["succeeded", "failed"], &["success"],
)?, )?,
registry, registry,
)?, )?,
@@ -300,11 +306,11 @@ mod tests {
} }
fn test_harness( fn test_harness(
test: impl FnOnce(TestClient, TestSubsystemContextHandle<ChainApiMessage>) test: impl FnOnce(Arc<TestClient>, TestSubsystemContextHandle<ChainApiMessage>)
-> BoxFuture<'static, ()>, -> BoxFuture<'static, ()>,
) { ) {
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
let client = TestClient::default(); let client = Arc::new(TestClient::default());
let subsystem = ChainApiSubsystem::new(client.clone(), Metrics(None)); let subsystem = ChainApiSubsystem::new(client.clone(), Metrics(None));
let chain_api_task = run(ctx, subsystem).map(|x| x.unwrap()); let chain_api_task = run(ctx, subsystem).map(|x| x.unwrap());
+6 -5
View File
@@ -145,7 +145,7 @@ where
let (sender, receiver) = futures::channel::oneshot::channel(); let (sender, receiver) = futures::channel::oneshot::channel();
overseer.wait_for_activation(parent_header_hash, sender).await?; overseer.wait_for_activation(parent_header_hash, sender).await?;
receiver.await.map_err(Error::ClosedChannelFromProvisioner)??; receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??;
let (sender, receiver) = futures::channel::oneshot::channel(); let (sender, receiver) = futures::channel::oneshot::channel();
// strictly speaking, we don't _have_ to .await this send_msg before opening the // strictly speaking, we don't _have_ to .await this send_msg before opening the
@@ -156,7 +156,7 @@ where
ProvisionerMessage::RequestInherentData(parent_header_hash, sender), ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await?; )).await?;
receiver.await.map_err(Error::ClosedChannelFromProvisioner) receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData)
} }
.boxed() .boxed()
.fuse(); .fuse();
@@ -236,7 +236,8 @@ pub enum Error {
Blockchain(sp_blockchain::Error), Blockchain(sp_blockchain::Error),
Inherent(sp_inherents::Error), Inherent(sp_inherents::Error),
Timeout, Timeout,
ClosedChannelFromProvisioner(futures::channel::oneshot::Canceled), ClosedChannelAwaitingActivation,
ClosedChannelAwaitingInherentData,
Subsystem(SubsystemError) Subsystem(SubsystemError)
} }
@@ -271,7 +272,8 @@ impl fmt::Display for Error {
Self::Blockchain(err) => write!(f, "blockchain error: {}", err), Self::Blockchain(err) => write!(f, "blockchain error: {}", err),
Self::Inherent(err) => write!(f, "inherent error: {:?}", err), Self::Inherent(err) => write!(f, "inherent error: {:?}", err),
Self::Timeout => write!(f, "timeout: provisioner did not return inherent data after {:?}", PROPOSE_TIMEOUT), Self::Timeout => write!(f, "timeout: provisioner did not return inherent data after {:?}", PROPOSE_TIMEOUT),
Self::ClosedChannelFromProvisioner(err) => write!(f, "provisioner closed inherent data channel before sending: {}", err), Self::ClosedChannelAwaitingActivation => write!(f, "closed channel from overseer when awaiting activation"),
Self::ClosedChannelAwaitingInherentData => write!(f, "closed channel from provisioner when awaiting inherent data"),
Self::Subsystem(err) => write!(f, "subsystem error: {:?}", err), Self::Subsystem(err) => write!(f, "subsystem error: {:?}", err),
} }
} }
@@ -282,7 +284,6 @@ impl std::error::Error for Error {
match self { match self {
Self::Consensus(err) => Some(err), Self::Consensus(err) => Some(err),
Self::Blockchain(err) => Some(err), Self::Blockchain(err) => Some(err),
Self::ClosedChannelFromProvisioner(err) => Some(err),
Self::Subsystem(err) => Some(err), Self::Subsystem(err) => Some(err),
_ => None _ => None
} }
+2 -2
View File
@@ -482,7 +482,7 @@ impl Metrics {
fn on_inherent_data_request(&self, response: Result<(), ()>) { fn on_inherent_data_request(&self, response: Result<(), ()>) {
if let Some(metrics) = &self.0 { if let Some(metrics) = &self.0 {
match response { match response {
Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeded"]).inc(), Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeeded"]).inc(),
Err(()) => metrics.inherent_data_requests.with_label_values(&["failed"]).inc(), Err(()) => metrics.inherent_data_requests.with_label_values(&["failed"]).inc(),
} }
} }
@@ -498,7 +498,7 @@ impl metrics::Metrics for Metrics {
"parachain_inherent_data_requests_total", "parachain_inherent_data_requests_total",
"Number of InherentData requests served by provisioner.", "Number of InherentData requests served by provisioner.",
), ),
&["succeeded", "failed"], &["success"],
)?, )?,
registry, registry,
)?, )?,
+21 -16
View File
@@ -34,6 +34,7 @@ use polkadot_node_subsystem_util::{
metrics::{self, prometheus}, metrics::{self, prometheus},
}; };
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost}; use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
use std::sync::Arc;
use sp_api::{ProvideRuntimeApi}; use sp_api::{ProvideRuntimeApi};
@@ -41,19 +42,19 @@ use futures::prelude::*;
/// The `RuntimeApiSubsystem`. See module docs for more details. /// The `RuntimeApiSubsystem`. See module docs for more details.
pub struct RuntimeApiSubsystem<Client> { pub struct RuntimeApiSubsystem<Client> {
client: Client, client: Arc<Client>,
metrics: Metrics, metrics: Metrics,
} }
impl<Client> RuntimeApiSubsystem<Client> { impl<Client> RuntimeApiSubsystem<Client> {
/// Create a new Runtime API subsystem wrapping the given client and metrics. /// Create a new Runtime API subsystem wrapping the given client and metrics.
pub fn new(client: Client, metrics: Metrics) -> Self { pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
RuntimeApiSubsystem { client, metrics } RuntimeApiSubsystem { client, metrics }
} }
} }
impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
Client: ProvideRuntimeApi<Block> + Send + 'static, Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
Client::Api: ParachainHost<Block>, Client::Api: ParachainHost<Block>,
Context: SubsystemContext<Message = RuntimeApiMessage> Context: SubsystemContext<Message = RuntimeApiMessage>
{ {
@@ -79,7 +80,7 @@ async fn run<Client>(
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg { FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request( RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request(
&subsystem.client, &*subsystem.client,
&subsystem.metrics, &subsystem.metrics,
relay_parent, relay_parent,
request, request,
@@ -159,7 +160,7 @@ impl metrics::Metrics for Metrics {
"parachain_runtime_api_requests_total", "parachain_runtime_api_requests_total",
"Number of Runtime API requests served.", "Number of Runtime API requests served.",
), ),
&["succeeded", "failed"], &["success"],
)?, )?,
registry, registry,
)?, )?,
@@ -288,7 +289,7 @@ mod tests {
#[test] #[test]
fn requests_validators() { fn requests_validators() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default(); let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into(); let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -311,7 +312,7 @@ mod tests {
#[test] #[test]
fn requests_validator_groups() { fn requests_validator_groups() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default(); let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into(); let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -334,7 +335,7 @@ mod tests {
#[test] #[test]
fn requests_availability_cores() { fn requests_availability_cores() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default(); let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into(); let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -357,12 +358,12 @@ mod tests {
#[test] #[test]
fn requests_persisted_validation_data() { fn requests_persisted_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default(); let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into(); let relay_parent = [1; 32].into();
let para_a = 5.into(); let para_a = 5.into();
let para_b = 6.into(); let para_b = 6.into();
runtime_api.validation_data.insert(para_a, Default::default()); Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
@@ -397,12 +398,12 @@ mod tests {
#[test] #[test]
fn requests_full_validation_data() { fn requests_full_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default(); let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into(); let relay_parent = [1; 32].into();
let para_a = 5.into(); let para_a = 5.into();
let para_b = 6.into(); let para_b = 6.into();
runtime_api.validation_data.insert(para_a, Default::default()); Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
@@ -446,6 +447,8 @@ mod tests {
runtime_api.validation_outputs_results.insert(para_a, false); runtime_api.validation_outputs_results.insert(para_a, false);
runtime_api.validation_outputs_results.insert(para_b, true); runtime_api.validation_outputs_results.insert(para_b, true);
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move { let test_task = async move {
@@ -491,7 +494,7 @@ mod tests {
#[test] #[test]
fn requests_session_index_for_child() { fn requests_session_index_for_child() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default(); let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into(); let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -514,12 +517,12 @@ mod tests {
#[test] #[test]
fn requests_validation_code() { fn requests_validation_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default(); let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into(); let relay_parent = [1; 32].into();
let para_a = 5.into(); let para_a = 5.into();
let para_b = 6.into(); let para_b = 6.into();
runtime_api.validation_code.insert(para_a, Default::default()); Arc::get_mut(&mut runtime_api).unwrap().validation_code.insert(para_a, Default::default());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
@@ -561,6 +564,8 @@ mod tests {
runtime_api.candidate_pending_availability.insert(para_a, Default::default()); runtime_api.candidate_pending_availability.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move { let test_task = async move {
@@ -595,7 +600,7 @@ mod tests {
#[test] #[test]
fn requests_candidate_events() { fn requests_candidate_events() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default(); let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into(); let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -796,7 +796,7 @@ impl AvailabilityDistributionSubsystem {
{ {
warn!( warn!(
target: TARGET, target: TARGET,
"Failed to handle incomming network messages: {:?}", e "Failed to handle incoming network messages: {:?}", e
); );
} }
} }
@@ -169,7 +169,7 @@ impl BitfieldDistribution {
trace!(target: TARGET, "Processing NetworkMessage"); trace!(target: TARGET, "Processing NetworkMessage");
// a network message was received // a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await { if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await {
warn!(target: TARGET, "Failed to handle incomming network messages: {:?}", e); warn!(target: TARGET, "Failed to handle incoming network messages: {:?}", e);
} }
} }
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => { FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
@@ -45,7 +45,7 @@ use polkadot_node_subsystem_util::{
}; };
#[derive(Clone, Default)] #[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>); pub struct Metrics(Option<MetricsInner>);
impl Metrics { impl Metrics {
fn on_advertisment_made(&self) { fn on_advertisment_made(&self) {
@@ -37,7 +37,7 @@ use polkadot_node_network_protocol::{
use polkadot_primitives::v1::CollatorId; use polkadot_primitives::v1::CollatorId;
use polkadot_node_subsystem_util::{ use polkadot_node_subsystem_util::{
self as util, self as util,
metrics::{self, prometheus}, metrics::prometheus,
}; };
mod collator_side; mod collator_side;
@@ -72,8 +72,11 @@ impl From<util::validator_discovery::Error> for Error {
type Result<T> = std::result::Result<T, Error>; type Result<T> = std::result::Result<T, Error>;
enum ProtocolSide { /// What side of the collator protocol is being engaged
pub enum ProtocolSide {
/// Validators operate on the relay chain.
Validator(validator_side::Metrics), Validator(validator_side::Metrics),
/// Collators operate on a parachain.
Collator(CollatorId, collator_side::Metrics), Collator(CollatorId, collator_side::Metrics),
} }
@@ -87,13 +90,7 @@ impl CollatorProtocolSubsystem {
/// If `id` is `Some` this is a collator side of the protocol. /// If `id` is `Some` this is a collator side of the protocol.
/// If `id` is `None` this is a validator side of the protocol. /// If `id` is `None` this is a validator side of the protocol.
/// Caller must provide a registry for prometheus metrics. /// Caller must provide a registry for prometheus metrics.
pub fn new(id: Option<CollatorId>, registry: Option<&prometheus::Registry>) -> Self { pub fn new(protocol_side: ProtocolSide) -> Self {
use metrics::Metrics;
let protocol_side = match id {
Some(id) => ProtocolSide::Collator(id, collator_side::Metrics::register(registry)),
None => ProtocolSide::Validator(validator_side::Metrics::register(registry)),
};
Self { Self {
protocol_side, protocol_side,
} }
@@ -127,7 +124,7 @@ where
fn start(self, ctx: Context) -> SpawnedSubsystem { fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem { SpawnedSubsystem {
name: "collator-protocol-subsystem", name: "collator-protocol-subsystem",
future: Box::pin(async move { self.run(ctx) }.map(|_| ())), future: self.run(ctx).map(|_| ()).boxed(),
} }
} }
} }
@@ -52,7 +52,7 @@ const COST_REPORT_BAD: Rep = Rep::new(-50, "A collator was reported by another s
const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem"); const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem");
#[derive(Clone, Default)] #[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>); pub struct Metrics(Option<MetricsInner>);
impl Metrics { impl Metrics {
fn on_request(&self, succeeded: std::result::Result<(), ()>) { fn on_request(&self, succeeded: std::result::Result<(), ()>) {
@@ -81,7 +81,7 @@ impl metrics::Metrics for Metrics {
"parachain_collation_requests_total", "parachain_collation_requests_total",
"Number of collations requested from Collators.", "Number of collations requested from Collators.",
), ),
&["succeeded", "failed"], &["success"],
)?, )?,
registry, registry,
)? )?
@@ -86,6 +86,15 @@ impl<C> Subsystem<C> for StatementDistribution
} }
} }
impl StatementDistribution {
/// Create a new Statement Distribution Subsystem
pub fn new(metrics: Metrics) -> StatementDistribution {
StatementDistribution {
metrics,
}
}
}
/// Tracks our impression of a single peer's view of the candidates a validator has seconded /// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent. /// for a given relay-parent.
/// ///
+1 -1
View File
@@ -1177,7 +1177,7 @@ where
let active_leaves = HashMap::new(); let active_leaves = HashMap::new();
let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry); let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry)?;
let activation_external_listeners = HashMap::new(); let activation_external_listeners = HashMap::new();
let this = Self { let this = Self {
+41 -5
View File
@@ -15,7 +15,6 @@ sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "mas
sc-client-db = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-db = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" }
service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
@@ -32,6 +31,7 @@ sp-block-builder = { git = "https://github.com/paritytech/substrate", branch = "
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-offchain = { package = "sp-offchain", git = "https://github.com/paritytech/substrate", branch = "master" } sp-offchain = { package = "sp-offchain", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-session = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-session = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -61,16 +61,36 @@ serde = { version = "1.0.102", features = ["derive"] }
slog = "2.5.2" slog = "2.5.2"
# Polkadot # Polkadot
kusama-runtime = { path = "../../runtime/kusama" }
polkadot-node-core-proposer = { path = "../core/proposer" } polkadot-node-core-proposer = { path = "../core/proposer" }
polkadot-overseer = { path = "../overseer" } polkadot-overseer = { path = "../overseer" }
polkadot-parachain = { path = "../../parachain" } polkadot-parachain = { path = "../../parachain" }
polkadot-primitives = { path = "../../primitives" } polkadot-primitives = { path = "../../primitives" }
polkadot-rpc = { path = "../../rpc" } polkadot-rpc = { path = "../../rpc" }
polkadot-runtime = { path = "../../runtime/polkadot" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
rococo-runtime = { package = "rococo-v1-runtime", path = "../../runtime/rococo-v1" } polkadot-node-subsystem-util = { path = "../subsystem-util" }
# Polkadot Runtimes
polkadot-runtime = { path = "../../runtime/polkadot" }
kusama-runtime = { path = "../../runtime/kusama" }
westend-runtime = { path = "../../runtime/westend" } westend-runtime = { path = "../../runtime/westend" }
rococo-runtime = { package = "rococo-v1-runtime", path = "../../runtime/rococo-v1" }
# Polkadot Subsystems
polkadot-availability-bitfield-distribution = { path = "../network/bitfield-distribution", optional = true }
polkadot-availability-distribution = { path = "../network/availability-distribution", optional = true }
polkadot-collator-protocol = { path = "../network/collator-protocol", optional = true }
polkadot-network-bridge = { path = "../network/bridge", optional = true }
polkadot-node-collation-generation = { path = "../collation-generation", optional = true }
polkadot-node-core-av-store = { path = "../core/av-store", optional = true }
polkadot-node-core-backing = { path = "../core/backing", optional = true }
polkadot-node-core-bitfield-signing = { path = "../core/bitfield-signing", optional = true }
polkadot-node-core-candidate-selection = { path = "../core/candidate-selection", optional = true }
polkadot-node-core-candidate-validation = { path = "../core/candidate-validation", optional = true }
polkadot-node-core-chain-api = { path = "../core/chain-api", optional = true }
polkadot-node-core-provisioner = { path = "../core/provisioner", optional = true }
polkadot-node-core-runtime-api = { path = "../core/runtime-api", optional = true }
polkadot-pov-distribution = { path = "../network/pov-distribution", optional = true }
polkadot-statement-distribution = { path = "../network/statement-distribution", optional = true }
[dev-dependencies] [dev-dependencies]
polkadot-test-client = { path = "../test/client" } polkadot-test-client = { path = "../test/client" }
@@ -80,4 +100,20 @@ env_logger = "0.8.1"
default = ["db", "full-node"] default = ["db", "full-node"]
db = ["service/db"] db = ["service/db"]
runtime-benchmarks = ["polkadot-runtime/runtime-benchmarks", "kusama-runtime/runtime-benchmarks", "westend-runtime/runtime-benchmarks"] runtime-benchmarks = ["polkadot-runtime/runtime-benchmarks", "kusama-runtime/runtime-benchmarks", "westend-runtime/runtime-benchmarks"]
full-node = [] full-node = [
"polkadot-availability-bitfield-distribution",
"polkadot-availability-distribution",
"polkadot-collator-protocol",
"polkadot-network-bridge",
"polkadot-node-collation-generation",
"polkadot-node-core-av-store",
"polkadot-node-core-backing",
"polkadot-node-core-bitfield-signing",
"polkadot-node-core-candidate-selection",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-chain-api",
"polkadot-node-core-provisioner",
"polkadot-node-core-runtime-api",
"polkadot-pov-distribution",
"polkadot-statement-distribution",
]
+2 -1
View File
@@ -134,13 +134,14 @@ fn westend_session_keys(
fn rococo_session_keys( fn rococo_session_keys(
babe: BabeId, babe: BabeId,
_grandpa: GrandpaId, grandpa: GrandpaId,
im_online: ImOnlineId, im_online: ImOnlineId,
parachain_validator: ValidatorId, parachain_validator: ValidatorId,
authority_discovery: AuthorityDiscoveryId authority_discovery: AuthorityDiscoveryId
) -> rococo_runtime::SessionKeys { ) -> rococo_runtime::SessionKeys {
rococo_runtime::SessionKeys { rococo_runtime::SessionKeys {
babe, babe,
grandpa,
im_online, im_online,
parachain_validator, parachain_validator,
authority_discovery, authority_discovery,
@@ -16,6 +16,7 @@
//! Polkadot-specific GRANDPA integration utilities. //! Polkadot-specific GRANDPA integration utilities.
#[cfg(feature = "full-node")]
use polkadot_primitives::v1::Hash; use polkadot_primitives::v1::Hash;
use sp_runtime::traits::{Block as BlockT, NumberFor}; use sp_runtime::traits::{Block as BlockT, NumberFor};
@@ -94,6 +95,7 @@ where
/// intermediary pending changes are replaced with a static list comprised of /// intermediary pending changes are replaced with a static list comprised of
/// w3f validators and randomly selected validators from the latest session (at /// w3f validators and randomly selected validators from the latest session (at
/// #1500988). /// #1500988).
#[cfg(feature = "full-node")]
pub(crate) fn kusama_hard_forks() -> Vec<( pub(crate) fn kusama_hard_forks() -> Vec<(
grandpa_primitives::SetId, grandpa_primitives::SetId,
(Hash, polkadot_primitives::v1::BlockNumber), (Hash, polkadot_primitives::v1::BlockNumber),
+205 -73
View File
@@ -24,27 +24,34 @@ mod client;
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
#[cfg(feature = "full-node")] #[cfg(feature = "full-node")]
use log::info; use {
use polkadot_node_core_proposer::ProposerFactory; std::convert::TryInto,
use polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler}; std::time::Duration,
use polkadot_subsystem::DummySubsystem;
use prometheus_endpoint::Registry; log::info,
use sc_client_api::ExecutorProvider; polkadot_node_core_av_store::Config as AvailabilityConfig,
use sc_executor::native_executor_instance; polkadot_node_core_proposer::ProposerFactory,
use service::{error::Error as ServiceError, RpcHandlers}; polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler},
use sp_blockchain::HeaderBackend; polkadot_primitives::v1::ParachainHost,
use sp_core::traits::SpawnNamed; authority_discovery::Service as AuthorityDiscoveryService,
use sp_trie::PrefixedMemoryDB; sp_blockchain::HeaderBackend,
sp_core::traits::SpawnNamed,
sp_keystore::SyncCryptoStorePtr,
sp_trie::PrefixedMemoryDB,
sc_client_api::ExecutorProvider,
};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use prometheus_endpoint::Registry;
use sc_executor::native_executor_instance;
use service::RpcHandlers;
pub use self::client::{AbstractClient, Client, ClientHandle, ExecuteWithClient, RuntimeApiCollection}; pub use self::client::{AbstractClient, Client, ClientHandle, ExecuteWithClient, RuntimeApiCollection};
pub use chain_spec::{PolkadotChainSpec, KusamaChainSpec, WestendChainSpec, RococoChainSpec}; pub use chain_spec::{PolkadotChainSpec, KusamaChainSpec, WestendChainSpec, RococoChainSpec};
#[cfg(feature = "full-node")]
pub use codec::Codec;
pub use consensus_common::{Proposal, SelectChain, BlockImport, RecordProof, block_validation::Chain}; pub use consensus_common::{Proposal, SelectChain, BlockImport, RecordProof, block_validation::Chain};
pub use polkadot_parachain::wasm_executor::run_worker as run_validation_worker; pub use polkadot_parachain::wasm_executor::run_worker as run_validation_worker;
pub use polkadot_primitives::v1::{Block, BlockId, CollatorId, Id as ParaId}; pub use polkadot_primitives::v1::{Block, BlockId, CollatorId, Hash, Id as ParaId};
pub use sc_client_api::{Backend, ExecutionStrategy, CallExecutor}; pub use sc_client_api::{Backend, ExecutionStrategy, CallExecutor};
pub use sc_consensus::LongestChain; pub use sc_consensus::LongestChain;
pub use sc_executor::NativeExecutionDispatch; pub use sc_executor::NativeExecutionDispatch;
@@ -115,7 +122,7 @@ impl IdentifyVariant for Box<dyn ChainSpec> {
} }
// If we're using prometheus, use a registry with a prefix of `polkadot`. // If we're using prometheus, use a registry with a prefix of `polkadot`.
fn set_prometheus_registry(config: &mut Configuration) -> Result<(), ServiceError> { fn set_prometheus_registry(config: &mut Configuration) -> Result<(), Error> {
if let Some(PrometheusConfig { registry, .. }) = config.prometheus_config.as_mut() { if let Some(PrometheusConfig { registry, .. }) = config.prometheus_config.as_mut() {
*registry = Registry::new_custom(Some("polkadot".into()), None)?; *registry = Registry::new_custom(Some("polkadot".into()), None)?;
} }
@@ -124,8 +131,10 @@ fn set_prometheus_registry(config: &mut Configuration) -> Result<(), ServiceErro
} }
pub type FullBackend = service::TFullBackend<Block>; pub type FullBackend = service::TFullBackend<Block>;
#[cfg(feature = "full-node")]
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>; type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
pub type FullClient<RuntimeApi, Executor> = service::TFullClient<Block, RuntimeApi, Executor>; pub type FullClient<RuntimeApi, Executor> = service::TFullClient<Block, RuntimeApi, Executor>;
#[cfg(feature = "full-node")]
type FullGrandpaBlockImport<RuntimeApi, Executor> = grandpa::GrandpaBlockImport< type FullGrandpaBlockImport<RuntimeApi, Executor> = grandpa::GrandpaBlockImport<
FullBackend, Block, FullClient<RuntimeApi, Executor>, FullSelectChain FullBackend, Block, FullClient<RuntimeApi, Executor>, FullSelectChain
>; >;
@@ -276,42 +285,115 @@ fn new_partial<RuntimeApi, Executor>(config: &mut Configuration) -> Result<
}) })
} }
fn real_overseer<S: SpawnNamed>( #[cfg(feature="full-node")]
fn real_overseer<Spawner, RuntimeClient>(
leaves: impl IntoIterator<Item = BlockInfo>, leaves: impl IntoIterator<Item = BlockInfo>,
prometheus_registry: Option<&Registry>, keystore: SyncCryptoStorePtr,
s: S, runtime_client: Arc<RuntimeClient>,
) -> Result<(Overseer<S>, OverseerHandler), ServiceError> { availability_config: AvailabilityConfig,
network_service: Arc<sc_network::NetworkService<Block, Hash>>,
authority_discovery: AuthorityDiscoveryService,
registry: Option<&Registry>,
spawner: Spawner,
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block>,
RuntimeClient::Api: ParachainHost<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
use polkadot_node_subsystem_util::metrics::Metrics;
use polkadot_availability_distribution::AvailabilityDistributionSubsystem;
use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem;
use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
use polkadot_node_core_backing::CandidateBackingSubsystem;
use polkadot_node_core_candidate_selection::CandidateSelectionSubsystem;
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_core_chain_api::ChainApiSubsystem;
use polkadot_node_collation_generation::CollationGenerationSubsystem;
use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
use polkadot_network_bridge::NetworkBridge as NetworkBridgeSubsystem;
use polkadot_pov_distribution::PoVDistribution as PoVDistributionSubsystem;
use polkadot_node_core_provisioner::ProvisioningSubsystem as ProvisionerSubsystem;
use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_statement_distribution::StatementDistribution as StatementDistributionSubsystem;
let all_subsystems = AllSubsystems { let all_subsystems = AllSubsystems {
candidate_validation: DummySubsystem, availability_distribution: AvailabilityDistributionSubsystem::new(
candidate_backing: DummySubsystem, keystore.clone(),
candidate_selection: DummySubsystem, Metrics::register(registry)?,
statement_distribution: DummySubsystem, ),
availability_distribution: DummySubsystem, availability_store: AvailabilityStoreSubsystem::new_on_disk(
bitfield_signing: DummySubsystem, availability_config,
bitfield_distribution: DummySubsystem, Metrics::register(registry)?,
provisioner: DummySubsystem, )?,
pov_distribution: DummySubsystem, bitfield_distribution: BitfieldDistributionSubsystem::new(
runtime_api: DummySubsystem, Metrics::register(registry)?,
availability_store: DummySubsystem, ),
network_bridge: DummySubsystem, bitfield_signing: BitfieldSigningSubsystem::new(
chain_api: DummySubsystem, spawner.clone(),
collation_generation: DummySubsystem, keystore.clone(),
collator_protocol: DummySubsystem, Metrics::register(registry)?,
),
candidate_backing: CandidateBackingSubsystem::new(
spawner.clone(),
keystore.clone(),
Metrics::register(registry)?,
),
candidate_selection: CandidateSelectionSubsystem::new(
spawner.clone(),
(),
Metrics::register(registry)?,
),
candidate_validation: CandidateValidationSubsystem::new(
spawner.clone(),
Metrics::register(registry)?,
),
chain_api: ChainApiSubsystem::new(
runtime_client.clone(),
Metrics::register(registry)?,
),
collation_generation: CollationGenerationSubsystem::new(
Metrics::register(registry)?,
),
collator_protocol: CollatorProtocolSubsystem::new(
ProtocolSide::Validator(Metrics::register(registry)?),
),
network_bridge: NetworkBridgeSubsystem::new(
network_service,
authority_discovery,
),
pov_distribution: PoVDistributionSubsystem::new(
Metrics::register(registry)?,
),
provisioner: ProvisionerSubsystem::new(
spawner.clone(),
(),
Metrics::register(registry)?,
),
runtime_api: RuntimeApiSubsystem::new(
runtime_client,
Metrics::register(registry)?,
),
statement_distribution: StatementDistributionSubsystem::new(
Metrics::register(registry)?,
),
}; };
Overseer::new( Overseer::new(
leaves, leaves,
all_subsystems, all_subsystems,
prometheus_registry, registry,
s, spawner,
).map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) ).map_err(|e| Error::Other(format!("Failed to create an Overseer: {:?}", e)))
} }
#[cfg(feature = "full-node")] #[cfg(feature = "full-node")]
pub struct NewFull<C> { pub struct NewFull<C> {
pub task_manager: TaskManager, pub task_manager: TaskManager,
pub client: C, pub client: C,
pub overseer_handler: OverseerHandler, pub overseer_handler: Option<OverseerHandler>,
pub network: Arc<sc_network::NetworkService<Block, <Block as BlockT>::Hash>>, pub network: Arc<sc_network::NetworkService<Block, <Block as BlockT>::Hash>>,
pub network_status_sinks: service::NetworkStatusSinks<Block>, pub network_status_sinks: service::NetworkStatusSinks<Block>,
pub rpc_handlers: RpcHandlers, pub rpc_handlers: RpcHandlers,
@@ -334,6 +416,24 @@ impl<C> NewFull<C> {
} }
} }
/// Is this node a collator?
#[cfg(feature = "full-node")]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum IsCollator {
/// This node is a collator.
Yes,
/// This node is not a collator.
No,
}
#[cfg(feature = "full-node")]
impl IsCollator {
/// Is this a collator?
fn is_collator(&self) -> bool {
*self == Self::Yes
}
}
/// Create a new full node of arbitrary runtime and executor. /// Create a new full node of arbitrary runtime and executor.
/// ///
/// This is an advanced feature and not recommended for general use. Generally, `build_full` is /// This is an advanced feature and not recommended for general use. Generally, `build_full` is
@@ -342,6 +442,7 @@ impl<C> NewFull<C> {
pub fn new_full<RuntimeApi, Executor>( pub fn new_full<RuntimeApi, Executor>(
mut config: Configuration, mut config: Configuration,
authority_discovery_disabled: bool, authority_discovery_disabled: bool,
is_collator: IsCollator,
grandpa_pause: Option<(u32, u32)>, grandpa_pause: Option<(u32, u32)>,
) -> Result<NewFull<Arc<FullClient<RuntimeApi, Executor>>>, Error> ) -> Result<NewFull<Arc<FullClient<RuntimeApi, Executor>>>, Error>
where where
@@ -392,6 +493,8 @@ pub fn new_full<RuntimeApi, Executor>(
let telemetry_connection_sinks = service::TelemetryConnectionSinks::default(); let telemetry_connection_sinks = service::TelemetryConnectionSinks::default();
let availability_config = config.database.clone().try_into();
let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams { let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams {
config, config,
backend: backend.clone(), backend: backend.clone(),
@@ -428,7 +531,54 @@ pub fn new_full<RuntimeApi, Executor>(
}) })
.collect(); .collect();
let (overseer, overseer_handler) = real_overseer(leaves, prometheus_registry.as_ref(), spawner)?; let authority_discovery_service = if role.is_authority() || is_collator.is_collator() {
use sc_network::Event;
use futures::StreamExt;
if authority_discovery_disabled {
Err("Authority discovery is mandatory for a validator.")?;
}
let authority_discovery_role = if role.is_authority() {
authority_discovery::Role::PublishAndDiscover(
keystore_container.keystore(),
)
} else {
// don't publish our addresses when we're only a collator
authority_discovery::Role::Discover
};
let dht_event_stream = network.event_stream("authority-discovery")
.filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
_ => None,
}});
let (worker, service) = authority_discovery::new_worker_and_service(
client.clone(),
network.clone(),
Box::pin(dht_event_stream),
authority_discovery_role,
prometheus_registry.clone(),
);
task_manager.spawn_handle().spawn("authority-discovery-worker", worker.run());
Some(service)
} else {
None
};
// we'd say let overseer_handler = authority_discovery_service.map(|authority_discovery_service|, ...),
// but in that case we couldn't use ? to propagate errors
let overseer_handler = if let Some(authority_discovery_service) = authority_discovery_service {
let (overseer, overseer_handler) = real_overseer(
leaves,
keystore_container.sync_keystore(),
overseer_client.clone(),
availability_config?,
network.clone(),
authority_discovery_service,
prometheus_registry.as_ref(),
spawner,
)?;
let overseer_handler_clone = overseer_handler.clone(); let overseer_handler_clone = overseer_handler.clone();
task_manager.spawn_essential_handle().spawn_blocking("overseer", Box::pin(async move { task_manager.spawn_essential_handle().spawn_blocking("overseer", Box::pin(async move {
@@ -442,15 +592,16 @@ pub fn new_full<RuntimeApi, Executor>(
pin_mut!(overseer_fut); pin_mut!(overseer_fut);
pin_mut!(forward); pin_mut!(forward);
loop {
select! { select! {
_ = forward => break, _ = forward => (),
_ = overseer_fut => break, _ = overseer_fut => (),
complete => break, complete => (),
}
} }
})); }));
Some(overseer_handler)
} else { None };
if role.is_authority() { if role.is_authority() {
let can_author_with = let can_author_with =
consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone()); consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());
@@ -459,7 +610,7 @@ pub fn new_full<RuntimeApi, Executor>(
task_manager.spawn_handle(), task_manager.spawn_handle(),
client.clone(), client.clone(),
transaction_pool, transaction_pool,
overseer_handler.clone(), overseer_handler.as_ref().ok_or("authorities require real overseer handlers")?.clone(),
prometheus_registry.as_ref(), prometheus_registry.as_ref(),
); );
@@ -542,30 +693,6 @@ pub fn new_full<RuntimeApi, Executor>(
grandpa::setup_disabled_grandpa(network.clone())?; grandpa::setup_disabled_grandpa(network.clone())?;
} }
if role.is_authority() && !authority_discovery_disabled {
use sc_network::Event;
use futures::StreamExt;
let authority_discovery_role = authority_discovery::Role::PublishAndDiscover(
keystore_container.keystore(),
);
let dht_event_stream = network.event_stream("authority-discovery")
.filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
_ => None,
}});
let (authority_discovery_worker, _service) = authority_discovery::new_worker_and_service(
client.clone(),
network.clone(),
Box::pin(dht_event_stream),
authority_discovery_role,
prometheus_registry.clone(),
);
task_manager.spawn_handle().spawn("authority-discovery-worker", authority_discovery_worker.run());
}
network_starter.start_network(); network_starter.start_network();
Ok(NewFull { Ok(NewFull {
@@ -701,7 +828,7 @@ pub fn new_chain_ops(mut config: &mut Configuration) -> Result<
consensus_common::import_queue::BasicQueue<Block, PrefixedMemoryDB<BlakeTwo256>>, consensus_common::import_queue::BasicQueue<Block, PrefixedMemoryDB<BlakeTwo256>>,
TaskManager, TaskManager,
), ),
ServiceError Error
> >
{ {
config.keystore = service::config::KeystoreConfig::InMemory; config.keystore = service::config::KeystoreConfig::InMemory;
@@ -725,7 +852,7 @@ pub fn new_chain_ops(mut config: &mut Configuration) -> Result<
} }
/// Build a new light node. /// Build a new light node.
pub fn build_light(config: Configuration) -> Result<(TaskManager, RpcHandlers), ServiceError> { pub fn build_light(config: Configuration) -> Result<(TaskManager, RpcHandlers), Error> {
if config.chain_spec.is_rococo() { if config.chain_spec.is_rococo() {
new_light::<rococo_runtime::RuntimeApi, RococoExecutor>(config) new_light::<rococo_runtime::RuntimeApi, RococoExecutor>(config)
} else if config.chain_spec.is_kusama() { } else if config.chain_spec.is_kusama() {
@@ -741,30 +868,35 @@ pub fn build_light(config: Configuration) -> Result<(TaskManager, RpcHandlers),
pub fn build_full( pub fn build_full(
config: Configuration, config: Configuration,
authority_discovery_disabled: bool, authority_discovery_disabled: bool,
is_collator: IsCollator,
grandpa_pause: Option<(u32, u32)>, grandpa_pause: Option<(u32, u32)>,
) -> Result<NewFull<Client>, ServiceError> { ) -> Result<NewFull<Client>, Error> {
if config.chain_spec.is_rococo() { if config.chain_spec.is_rococo() {
new_full::<rococo_runtime::RuntimeApi, RococoExecutor>( new_full::<rococo_runtime::RuntimeApi, RococoExecutor>(
config, config,
authority_discovery_disabled, authority_discovery_disabled,
is_collator,
grandpa_pause, grandpa_pause,
).map(|full| full.with_client(Client::Rococo)) ).map(|full| full.with_client(Client::Rococo))
} else if config.chain_spec.is_kusama() { } else if config.chain_spec.is_kusama() {
new_full::<kusama_runtime::RuntimeApi, KusamaExecutor>( new_full::<kusama_runtime::RuntimeApi, KusamaExecutor>(
config, config,
authority_discovery_disabled, authority_discovery_disabled,
is_collator,
grandpa_pause, grandpa_pause,
).map(|full| full.with_client(Client::Kusama)) ).map(|full| full.with_client(Client::Kusama))
} else if config.chain_spec.is_westend() { } else if config.chain_spec.is_westend() {
new_full::<westend_runtime::RuntimeApi, WestendExecutor>( new_full::<westend_runtime::RuntimeApi, WestendExecutor>(
config, config,
authority_discovery_disabled, authority_discovery_disabled,
is_collator,
grandpa_pause, grandpa_pause,
).map(|full| full.with_client(Client::Westend)) ).map(|full| full.with_client(Client::Westend))
} else { } else {
new_full::<polkadot_runtime::RuntimeApi, PolkadotExecutor>( new_full::<polkadot_runtime::RuntimeApi, PolkadotExecutor>(
config, config,
authority_discovery_disabled, authority_discovery_disabled,
is_collator,
grandpa_pause, grandpa_pause,
).map(|full| full.with_client(Client::Polkadot)) ).map(|full| full.with_client(Client::Polkadot))
} }
+8 -11
View File
@@ -470,18 +470,15 @@ pub mod metrics {
/// Try to register metrics in the Prometheus registry. /// Try to register metrics in the Prometheus registry.
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError>; fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError>;
/// Convience method to register metrics in the optional Prometheus registry. /// Convenience method to register metrics in the optional Promethius registry.
/// If the registration fails, prints a warning and returns `Default::default()`. ///
fn register(registry: Option<&prometheus::Registry>) -> Self { /// If no registry is provided, returns `Default::default()`. Otherwise, returns the same
registry.map(|r| { /// thing that `try_register` does.
match Self::try_register(r) { fn register(registry: Option<&prometheus::Registry>) -> Result<Self, prometheus::PrometheusError> {
Err(e) => { match registry {
log::warn!("Failed to register metrics: {:?}", e); None => Ok(Self::default()),
Default::default() Some(registry) => Self::try_register(registry),
},
Ok(metrics) => metrics,
} }
}).unwrap_or_default()
} }
} }
+1
View File
@@ -21,6 +21,7 @@ polkadot-statement-table = { path = "../../statement-table" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
smallvec = "1.4.1" smallvec = "1.4.1"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.21" thiserror = "1.0.21"
[dev-dependencies] [dev-dependencies]
+4
View File
@@ -132,6 +132,10 @@ pub enum SubsystemError {
#[error(transparent)] #[error(transparent)]
Infallible(#[from] std::convert::Infallible), Infallible(#[from] std::convert::Infallible),
/// Prometheus had a problem
#[error(transparent)]
Prometheus(#[from] substrate_prometheus_endpoint::PrometheusError),
/// An other error lacking particular type information. /// An other error lacking particular type information.
#[error("Failed to {0}")] #[error("Failed to {0}")]
Context(String), Context(String),
+4 -2
View File
@@ -26,7 +26,7 @@ use polkadot_overseer::OverseerHandler;
use polkadot_primitives::v1::{Id as ParaId, HeadData, ValidationCode, Balance}; use polkadot_primitives::v1::{Id as ParaId, HeadData, ValidationCode, Balance};
use polkadot_runtime_common::BlockHashCount; use polkadot_runtime_common::BlockHashCount;
use polkadot_service::{ use polkadot_service::{
new_full, NewFull, FullClient, ClientHandle, ExecuteWithClient, new_full, NewFull, FullClient, ClientHandle, ExecuteWithClient, IsCollator,
}; };
use polkadot_test_runtime::{Runtime, SignedExtra, SignedPayload, VERSION, ParasSudoWrapperCall, UncheckedExtrinsic}; use polkadot_test_runtime::{Runtime, SignedExtra, SignedPayload, VERSION, ParasSudoWrapperCall, UncheckedExtrinsic};
use polkadot_runtime_parachains::paras::ParaGenesisArgs; use polkadot_runtime_parachains::paras::ParaGenesisArgs;
@@ -76,6 +76,7 @@ pub fn polkadot_test_new_full(
new_full::<polkadot_test_runtime::RuntimeApi, PolkadotTestExecutor>( new_full::<polkadot_test_runtime::RuntimeApi, PolkadotTestExecutor>(
config, config,
authority_discovery_disabled, authority_discovery_disabled,
IsCollator::No,
None, None,
).map_err(Into::into) ).map_err(Into::into)
} }
@@ -204,11 +205,12 @@ pub fn run_test_node(
) -> PolkadotTestNode { ) -> PolkadotTestNode {
let config = node_config(storage_update_func, task_executor, key, boot_nodes); let config = node_config(storage_update_func, task_executor, key, boot_nodes);
let multiaddr = config.network.listen_addresses[0].clone(); let multiaddr = config.network.listen_addresses[0].clone();
let authority_discovery_disabled = true; let authority_discovery_disabled = false;
let NewFull {task_manager, client, network, rpc_handlers, overseer_handler, ..} = let NewFull {task_manager, client, network, rpc_handlers, overseer_handler, ..} =
polkadot_test_new_full(config, authority_discovery_disabled) polkadot_test_new_full(config, authority_discovery_disabled)
.expect("could not create Polkadot test service"); .expect("could not create Polkadot test service");
let overseer_handler = overseer_handler.expect("test node must have an overseer handler");
let peer_id = network.local_peer_id().clone(); let peer_id = network.local_peer_id().clone();
let addr = MultiaddrWithPeerId { multiaddr, peer_id }; let addr = MultiaddrWithPeerId { multiaddr, peer_id };
+2 -1
View File
@@ -347,6 +347,7 @@ pub type SignedPayload = generic::SignedPayload<Call, SignedExtra>;
impl_opaque_keys! { impl_opaque_keys! {
pub struct SessionKeys { pub struct SessionKeys {
pub grandpa: Grandpa,
pub babe: Babe, pub babe: Babe,
pub im_online: ImOnline, pub im_online: ImOnline,
pub parachain_validator: Initializer, pub parachain_validator: Initializer,
@@ -384,7 +385,7 @@ construct_runtime! {
ParachainOrigin: parachains_origin::{Module, Origin}, ParachainOrigin: parachains_origin::{Module, Origin},
Config: parachains_configuration::{Module, Call, Storage}, Config: parachains_configuration::{Module, Call, Storage},
Inclusion: parachains_inclusion::{Module, Call, Storage, Event<T>}, Inclusion: parachains_inclusion::{Module, Call, Storage, Event<T>},
InclusionInherent: parachains_inclusion_inherent::{Module, Call, Storage}, InclusionInherent: parachains_inclusion_inherent::{Module, Call, Storage, Inherent},
Scheduler: parachains_scheduler::{Module, Call, Storage}, Scheduler: parachains_scheduler::{Module, Call, Storage},
Paras: parachains_paras::{Module, Call, Storage}, Paras: parachains_paras::{Module, Call, Storage},
Initializer: parachains_initializer::{Module, Call, Storage}, Initializer: parachains_initializer::{Module, Call, Storage},
+5 -1
View File
@@ -6,6 +6,10 @@
set -e set -e
# chainspec defaults to polkadot-local if no arguments are passed to this script;
# if arguments are passed in, the first is the chainspec
chainspec="${1:-polkadot-local}"
PROJECT_ROOT=$(git rev-parse --show-toplevel) PROJECT_ROOT=$(git rev-parse --show-toplevel)
# shellcheck disable=SC1090 # shellcheck disable=SC1090
source "$(dirname "$0")"/common.sh source "$(dirname "$0")"/common.sh
@@ -70,7 +74,7 @@ function run_node() {
# start the node # start the node
"$polkadot" \ "$polkadot" \
--chain polkadot-local \ --chain "$chainspec" \
--tmp \ --tmp \
--port "$port" \ --port "$port" \
--rpc-port "$rpc_port" \ --rpc-port "$rpc_port" \
+4 -1
View File
@@ -35,8 +35,11 @@ fn purge_chain_works() {
.unwrap(); .unwrap();
// Let it produce some blocks. // Let it produce some blocks.
thread::sleep(Duration::from_secs(30)); // poll once per second for faster failure
for _ in 0..30 {
thread::sleep(Duration::from_secs(1));
assert!(cmd.try_wait().unwrap().is_none(), "the process should still be running"); assert!(cmd.try_wait().unwrap().is_none(), "the process should still be running");
}
// Stop the process // Stop the process
kill(Pid::from_raw(cmd.id().try_into().unwrap()), SIGINT).unwrap(); kill(Pid::from_raw(cmd.id().try_into().unwrap()), SIGINT).unwrap();