mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 21:41:12 +00:00
This reverts commit 5f637c510e.
This commit is contained in:
committed by
GitHub
parent
d13d0d4f07
commit
d711673ee2
Generated
-1
@@ -6842,7 +6842,6 @@ dependencies = [
|
||||
"kvdb",
|
||||
"kvdb-rocksdb",
|
||||
"log",
|
||||
"lru",
|
||||
"pallet-babe",
|
||||
"pallet-im-online",
|
||||
"pallet-mmr-primitives",
|
||||
|
||||
@@ -185,7 +185,6 @@ struct BehaveMaleficient;
|
||||
impl OverseerGen for BehaveMaleficient {
|
||||
fn generate<'a, Spawner, RuntimeClient>(
|
||||
&self,
|
||||
connector: OverseerConnector,
|
||||
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
|
||||
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error>
|
||||
where
|
||||
@@ -214,7 +213,7 @@ impl OverseerGen for BehaveMaleficient {
|
||||
),
|
||||
);
|
||||
|
||||
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
|
||||
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
|
||||
.map_err(|e| e.into())
|
||||
|
||||
// A builder pattern will simplify this further
|
||||
|
||||
@@ -37,7 +37,7 @@ use polkadot_cli::{
|
||||
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{AllMessages, CandidateValidationMessage},
|
||||
overseer::{self, OverseerConnector, OverseerHandle},
|
||||
overseer::{self, OverseerHandle},
|
||||
FromOverseer,
|
||||
};
|
||||
|
||||
@@ -86,7 +86,6 @@ struct BehaveMaleficient;
|
||||
impl OverseerGen for BehaveMaleficient {
|
||||
fn generate<'a, Spawner, RuntimeClient>(
|
||||
&self,
|
||||
connector: OverseerConnector,
|
||||
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
|
||||
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
|
||||
where
|
||||
@@ -114,7 +113,7 @@ impl OverseerGen for BehaveMaleficient {
|
||||
},
|
||||
);
|
||||
|
||||
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
|
||||
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,8 +29,7 @@ use polkadot_node_subsystem_types::messages::{
|
||||
use polkadot_overseer::{
|
||||
self as overseer,
|
||||
gen::{FromOverseer, SpawnedSubsystem},
|
||||
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector,
|
||||
OverseerSignal, SubsystemError,
|
||||
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError,
|
||||
};
|
||||
use polkadot_primitives::v1::Hash;
|
||||
|
||||
@@ -174,15 +173,8 @@ fn main() {
|
||||
.replace_candidate_validation(|_| Subsystem2)
|
||||
.replace_candidate_backing(|orig| orig);
|
||||
|
||||
let (overseer, _handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
AlwaysSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let (overseer, _handle) =
|
||||
Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap();
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
let timer_stream = timer_stream;
|
||||
|
||||
|
||||
@@ -130,7 +130,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
|
||||
&mut self.handle
|
||||
}
|
||||
/// Obtain access to the overseer handle.
|
||||
pub fn as_handle(&self) -> &#handle {
|
||||
pub fn as_handle(&mut self) -> &#handle {
|
||||
&self.handle
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{AllMessages, OverseerSignal};
|
||||
use polkadot_node_subsystem_types::errors::SubsystemError;
|
||||
use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};
|
||||
|
||||
/// A dummy subsystem that implements [`Subsystem`] for all
|
||||
/// types of messages. Used for tests or as a placeholder.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct DummySubsystem;
|
||||
|
||||
impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
|
||||
where
|
||||
Context: SubsystemContext<
|
||||
Signal = OverseerSignal,
|
||||
Error = SubsystemError,
|
||||
AllMessages = AllMessages,
|
||||
>,
|
||||
{
|
||||
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
|
||||
let future = Box::pin(async move {
|
||||
loop {
|
||||
match ctx.recv().await {
|
||||
Err(_) => return Ok(()),
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
|
||||
Ok(overseer_msg) => {
|
||||
tracing::debug!(
|
||||
target: "dummy-subsystem",
|
||||
"Discarding a message sent from overseer {:?}",
|
||||
overseer_msg
|
||||
);
|
||||
continue
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
SpawnedSubsystem { name: "dummy-subsystem", future }
|
||||
}
|
||||
}
|
||||
@@ -70,6 +70,7 @@ use std::{
|
||||
|
||||
use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
|
||||
use lru::LruCache;
|
||||
use parking_lot::RwLock;
|
||||
|
||||
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
|
||||
use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
|
||||
@@ -90,17 +91,12 @@ pub use polkadot_node_subsystem_types::{
|
||||
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
|
||||
};
|
||||
|
||||
/// Test helper supplements.
|
||||
pub mod dummy;
|
||||
pub use self::dummy::DummySubsystem;
|
||||
|
||||
// TODO legacy, to be deleted, left for easier integration
|
||||
// TODO https://github.com/paritytech/polkadot/issues/3427
|
||||
mod subsystems;
|
||||
pub use self::subsystems::AllSubsystems;
|
||||
pub use self::subsystems::{AllSubsystems, DummySubsystem};
|
||||
|
||||
/// Metrics re-exports of `polkadot-metrics`.
|
||||
pub mod metrics;
|
||||
mod metrics;
|
||||
use self::metrics::Metrics;
|
||||
|
||||
use polkadot_node_metrics::{
|
||||
@@ -119,7 +115,7 @@ pub use polkadot_overseer_gen::{
|
||||
|
||||
/// Store 2 days worth of blocks, not accounting for forks,
|
||||
/// in the LRU cache. Assumes a 6-second block time.
|
||||
pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
|
||||
const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@@ -145,12 +141,18 @@ where
|
||||
///
|
||||
/// [`Overseer`]: struct.Overseer.html
|
||||
#[derive(Clone)]
|
||||
pub struct Handle(pub OverseerHandle);
|
||||
pub enum Handle {
|
||||
/// Used only at initialization to break the cyclic dependency.
|
||||
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
|
||||
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
|
||||
/// A handle to the overseer.
|
||||
Connected(OverseerHandle),
|
||||
}
|
||||
|
||||
impl Handle {
|
||||
/// Create a new [`Handle`].
|
||||
pub fn new(raw: OverseerHandle) -> Self {
|
||||
Self(raw)
|
||||
/// Create a new disconnected [`Handle`].
|
||||
pub fn new_disconnected() -> Self {
|
||||
Self::Disconnected(Arc::new(RwLock::new(None)))
|
||||
}
|
||||
|
||||
/// Inform the `Overseer` that that some block was imported.
|
||||
@@ -199,8 +201,58 @@ impl Handle {
|
||||
|
||||
/// Most basic operation, to stop a server.
|
||||
async fn send_and_log_error(&mut self, event: Event) {
|
||||
if self.0.send(event).await.is_err() {
|
||||
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
|
||||
self.try_connect();
|
||||
if let Self::Connected(ref mut handle) = self {
|
||||
if handle.send(event).await.is_err() {
|
||||
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
|
||||
}
|
||||
} else {
|
||||
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether the handle is disconnected.
|
||||
pub fn is_disconnected(&self) -> bool {
|
||||
match self {
|
||||
Self::Disconnected(ref x) => x.read().is_none(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Connect this handle and all disconnected clones of it to the overseer.
|
||||
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
|
||||
match self {
|
||||
Self::Disconnected(ref mut x) => {
|
||||
let mut maybe_handle = x.write();
|
||||
if maybe_handle.is_none() {
|
||||
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
|
||||
*maybe_handle = Some(handle);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Attempting to connect a clone of a connected Handle",
|
||||
);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Attempting to connect an already connected Handle",
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
|
||||
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
|
||||
fn try_connect(&mut self) {
|
||||
if let Self::Disconnected(ref mut x) = self {
|
||||
let guard = x.write();
|
||||
if let Some(ref h) = *guard {
|
||||
let handle = h.clone();
|
||||
drop(guard);
|
||||
*self = Self::Connected(handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -438,13 +490,12 @@ where
|
||||
/// # use polkadot_primitives::v1::Hash;
|
||||
/// # use polkadot_overseer::{
|
||||
/// # self as overseer,
|
||||
/// # Overseer,
|
||||
/// # OverseerSignal,
|
||||
/// # OverseerConnector,
|
||||
/// # SubsystemSender as _,
|
||||
/// # AllMessages,
|
||||
/// # AllSubsystems,
|
||||
/// # HeadSupportsParachains,
|
||||
/// # Overseer,
|
||||
/// # SubsystemError,
|
||||
/// # gen::{
|
||||
/// # SubsystemContext,
|
||||
@@ -498,7 +549,6 @@ where
|
||||
/// None,
|
||||
/// AlwaysSupportsParachains,
|
||||
/// spawner,
|
||||
/// OverseerConnector::default(),
|
||||
/// ).unwrap();
|
||||
///
|
||||
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
|
||||
@@ -565,7 +615,6 @@ where
|
||||
prometheus_registry: Option<&prometheus::Registry>,
|
||||
supports_parachains: SupportsParachains,
|
||||
s: S,
|
||||
connector: OverseerConnector,
|
||||
) -> SubsystemResult<(Self, OverseerHandle)>
|
||||
where
|
||||
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
|
||||
@@ -630,7 +679,7 @@ where
|
||||
.supports_parachains(supports_parachains)
|
||||
.metrics(metrics.clone())
|
||||
.spawner(s)
|
||||
.build_with_connector(connector)?;
|
||||
.build()?;
|
||||
|
||||
// spawn the metrics metronome task
|
||||
{
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
//! Prometheus metrics related to the overseer and its channels.
|
||||
|
||||
use super::*;
|
||||
pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait};
|
||||
use polkadot_node_metrics::metrics::{self, prometheus};
|
||||
|
||||
use parity_util_mem::MemoryAllocationSnapshot;
|
||||
|
||||
@@ -110,7 +110,7 @@ impl Metrics {
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricsTrait for Metrics {
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
|
||||
let metrics = MetricsInner {
|
||||
activated_heads_total: prometheus::register(
|
||||
|
||||
@@ -19,9 +19,47 @@
|
||||
//! In the future, everything should be set up using the generated
|
||||
//! overseer builder pattern instead.
|
||||
|
||||
use crate::dummy::DummySubsystem;
|
||||
use crate::{AllMessages, OverseerSignal};
|
||||
use polkadot_node_subsystem_types::errors::SubsystemError;
|
||||
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
|
||||
use polkadot_overseer_gen::MapSubsystem;
|
||||
use polkadot_overseer_gen::{
|
||||
FromOverseer, MapSubsystem, SpawnedSubsystem, Subsystem, SubsystemContext,
|
||||
};
|
||||
|
||||
/// A dummy subsystem that implements [`Subsystem`] for all
|
||||
/// types of messages. Used for tests or as a placeholder.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct DummySubsystem;
|
||||
|
||||
impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
|
||||
where
|
||||
Context: SubsystemContext<
|
||||
Signal = OverseerSignal,
|
||||
Error = SubsystemError,
|
||||
AllMessages = AllMessages,
|
||||
>,
|
||||
{
|
||||
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
|
||||
let future = Box::pin(async move {
|
||||
loop {
|
||||
match ctx.recv().await {
|
||||
Err(_) => return Ok(()),
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
|
||||
Ok(overseer_msg) => {
|
||||
tracing::debug!(
|
||||
target: "dummy-subsystem",
|
||||
"Discarding a message sent from overseer {:?}",
|
||||
overseer_msg
|
||||
);
|
||||
continue
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
SpawnedSubsystem { name: "dummy-subsystem", future }
|
||||
}
|
||||
}
|
||||
|
||||
/// This struct is passed as an argument to create a new instance of an [`Overseer`].
|
||||
///
|
||||
|
||||
@@ -32,7 +32,7 @@ use polkadot_primitives::v1::{
|
||||
ValidatorIndex,
|
||||
};
|
||||
|
||||
use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer, OverseerConnector};
|
||||
use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer};
|
||||
use metered_channel as metered;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
@@ -164,16 +164,9 @@ fn overseer_works() {
|
||||
.replace_candidate_validation(move |_| TestSubsystem1(s1_tx))
|
||||
.replace_candidate_backing(move |_| TestSubsystem2(s2_tx));
|
||||
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let (overseer, handle) =
|
||||
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -234,10 +227,9 @@ fn overseer_metrics_work() {
|
||||
Some(®istry),
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -288,15 +280,8 @@ fn overseer_ends_on_subsystem_exit() {
|
||||
executor::block_on(async move {
|
||||
let all_subsystems =
|
||||
AllSubsystems::<()>::dummy().replace_candidate_backing(|_| ReturnOnStart);
|
||||
let (overseer, _handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let (overseer, _handle) =
|
||||
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
|
||||
|
||||
overseer.run().await.unwrap();
|
||||
})
|
||||
@@ -397,16 +382,10 @@ fn overseer_start_stop_works() {
|
||||
let all_subsystems = AllSubsystems::<()>::dummy()
|
||||
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
|
||||
.replace_candidate_backing(move |_| TestSubsystem6(tx_6));
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![first_block],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let (overseer, handle) =
|
||||
Overseer::new(vec![first_block], all_subsystems, None, MockSupportsParachains, spawner)
|
||||
.unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -507,10 +486,9 @@ fn overseer_finalize_works() {
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let mut handle = Handle::Connected(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -595,16 +573,10 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
|
||||
let all_subsystems =
|
||||
AllSubsystems::<()>::dummy().replace_candidate_backing(move |_| TestSubsystem6(tx_5));
|
||||
|
||||
let (overseer, handle) = Overseer::new(
|
||||
Vec::new(),
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let (overseer, handle) =
|
||||
Overseer::new(Vec::new(), all_subsystems, None, MockSupportsParachains, spawner)
|
||||
.unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -877,17 +849,9 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
dispute_distribution: subsystem.clone(),
|
||||
chain_selection: subsystem.clone(),
|
||||
};
|
||||
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let (overseer, handle) =
|
||||
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
@@ -26,7 +26,6 @@ sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste
|
||||
sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
|
||||
telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
lru = "0.6"
|
||||
|
||||
# Substrate Primitives
|
||||
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
+111
-169
@@ -54,7 +54,7 @@ use {
|
||||
pub use sp_core::traits::SpawnNamed;
|
||||
#[cfg(feature = "full-node")]
|
||||
pub use {
|
||||
polkadot_overseer::{Handle, Overseer, OverseerConnector, OverseerHandle},
|
||||
polkadot_overseer::{Handle, Overseer, OverseerHandle},
|
||||
polkadot_primitives::v1::ParachainHost,
|
||||
sc_client_api::AuxStore,
|
||||
sp_authority_discovery::AuthorityDiscoveryApi,
|
||||
@@ -68,8 +68,6 @@ use polkadot_subsystem::jaeger;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use prometheus_endpoint::Registry;
|
||||
#[cfg(feature = "full-node")]
|
||||
use service::KeystoreContainer;
|
||||
use service::RpcHandlers;
|
||||
use telemetry::TelemetryWorker;
|
||||
#[cfg(feature = "full-node")]
|
||||
@@ -304,15 +302,14 @@ fn jaeger_launch_collector_with_agent(
|
||||
}
|
||||
|
||||
#[cfg(feature = "full-node")]
|
||||
type FullSelectChain = relay_chain_selection::SelectRelayChain<FullBackend>;
|
||||
type FullSelectChain = relay_chain_selection::SelectRelayChainWithFallback<FullBackend>;
|
||||
#[cfg(feature = "full-node")]
|
||||
type FullGrandpaBlockImport<RuntimeApi, ExecutorDispatch, ChainSelection = FullSelectChain> =
|
||||
grandpa::GrandpaBlockImport<
|
||||
FullBackend,
|
||||
Block,
|
||||
FullClient<RuntimeApi, ExecutorDispatch>,
|
||||
ChainSelection,
|
||||
>;
|
||||
type FullGrandpaBlockImport<RuntimeApi, ExecutorDispatch> = grandpa::GrandpaBlockImport<
|
||||
FullBackend,
|
||||
Block,
|
||||
FullClient<RuntimeApi, ExecutorDispatch>,
|
||||
FullSelectChain,
|
||||
>;
|
||||
|
||||
#[cfg(feature = "light-node")]
|
||||
type LightBackend = service::TLightBackendWithHash<Block, sp_runtime::traits::BlakeTwo256>;
|
||||
@@ -322,29 +319,36 @@ type LightClient<RuntimeApi, ExecutorDispatch> =
|
||||
service::TLightClientWithBackend<Block, RuntimeApi, ExecutorDispatch, LightBackend>;
|
||||
|
||||
#[cfg(feature = "full-node")]
|
||||
struct Basics<RuntimeApi, ExecutorDispatch>
|
||||
where
|
||||
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
RuntimeApi::RuntimeApi:
|
||||
RuntimeApiCollection<StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>,
|
||||
ExecutorDispatch: NativeExecutionDispatch + 'static,
|
||||
{
|
||||
task_manager: TaskManager,
|
||||
client: Arc<FullClient<RuntimeApi, ExecutorDispatch>>,
|
||||
backend: Arc<FullBackend>,
|
||||
keystore_container: KeystoreContainer,
|
||||
telemetry: Option<Telemetry>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "full-node")]
|
||||
fn new_partial_basics<RuntimeApi, ExecutorDispatch>(
|
||||
fn new_partial<RuntimeApi, ExecutorDispatch>(
|
||||
config: &mut Configuration,
|
||||
jaeger_agent: Option<std::net::SocketAddr>,
|
||||
telemetry_worker_handle: Option<TelemetryWorkerHandle>,
|
||||
) -> Result<Basics<RuntimeApi, ExecutorDispatch>, Error>
|
||||
) -> Result<
|
||||
service::PartialComponents<
|
||||
FullClient<RuntimeApi, ExecutorDispatch>,
|
||||
FullBackend,
|
||||
FullSelectChain,
|
||||
sc_consensus::DefaultImportQueue<Block, FullClient<RuntimeApi, ExecutorDispatch>>,
|
||||
sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi, ExecutorDispatch>>,
|
||||
(
|
||||
impl service::RpcExtensionBuilder,
|
||||
(
|
||||
babe::BabeBlockImport<
|
||||
Block,
|
||||
FullClient<RuntimeApi, ExecutorDispatch>,
|
||||
FullGrandpaBlockImport<RuntimeApi, ExecutorDispatch>,
|
||||
>,
|
||||
grandpa::LinkHalf<Block, FullClient<RuntimeApi, ExecutorDispatch>, FullSelectChain>,
|
||||
babe::BabeLink<Block>,
|
||||
beefy_gadget::notification::BeefySignedCommitmentSender<Block>,
|
||||
),
|
||||
grandpa::SharedVoterState,
|
||||
std::time::Duration, // slot-duration
|
||||
Option<Telemetry>,
|
||||
),
|
||||
>,
|
||||
Error,
|
||||
>
|
||||
where
|
||||
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
|
||||
+ Send
|
||||
@@ -387,62 +391,21 @@ where
|
||||
)?;
|
||||
let client = Arc::new(client);
|
||||
|
||||
jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?;
|
||||
|
||||
let telemetry: Option<_> = telemetry.map(|(worker, telemetry)| {
|
||||
let telemetry = telemetry.map(|(worker, telemetry)| {
|
||||
if let Some(worker) = worker {
|
||||
task_manager.spawn_handle().spawn("telemetry", worker.run());
|
||||
}
|
||||
telemetry
|
||||
});
|
||||
|
||||
Ok(Basics { task_manager, client, backend, keystore_container, telemetry })
|
||||
}
|
||||
jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?;
|
||||
|
||||
let select_chain = relay_chain_selection::SelectRelayChainWithFallback::new(
|
||||
backend.clone(),
|
||||
Handle::new_disconnected(),
|
||||
polkadot_node_subsystem_util::metrics::Metrics::register(config.prometheus_registry())?,
|
||||
);
|
||||
|
||||
#[cfg(feature = "full-node")]
|
||||
fn new_partial<RuntimeApi, ExecutorDispatch, ChainSelection>(
|
||||
config: &mut Configuration,
|
||||
Basics { task_manager, backend, client, keystore_container, telemetry }: Basics<
|
||||
RuntimeApi,
|
||||
ExecutorDispatch,
|
||||
>,
|
||||
select_chain: ChainSelection,
|
||||
) -> Result<
|
||||
service::PartialComponents<
|
||||
FullClient<RuntimeApi, ExecutorDispatch>,
|
||||
FullBackend,
|
||||
ChainSelection,
|
||||
sc_consensus::DefaultImportQueue<Block, FullClient<RuntimeApi, ExecutorDispatch>>,
|
||||
sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi, ExecutorDispatch>>,
|
||||
(
|
||||
impl service::RpcExtensionBuilder,
|
||||
(
|
||||
babe::BabeBlockImport<
|
||||
Block,
|
||||
FullClient<RuntimeApi, ExecutorDispatch>,
|
||||
FullGrandpaBlockImport<RuntimeApi, ExecutorDispatch, ChainSelection>,
|
||||
>,
|
||||
grandpa::LinkHalf<Block, FullClient<RuntimeApi, ExecutorDispatch>, ChainSelection>,
|
||||
babe::BabeLink<Block>,
|
||||
beefy_gadget::notification::BeefySignedCommitmentSender<Block>,
|
||||
),
|
||||
grandpa::SharedVoterState,
|
||||
std::time::Duration, // slot-duration
|
||||
Option<Telemetry>,
|
||||
),
|
||||
>,
|
||||
Error,
|
||||
>
|
||||
where
|
||||
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
RuntimeApi::RuntimeApi:
|
||||
RuntimeApiCollection<StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>,
|
||||
ExecutorDispatch: NativeExecutionDispatch + 'static,
|
||||
ChainSelection: 'static + SelectChain<Block>,
|
||||
{
|
||||
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
|
||||
config.transaction_pool.clone(),
|
||||
config.role.is_authority().into(),
|
||||
@@ -711,50 +674,23 @@ where
|
||||
let disable_grandpa = config.disable_grandpa;
|
||||
let name = config.network.node_name.clone();
|
||||
|
||||
let overseer_connector = OverseerConnector::default();
|
||||
|
||||
let handle = Handle(overseer_connector.as_handle().clone());
|
||||
|
||||
let basics = new_partial_basics::<RuntimeApi, ExecutorDispatch>(
|
||||
let service::PartialComponents {
|
||||
client,
|
||||
backend,
|
||||
mut task_manager,
|
||||
keystore_container,
|
||||
mut select_chain,
|
||||
import_queue,
|
||||
transaction_pool,
|
||||
other: (rpc_extensions_builder, import_setup, rpc_setup, slot_duration, mut telemetry),
|
||||
} = new_partial::<RuntimeApi, ExecutorDispatch>(
|
||||
&mut config,
|
||||
jaeger_agent,
|
||||
telemetry_worker_handle,
|
||||
)?;
|
||||
|
||||
// we should remove this check before we deploy parachains on polkadot
|
||||
// TODO: https://github.com/paritytech/polkadot/issues/3326
|
||||
let chain_spec = &config.chain_spec as &dyn IdentifyVariant;
|
||||
|
||||
let is_relay_chain = chain_spec.is_kusama() ||
|
||||
chain_spec.is_westend() ||
|
||||
chain_spec.is_rococo() ||
|
||||
chain_spec.is_wococo();
|
||||
|
||||
let prometheus_registry = config.prometheus_registry().cloned();
|
||||
|
||||
use relay_chain_selection::SelectRelayChain;
|
||||
|
||||
let select_chain = SelectRelayChain::new(
|
||||
basics.backend.clone(),
|
||||
is_relay_chain,
|
||||
handle.clone(),
|
||||
polkadot_node_subsystem_util::metrics::Metrics::register(prometheus_registry.as_ref())?,
|
||||
);
|
||||
let service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> {
|
||||
client,
|
||||
backend,
|
||||
mut task_manager,
|
||||
keystore_container,
|
||||
select_chain,
|
||||
import_queue,
|
||||
transaction_pool,
|
||||
other: (rpc_extensions_builder, import_setup, rpc_setup, slot_duration, mut telemetry),
|
||||
} = new_partial::<RuntimeApi, ExecutorDispatch, SelectRelayChain<_>>(
|
||||
&mut config,
|
||||
basics,
|
||||
select_chain,
|
||||
)?;
|
||||
|
||||
let shared_voter_state = rpc_setup;
|
||||
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
|
||||
|
||||
@@ -914,10 +850,8 @@ where
|
||||
local_keystore.and_then(move |k| authority_discovery_service.map(|a| (a, k)));
|
||||
|
||||
let overseer_handle = if let Some((authority_discovery_service, keystore)) = maybe_params {
|
||||
// already have access to the handle
|
||||
let (overseer, _handle) = overseer_gen
|
||||
let (overseer, overseer_handle) = overseer_gen
|
||||
.generate::<service::SpawnTaskHandle, FullClient<RuntimeApi, ExecutorDispatch>>(
|
||||
overseer_connector,
|
||||
OverseerGenArgs {
|
||||
leaves: active_leaves,
|
||||
keystore,
|
||||
@@ -941,29 +875,40 @@ where
|
||||
dispute_coordinator_config,
|
||||
},
|
||||
)?;
|
||||
let handle = Handle::Connected(overseer_handle.clone());
|
||||
let handle_clone = handle.clone();
|
||||
|
||||
{
|
||||
let handle = handle.clone();
|
||||
task_manager.spawn_essential_handle().spawn_blocking(
|
||||
"overseer",
|
||||
Box::pin(async move {
|
||||
use futures::{pin_mut, select, FutureExt};
|
||||
task_manager.spawn_essential_handle().spawn_blocking(
|
||||
"overseer",
|
||||
Box::pin(async move {
|
||||
use futures::{pin_mut, select, FutureExt};
|
||||
|
||||
let forward = polkadot_overseer::forward_events(overseer_client, handle);
|
||||
let forward = polkadot_overseer::forward_events(overseer_client, handle_clone);
|
||||
|
||||
let forward = forward.fuse();
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
let forward = forward.fuse();
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
pin_mut!(forward);
|
||||
pin_mut!(overseer_fut);
|
||||
pin_mut!(forward);
|
||||
|
||||
select! {
|
||||
_ = forward => (),
|
||||
_ = overseer_fut => (),
|
||||
complete => (),
|
||||
}
|
||||
}),
|
||||
);
|
||||
select! {
|
||||
_ = forward => (),
|
||||
_ = overseer_fut => (),
|
||||
complete => (),
|
||||
}
|
||||
}),
|
||||
);
|
||||
// we should remove this check before we deploy parachains on polkadot
|
||||
// TODO: https://github.com/paritytech/polkadot/issues/3326
|
||||
let should_connect_overseer = chain_spec.is_kusama() ||
|
||||
chain_spec.is_westend() ||
|
||||
chain_spec.is_rococo() ||
|
||||
chain_spec.is_wococo();
|
||||
|
||||
if should_connect_overseer {
|
||||
select_chain.connect_to_overseer(overseer_handle.clone());
|
||||
} else {
|
||||
tracing::info!("Overseer is running in the disconnected state");
|
||||
}
|
||||
Some(handle)
|
||||
} else {
|
||||
@@ -1283,31 +1228,6 @@ where
|
||||
Ok((task_manager, rpc_handlers))
|
||||
}
|
||||
|
||||
macro_rules! chain_ops {
|
||||
($config:expr, $jaeger_agent:expr, $telemetry_worker_handle:expr; $scope:ident, $executor:ident, $variant:ident) => {{
|
||||
let telemetry_worker_handle = $telemetry_worker_handle;
|
||||
let jaeger_agent = $jaeger_agent;
|
||||
let mut config = $config;
|
||||
let basics = new_partial_basics::<$scope::RuntimeApi, $executor>(
|
||||
config,
|
||||
jaeger_agent,
|
||||
telemetry_worker_handle,
|
||||
)?;
|
||||
|
||||
use ::sc_consensus::LongestChain;
|
||||
// use the longest chain selection, since there is no overseer available
|
||||
let chain_selection = LongestChain::new(basics.backend.clone());
|
||||
|
||||
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
|
||||
new_partial::<$scope::RuntimeApi, $executor, LongestChain<_, Block>>(
|
||||
&mut config,
|
||||
basics,
|
||||
chain_selection,
|
||||
)?;
|
||||
Ok((Arc::new(Client::$variant(client)), backend, import_queue, task_manager))
|
||||
}};
|
||||
}
|
||||
|
||||
/// Builds a new object suitable for chain operations.
|
||||
#[cfg(feature = "full-node")]
|
||||
pub fn new_chain_ops(
|
||||
@@ -1324,26 +1244,48 @@ pub fn new_chain_ops(
|
||||
> {
|
||||
config.keystore = service::config::KeystoreConfig::InMemory;
|
||||
|
||||
let telemetry_worker_handle = None;
|
||||
|
||||
#[cfg(feature = "rococo-native")]
|
||||
if config.chain_spec.is_rococo() || config.chain_spec.is_wococo() {
|
||||
return chain_ops!(config, jaeger_agent, telemetry_worker_handle; rococo_runtime, RococoExecutorDispatch, Rococo)
|
||||
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
|
||||
new_partial::<rococo_runtime::RuntimeApi, RococoExecutorDispatch>(
|
||||
config,
|
||||
jaeger_agent,
|
||||
None,
|
||||
)?;
|
||||
return Ok((Arc::new(Client::Rococo(client)), backend, import_queue, task_manager))
|
||||
}
|
||||
|
||||
#[cfg(feature = "kusama-native")]
|
||||
if config.chain_spec.is_kusama() {
|
||||
return chain_ops!(config, jaeger_agent, telemetry_worker_handle; kusama_runtime, KusamaExecutorDispatch, Kusama)
|
||||
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
|
||||
new_partial::<kusama_runtime::RuntimeApi, KusamaExecutorDispatch>(
|
||||
config,
|
||||
jaeger_agent,
|
||||
None,
|
||||
)?;
|
||||
return Ok((Arc::new(Client::Kusama(client)), backend, import_queue, task_manager))
|
||||
}
|
||||
|
||||
#[cfg(feature = "westend-native")]
|
||||
if config.chain_spec.is_westend() {
|
||||
return chain_ops!(config, jaeger_agent, telemetry_worker_handle; westend_runtime, WestendExecutorDispatch, Westend)
|
||||
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
|
||||
new_partial::<westend_runtime::RuntimeApi, WestendExecutorDispatch>(
|
||||
config,
|
||||
jaeger_agent,
|
||||
None,
|
||||
)?;
|
||||
return Ok((Arc::new(Client::Westend(client)), backend, import_queue, task_manager))
|
||||
}
|
||||
|
||||
#[cfg(feature = "polkadot-native")]
|
||||
{
|
||||
chain_ops!(config, jaeger_agent, telemetry_worker_handle; polkadot_runtime, PolkadotExecutorDispatch, Polkadot)
|
||||
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
|
||||
new_partial::<polkadot_runtime::RuntimeApi, PolkadotExecutorDispatch>(
|
||||
config,
|
||||
jaeger_agent,
|
||||
None,
|
||||
)?;
|
||||
return Ok((Arc::new(Client::Polkadot(client)), backend, import_queue, task_manager))
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "polkadot-native"))]
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use super::{AuthorityDiscoveryApi, Block, Error, Hash, IsCollator, Registry, SpawnNamed};
|
||||
use lru::LruCache;
|
||||
use polkadot_availability_distribution::IncomingRequestReceivers;
|
||||
use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
|
||||
use polkadot_node_core_av_store::Config as AvailabilityConfig;
|
||||
@@ -23,14 +22,7 @@ use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig
|
||||
use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
|
||||
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
|
||||
use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver};
|
||||
#[cfg(any(feature = "malus", test))]
|
||||
pub use polkadot_overseer::dummy::DummySubsystem;
|
||||
pub use polkadot_overseer::{
|
||||
metrics::{Metrics, MetricsTrait},
|
||||
AllSubsystems, BlockInfo, HeadSupportsParachains, Overseer, OverseerBuilder, OverseerConnector,
|
||||
OverseerHandle,
|
||||
};
|
||||
|
||||
use polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandle};
|
||||
use polkadot_primitives::v1::ParachainHost;
|
||||
use sc_authority_discovery::Service as AuthorityDiscoveryService;
|
||||
use sc_client_api::AuxStore;
|
||||
@@ -263,176 +255,6 @@ where
|
||||
Ok(all_subsystems)
|
||||
}
|
||||
|
||||
/// Obtain a prepared `OverseerBuilder`, that is initialized
|
||||
/// with all default values.
|
||||
pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
|
||||
OverseerGenArgs {
|
||||
leaves,
|
||||
keystore,
|
||||
runtime_client,
|
||||
parachains_db,
|
||||
network_service,
|
||||
authority_discovery_service,
|
||||
pov_req_receiver,
|
||||
chunk_req_receiver,
|
||||
collation_req_receiver,
|
||||
available_data_req_receiver,
|
||||
statement_req_receiver,
|
||||
dispute_req_receiver,
|
||||
registry,
|
||||
spawner,
|
||||
is_collator,
|
||||
approval_voting_config,
|
||||
availability_config,
|
||||
candidate_validation_config,
|
||||
chain_selection_config,
|
||||
dispute_coordinator_config,
|
||||
}: OverseerGenArgs<'a, Spawner, RuntimeClient>,
|
||||
) -> Result<
|
||||
OverseerBuilder<
|
||||
Spawner,
|
||||
Arc<RuntimeClient>,
|
||||
CandidateValidationSubsystem,
|
||||
CandidateBackingSubsystem<Spawner>,
|
||||
StatementDistributionSubsystem,
|
||||
AvailabilityDistributionSubsystem,
|
||||
AvailabilityRecoverySubsystem,
|
||||
BitfieldSigningSubsystem<Spawner>,
|
||||
BitfieldDistributionSubsystem,
|
||||
ProvisionerSubsystem<Spawner>,
|
||||
RuntimeApiSubsystem<RuntimeClient>,
|
||||
AvailabilityStoreSubsystem,
|
||||
NetworkBridgeSubsystem<
|
||||
Arc<sc_network::NetworkService<Block, Hash>>,
|
||||
AuthorityDiscoveryService,
|
||||
>,
|
||||
ChainApiSubsystem<RuntimeClient>,
|
||||
CollationGenerationSubsystem,
|
||||
CollatorProtocolSubsystem,
|
||||
ApprovalDistributionSubsystem,
|
||||
ApprovalVotingSubsystem,
|
||||
GossipSupportSubsystem,
|
||||
DisputeCoordinatorSubsystem,
|
||||
DisputeParticipationSubsystem,
|
||||
DisputeDistributionSubsystem<AuthorityDiscoveryService>,
|
||||
ChainSelectionSubsystem,
|
||||
>,
|
||||
Error,
|
||||
>
|
||||
where
|
||||
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
|
||||
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
use polkadot_node_subsystem_util::metrics::Metrics;
|
||||
use std::iter::FromIterator;
|
||||
|
||||
let metrics = <polkadot_overseer::metrics::Metrics as MetricsTrait>::register(registry)?;
|
||||
|
||||
let builder = Overseer::builder()
|
||||
.availability_distribution(AvailabilityDistributionSubsystem::new(
|
||||
keystore.clone(),
|
||||
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.availability_recovery(AvailabilityRecoverySubsystem::with_chunks_only(
|
||||
available_data_req_receiver,
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.availability_store(AvailabilityStoreSubsystem::new(
|
||||
parachains_db.clone(),
|
||||
availability_config,
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
|
||||
.bitfield_signing(BitfieldSigningSubsystem::new(
|
||||
spawner.clone(),
|
||||
keystore.clone(),
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.candidate_backing(CandidateBackingSubsystem::new(
|
||||
spawner.clone(),
|
||||
keystore.clone(),
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.candidate_validation(CandidateValidationSubsystem::with_config(
|
||||
candidate_validation_config,
|
||||
Metrics::register(registry)?, // candidate-validation metrics
|
||||
Metrics::register(registry)?, // validation host metrics
|
||||
))
|
||||
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
|
||||
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
|
||||
.collator_protocol({
|
||||
let side = match is_collator {
|
||||
IsCollator::Yes(collator_pair) => ProtocolSide::Collator(
|
||||
network_service.local_peer_id().clone(),
|
||||
collator_pair,
|
||||
collation_req_receiver,
|
||||
Metrics::register(registry)?,
|
||||
),
|
||||
IsCollator::No => ProtocolSide::Validator {
|
||||
keystore: keystore.clone(),
|
||||
eviction_policy: Default::default(),
|
||||
metrics: Metrics::register(registry)?,
|
||||
},
|
||||
};
|
||||
CollatorProtocolSubsystem::new(side)
|
||||
})
|
||||
.network_bridge(NetworkBridgeSubsystem::new(
|
||||
network_service.clone(),
|
||||
authority_discovery_service.clone(),
|
||||
Box::new(network_service.clone()),
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.provisioner(ProvisionerSubsystem::new(spawner.clone(), (), Metrics::register(registry)?))
|
||||
.runtime_api(RuntimeApiSubsystem::new(
|
||||
runtime_client.clone(),
|
||||
Metrics::register(registry)?,
|
||||
spawner.clone(),
|
||||
))
|
||||
.statement_distribution(StatementDistributionSubsystem::new(
|
||||
keystore.clone(),
|
||||
statement_req_receiver,
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.approval_distribution(ApprovalDistributionSubsystem::new(Metrics::register(registry)?))
|
||||
.approval_voting(ApprovalVotingSubsystem::with_config(
|
||||
approval_voting_config,
|
||||
parachains_db.clone(),
|
||||
keystore.clone(),
|
||||
Box::new(network_service.clone()),
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.gossip_support(GossipSupportSubsystem::new(keystore.clone()))
|
||||
.dispute_coordinator(DisputeCoordinatorSubsystem::new(
|
||||
parachains_db.clone(),
|
||||
dispute_coordinator_config,
|
||||
keystore.clone(),
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.dispute_participation(DisputeParticipationSubsystem::new())
|
||||
.dispute_distribution(DisputeDistributionSubsystem::new(
|
||||
keystore.clone(),
|
||||
dispute_req_receiver,
|
||||
authority_discovery_service.clone(),
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
|
||||
.leaves(Vec::from_iter(
|
||||
leaves
|
||||
.into_iter()
|
||||
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
|
||||
))
|
||||
.activation_external_listeners(Default::default())
|
||||
.span_per_active_leaf(Default::default())
|
||||
.active_leaves(Default::default())
|
||||
.supports_parachains(runtime_client)
|
||||
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
|
||||
.metrics(metrics)
|
||||
.spawner(spawner);
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
/// Trait for the `fn` generating the overseer.
|
||||
///
|
||||
/// Default behavior is to create an unmodified overseer, as `RealOverseerGen`
|
||||
@@ -441,7 +263,6 @@ pub trait OverseerGen {
|
||||
/// Overwrite the full generation of the overseer, including the subsystems.
|
||||
fn generate<'a, Spawner, RuntimeClient>(
|
||||
&self,
|
||||
connector: OverseerConnector,
|
||||
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
|
||||
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
|
||||
where
|
||||
@@ -450,22 +271,19 @@ pub trait OverseerGen {
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
let gen = RealOverseerGen;
|
||||
RealOverseerGen::generate::<Spawner, RuntimeClient>(&gen, connector, args)
|
||||
RealOverseerGen::generate::<Spawner, RuntimeClient>(&gen, args)
|
||||
}
|
||||
// It would be nice to make `create_subsystems` part of this trait,
|
||||
// but the amount of generic arguments that would be required as
|
||||
// as consequence make this rather annoying to implement and use.
|
||||
}
|
||||
|
||||
use polkadot_overseer::KNOWN_LEAVES_CACHE_SIZE;
|
||||
|
||||
/// The regular set of subsystems.
|
||||
pub struct RealOverseerGen;
|
||||
|
||||
impl OverseerGen for RealOverseerGen {
|
||||
fn generate<'a, Spawner, RuntimeClient>(
|
||||
&self,
|
||||
connector: OverseerConnector,
|
||||
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
|
||||
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
|
||||
where
|
||||
@@ -473,8 +291,14 @@ impl OverseerGen for RealOverseerGen {
|
||||
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
prepared_overseer_builder(args)?
|
||||
.build_with_connector(connector)
|
||||
let spawner = args.spawner.clone();
|
||||
let leaves = args.leaves.clone();
|
||||
let runtime_client = args.runtime_client.clone();
|
||||
let registry = args.registry.clone();
|
||||
|
||||
let all_subsystems = create_default_subsystems::<Spawner, RuntimeClient>(args)?;
|
||||
|
||||
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ use super::{HeaderProvider, HeaderProviderProvider};
|
||||
use consensus_common::{Error as ConsensusError, SelectChain};
|
||||
use futures::channel::oneshot;
|
||||
use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||
use polkadot_overseer::{AllMessages, Handle};
|
||||
use polkadot_overseer::{AllMessages, Handle, OverseerHandle};
|
||||
use polkadot_primitives::v1::{
|
||||
Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader,
|
||||
};
|
||||
@@ -109,57 +109,66 @@ impl Metrics {
|
||||
}
|
||||
|
||||
/// A chain-selection implementation which provides safety for relay chains.
|
||||
pub struct SelectRelayChain<B: sc_client_api::Backend<PolkadotBlock>> {
|
||||
is_relay_chain: bool,
|
||||
longest_chain: sc_consensus::LongestChain<B, PolkadotBlock>,
|
||||
selection: SelectRelayChainInner<B, Handle>,
|
||||
pub struct SelectRelayChainWithFallback<B: sc_client_api::Backend<PolkadotBlock>> {
|
||||
// A fallback to use in case the overseer is disconnected.
|
||||
//
|
||||
// This is used on relay chains which have not yet enabled
|
||||
// parachains as well as situations where the node is offline.
|
||||
fallback: sc_consensus::LongestChain<B, PolkadotBlock>,
|
||||
selection: SelectRelayChain<B, Handle>,
|
||||
}
|
||||
|
||||
impl<B> Clone for SelectRelayChain<B>
|
||||
impl<B> Clone for SelectRelayChainWithFallback<B>
|
||||
where
|
||||
B: sc_client_api::Backend<PolkadotBlock>,
|
||||
SelectRelayChainInner<B, Handle>: Clone,
|
||||
SelectRelayChain<B, Handle>: Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
is_relay_chain: self.is_relay_chain,
|
||||
longest_chain: self.longest_chain.clone(),
|
||||
selection: self.selection.clone(),
|
||||
Self { fallback: self.fallback.clone(), selection: self.selection.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> SelectRelayChainWithFallback<B>
|
||||
where
|
||||
B: sc_client_api::Backend<PolkadotBlock> + 'static,
|
||||
{
|
||||
/// Create a new [`SelectRelayChainWithFallback`] wrapping the given chain backend
|
||||
/// and a handle to the overseer.
|
||||
pub fn new(backend: Arc<B>, overseer: Handle, metrics: Metrics) -> Self {
|
||||
SelectRelayChainWithFallback {
|
||||
fallback: sc_consensus::LongestChain::new(backend.clone()),
|
||||
selection: SelectRelayChain::new(backend, overseer, metrics),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> SelectRelayChain<B>
|
||||
impl<B> SelectRelayChainWithFallback<B>
|
||||
where
|
||||
B: sc_client_api::Backend<PolkadotBlock> + 'static,
|
||||
{
|
||||
/// Create a new [`SelectRelayChain`] wrapping the given chain backend
|
||||
/// and a handle to the overseer.
|
||||
pub fn new(backend: Arc<B>, is_relay_chain: bool, overseer: Handle, metrics: Metrics) -> Self {
|
||||
SelectRelayChain {
|
||||
is_relay_chain,
|
||||
longest_chain: sc_consensus::LongestChain::new(backend.clone()),
|
||||
selection: SelectRelayChainInner::new(backend, overseer, metrics),
|
||||
}
|
||||
/// Given an overseer handle, this connects the [`SelectRelayChainWithFallback`]'s
|
||||
/// internal handle and its clones to the same overseer.
|
||||
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
|
||||
self.selection.overseer.connect_to_overseer(handle);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>
|
||||
impl<B> SelectChain<PolkadotBlock> for SelectRelayChainWithFallback<B>
|
||||
where
|
||||
B: sc_client_api::Backend<PolkadotBlock> + 'static,
|
||||
{
|
||||
async fn leaves(&self) -> Result<Vec<Hash>, ConsensusError> {
|
||||
if !self.is_relay_chain {
|
||||
return self.longest_chain.leaves().await
|
||||
if self.selection.overseer.is_disconnected() {
|
||||
return self.fallback.leaves().await
|
||||
}
|
||||
|
||||
self.selection.leaves().await
|
||||
}
|
||||
|
||||
async fn best_chain(&self) -> Result<PolkadotHeader, ConsensusError> {
|
||||
if !self.is_relay_chain {
|
||||
return self.longest_chain.best_chain().await
|
||||
if self.selection.overseer.is_disconnected() {
|
||||
return self.fallback.best_chain().await
|
||||
}
|
||||
self.selection.best_chain().await
|
||||
}
|
||||
@@ -170,34 +179,34 @@ where
|
||||
maybe_max_number: Option<BlockNumber>,
|
||||
) -> Result<Option<Hash>, ConsensusError> {
|
||||
let longest_chain_best =
|
||||
self.longest_chain.finality_target(target_hash, maybe_max_number).await?;
|
||||
self.fallback.finality_target(target_hash, maybe_max_number).await?;
|
||||
|
||||
if !self.is_relay_chain {
|
||||
if self.selection.overseer.is_disconnected() {
|
||||
return Ok(longest_chain_best)
|
||||
}
|
||||
self.selection
|
||||
.finality_target_with_longest_chain(target_hash, longest_chain_best, maybe_max_number)
|
||||
.finality_target_with_fallback(target_hash, longest_chain_best, maybe_max_number)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// A chain-selection implementation which provides safety for relay chains
|
||||
/// but does not handle situations where the overseer is not yet connected.
|
||||
pub struct SelectRelayChainInner<B, OH> {
|
||||
pub struct SelectRelayChain<B, OH> {
|
||||
backend: Arc<B>,
|
||||
overseer: OH,
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
impl<B, OH> SelectRelayChainInner<B, OH>
|
||||
impl<B, OH> SelectRelayChain<B, OH>
|
||||
where
|
||||
B: HeaderProviderProvider<PolkadotBlock>,
|
||||
OH: OverseerHandleT,
|
||||
{
|
||||
/// Create a new [`SelectRelayChainInner`] wrapping the given chain backend
|
||||
/// Create a new [`SelectRelayChain`] wrapping the given chain backend
|
||||
/// and a handle to the overseer.
|
||||
pub fn new(backend: Arc<B>, overseer: OH, metrics: Metrics) -> Self {
|
||||
SelectRelayChainInner { backend, overseer, metrics }
|
||||
SelectRelayChain { backend, overseer, metrics }
|
||||
}
|
||||
|
||||
fn block_header(&self, hash: Hash) -> Result<PolkadotHeader, ConsensusError> {
|
||||
@@ -225,13 +234,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, OH> Clone for SelectRelayChainInner<B, OH>
|
||||
impl<B, OH> Clone for SelectRelayChain<B, OH>
|
||||
where
|
||||
B: HeaderProviderProvider<PolkadotBlock> + Send + Sync,
|
||||
OH: OverseerHandleT,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
SelectRelayChainInner {
|
||||
SelectRelayChain {
|
||||
backend: self.backend.clone(),
|
||||
overseer: self.overseer.clone(),
|
||||
metrics: self.metrics.clone(),
|
||||
@@ -264,7 +273,7 @@ impl OverseerHandleT for Handle {
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, OH> SelectRelayChainInner<B, OH>
|
||||
impl<B, OH> SelectRelayChain<B, OH>
|
||||
where
|
||||
B: HeaderProviderProvider<PolkadotBlock>,
|
||||
OH: OverseerHandleT,
|
||||
@@ -308,7 +317,7 @@ where
|
||||
///
|
||||
/// It will also constrain the chain to only chains which are fully
|
||||
/// approved, and chains which contain no disputes.
|
||||
pub(crate) async fn finality_target_with_longest_chain(
|
||||
pub(crate) async fn finality_target_with_fallback(
|
||||
&self,
|
||||
target_hash: Hash,
|
||||
best_leaf: Option<Hash>,
|
||||
|
||||
@@ -79,7 +79,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
|
||||
|
||||
let (finality_target_tx, finality_target_rx) = oneshot::channel::<Option<Hash>>();
|
||||
|
||||
let select_relay_chain = SelectRelayChainInner::<TestChainStorage, TestSubsystemSender>::new(
|
||||
let select_relay_chain = SelectRelayChain::<TestChainStorage, TestSubsystemSender>::new(
|
||||
Arc::new(case_vars.chain.clone()),
|
||||
context.sender().clone(),
|
||||
Default::default(),
|
||||
|
||||
@@ -372,9 +372,7 @@ mod tests {
|
||||
use super::*;
|
||||
use futures::executor::block_on;
|
||||
use polkadot_node_subsystem::messages::CollatorProtocolMessage;
|
||||
use polkadot_overseer::{
|
||||
AllSubsystems, Handle, HeadSupportsParachains, Overseer, OverseerConnector,
|
||||
};
|
||||
use polkadot_overseer::{AllSubsystems, Handle, HeadSupportsParachains, Overseer};
|
||||
use polkadot_primitives::v1::Hash;
|
||||
|
||||
struct AlwaysSupportsParachains;
|
||||
@@ -396,10 +394,9 @@ mod tests {
|
||||
None,
|
||||
AlwaysSupportsParachains,
|
||||
spawner.clone(),
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let mut handle = Handle::Connected(handle);
|
||||
|
||||
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
|
||||
|
||||
|
||||
@@ -24,9 +24,7 @@
|
||||
pub use jaeger::*;
|
||||
pub use polkadot_node_jaeger as jaeger;
|
||||
|
||||
pub use polkadot_overseer::{
|
||||
self as overseer, dummy::DummySubsystem, ActiveLeavesUpdate, OverseerConnector, OverseerSignal,
|
||||
};
|
||||
pub use polkadot_overseer::{self as overseer, ActiveLeavesUpdate, DummySubsystem, OverseerSignal};
|
||||
|
||||
pub use polkadot_node_subsystem_types::{
|
||||
errors::{self, *},
|
||||
|
||||
Reference in New Issue
Block a user