remove connected disconnected state, 3rd attempt (#3898)

* overseer: remove mut in connector

* rename SelectRelayChainWFallback -> SelectRelayChain

* split Basics

* introduce the OverseerConnector, use it

* introduce is_relay_chain to RelayChainSelection

* chore: rename var

* avoid dummy import in subsystem

* actually remove Disconnecte/Connected enum

* extract DummySubsystem into mod dummy.

* Handle::Connected -> Handle::new

* chore: fmt

* fix test

* select relay chain takes no arg, simplification

* fmt

* Update node/service/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* chore: improve malus tests

* avoid the deferred setting of `is_relay_chain` in `RelayChainSelection`

* positive assertion is not mandated, only the negative one, to avoid a stall

* chore: fmt

* assure the `RelayChainSelection` is not used before the overseer is up and running

Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Bernhard Schuster
2021-09-28 15:01:04 +02:00
committed by GitHub
parent 408b49268f
commit c9662531b6
18 changed files with 642 additions and 332 deletions
+1
View File
@@ -6884,6 +6884,7 @@ dependencies = [
"kvdb",
"kvdb-rocksdb",
"log",
"lru",
"pallet-babe",
"pallet-im-online",
"pallet-mmr-primitives",
+2 -1
View File
@@ -185,6 +185,7 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error>
where
@@ -213,7 +214,7 @@ impl OverseerGen for BehaveMaleficient {
),
);
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
.map_err(|e| e.into())
// A builder pattern will simplify this further
+72 -22
View File
@@ -20,8 +20,8 @@ use polkadot_node_subsystem_test_helpers::*;
use polkadot_node_subsystem::{
messages::{AllMessages, AvailabilityStoreMessage},
overseer::{gen::TimeoutExt, Subsystem},
DummySubsystem,
overseer::{dummy::DummySubsystem, gen::TimeoutExt, Subsystem},
SubsystemError,
};
#[derive(Clone, Debug)]
@@ -48,34 +48,38 @@ where
}
}
#[derive(Clone, Debug)]
struct PassInterceptor;
impl<Sender> MessageInterceptor<Sender> for PassInterceptor
where
Sender: overseer::SubsystemSender<AllMessages>
+ overseer::SubsystemSender<AvailabilityStoreMessage>
+ Clone
+ 'static,
{
type Message = AvailabilityStoreMessage;
}
async fn overseer_send<T: Into<AllMessages>>(overseer: &mut TestSubsystemContextHandle<T>, msg: T) {
overseer.send(FromOverseer::Communication { msg }).await;
}
#[test]
fn integrity_test() {
fn launch_harness<F, M, Sub, G>(test_gen: G)
where
F: Future<Output = TestSubsystemContextHandle<M>> + Send,
M: Into<AllMessages> + std::fmt::Debug + Send + 'static,
AllMessages: From<M>,
Sub: Subsystem<TestSubsystemContext<M, sp_core::testing::TaskExecutor>, SubsystemError>,
G: Fn(TestSubsystemContextHandle<M>) -> (F, Sub),
{
let pool = sp_core::testing::TaskExecutor::new();
let (context, mut overseer) = make_subsystem_context(pool);
let (context, overseer) = make_subsystem_context(pool);
let sub = DummySubsystem;
let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor);
// Try to send a message we know is going to be filtered.
let test_fut = async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
};
let (test_fut, subsystem) = test_gen(overseer);
let subsystem = async move {
sub_intercepted.start(context).future.await.unwrap();
subsystem.start(context).future.await.unwrap();
};
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
@@ -88,3 +92,49 @@ fn integrity_test() {
))
.1;
}
#[test]
fn integrity_test_intercept() {
launch_harness(|mut overseer| {
let sub = DummySubsystem;
let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor);
(
async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
},
sub_intercepted,
)
})
}
#[test]
fn integrity_test_pass() {
launch_harness(|mut overseer| {
let sub = DummySubsystem;
let sub_intercepted = InterceptedSubsystem::new(sub, PassInterceptor);
(
async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
},
sub_intercepted,
)
})
}
+3 -2
View File
@@ -37,7 +37,7 @@ use polkadot_cli::{
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateValidationMessage},
overseer::{self, OverseerHandle},
overseer::{self, OverseerConnector, OverseerHandle},
FromOverseer,
};
@@ -86,6 +86,7 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where
@@ -113,7 +114,7 @@ impl OverseerGen for BehaveMaleficient {
},
);
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
.map_err(|e| e.into())
}
}
@@ -29,7 +29,8 @@ use polkadot_node_subsystem_types::messages::{
use polkadot_overseer::{
self as overseer,
gen::{FromOverseer, SpawnedSubsystem},
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError,
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector,
OverseerSignal, SubsystemError,
};
use polkadot_primitives::v1::Hash;
@@ -173,8 +174,15 @@ fn main() {
.replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig);
let (overseer, _handle) =
Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap();
let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
AlwaysSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;
@@ -130,9 +130,13 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
&mut self.handle
}
/// Obtain access to the overseer handle.
pub fn as_handle(&mut self) -> &#handle {
pub fn as_handle(&self) -> &#handle {
&self.handle
}
/// Obtain a clone of the handle.
pub fn handle(&self) -> #handle {
self.handle.clone()
}
}
impl ::std::default::Default for #connector {
+54
View File
@@ -0,0 +1,54 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{AllMessages, OverseerSignal};
use polkadot_node_subsystem_types::errors::SubsystemError;
use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};
/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
#[derive(Clone, Copy, Debug)]
pub struct DummySubsystem;
impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
where
Context: SubsystemContext<
Signal = OverseerSignal,
Error = SubsystemError,
AllMessages = AllMessages,
>,
{
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Err(_) => return Ok(()),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(overseer_msg) => {
tracing::debug!(
target: "dummy-subsystem",
"Discarding a message sent from overseer {:?}",
overseer_msg
);
continue
},
}
}
});
SpawnedSubsystem { name: "dummy-subsystem", future }
}
}
+21 -72
View File
@@ -70,7 +70,6 @@ use std::{
use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::RwLock;
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
@@ -91,15 +90,18 @@ pub use polkadot_node_subsystem_types::{
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
};
/// Test helper supplements.
pub mod dummy;
pub use self::dummy::DummySubsystem;
// TODO legacy, to be deleted, left for easier integration
// TODO https://github.com/paritytech/polkadot/issues/3427
mod subsystems;
pub use self::subsystems::{AllSubsystems, DummySubsystem};
pub use self::subsystems::AllSubsystems;
mod metrics;
use self::metrics::Metrics;
pub mod metrics;
use polkadot_node_metrics::{
pub use polkadot_node_metrics::{
metrics::{prometheus, Metrics as MetricsTrait},
Metronome,
};
@@ -115,7 +117,7 @@ pub use polkadot_overseer_gen::{
/// Store 2 days worth of blocks, not accounting for forks,
/// in the LRU cache. Assumes a 6-second block time.
const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
#[cfg(test)]
mod tests;
@@ -141,18 +143,12 @@ where
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub enum Handle {
/// Used only at initialization to break the cyclic dependency.
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
/// A handle to the overseer.
Connected(OverseerHandle),
}
pub struct Handle(OverseerHandle);
impl Handle {
/// Create a new disconnected [`Handle`].
pub fn new_disconnected() -> Self {
Self::Disconnected(Arc::new(RwLock::new(None)))
/// Create a new [`Handle`].
pub fn new(raw: OverseerHandle) -> Self {
Self(raw)
}
/// Inform the `Overseer` that that some block was imported.
@@ -201,58 +197,8 @@ impl Handle {
/// Most basic operation, to stop a server.
async fn send_and_log_error(&mut self, event: Event) {
self.try_connect();
if let Self::Connected(ref mut handle) = self {
if handle.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
} else {
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
}
}
/// Whether the handle is disconnected.
pub fn is_disconnected(&self) -> bool {
match self {
Self::Disconnected(ref x) => x.read().is_none(),
_ => false,
}
}
/// Connect this handle and all disconnected clones of it to the overseer.
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
match self {
Self::Disconnected(ref mut x) => {
let mut maybe_handle = x.write();
if maybe_handle.is_none() {
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
*maybe_handle = Some(handle);
} else {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect a clone of a connected Handle",
);
}
},
_ => {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect an already connected Handle",
);
},
}
}
/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
fn try_connect(&mut self) {
if let Self::Disconnected(ref mut x) = self {
let guard = x.write();
if let Some(ref h) = *guard {
let handle = h.clone();
drop(guard);
*self = Self::Connected(handle);
}
if self.0.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
}
}
@@ -439,7 +385,7 @@ pub struct Overseer<SupportsParachains> {
pub known_leaves: LruCache<Hash, ()>,
/// Various Prometheus metrics.
pub metrics: Metrics,
pub metrics: crate::metrics::Metrics,
}
impl<S, SupportsParachains> Overseer<S, SupportsParachains>
@@ -490,12 +436,13 @@ where
/// # use polkadot_primitives::v1::Hash;
/// # use polkadot_overseer::{
/// # self as overseer,
/// # Overseer,
/// # OverseerSignal,
/// # OverseerConnector,
/// # SubsystemSender as _,
/// # AllMessages,
/// # AllSubsystems,
/// # HeadSupportsParachains,
/// # Overseer,
/// # SubsystemError,
/// # gen::{
/// # SubsystemContext,
@@ -549,6 +496,7 @@ where
/// None,
/// AlwaysSupportsParachains,
/// spawner,
/// OverseerConnector::default(),
/// ).unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
@@ -615,6 +563,7 @@ where
prometheus_registry: Option<&prometheus::Registry>,
supports_parachains: SupportsParachains,
s: S,
connector: OverseerConnector,
) -> SubsystemResult<(Self, OverseerHandle)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
@@ -643,7 +592,7 @@ where
CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send,
S: SpawnNamed,
{
let metrics: Metrics = <Metrics as MetricsTrait>::register(prometheus_registry)?;
let metrics = <crate::metrics::Metrics as MetricsTrait>::register(prometheus_registry)?;
let (mut overseer, handle) = Self::builder()
.candidate_validation(all_subsystems.candidate_validation)
@@ -679,7 +628,7 @@ where
.supports_parachains(supports_parachains)
.metrics(metrics.clone())
.spawner(s)
.build()?;
.build_with_connector(connector)?;
// spawn the metrics metronome task
{
+2 -2
View File
@@ -17,7 +17,7 @@
//! Prometheus metrics related to the overseer and its channels.
use super::*;
use polkadot_node_metrics::metrics::{self, prometheus};
pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait};
use parity_util_mem::MemoryAllocationSnapshot;
@@ -110,7 +110,7 @@ impl Metrics {
}
}
impl metrics::Metrics for Metrics {
impl MetricsTrait for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
activated_heads_total: prometheus::register(
+2 -40
View File
@@ -19,47 +19,9 @@
//! In the future, everything should be set up using the generated
//! overseer builder pattern instead.
use crate::{AllMessages, OverseerSignal};
use polkadot_node_subsystem_types::errors::SubsystemError;
use crate::dummy::DummySubsystem;
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
use polkadot_overseer_gen::{
FromOverseer, MapSubsystem, SpawnedSubsystem, Subsystem, SubsystemContext,
};
/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
#[derive(Clone, Copy, Debug)]
pub struct DummySubsystem;
impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
where
Context: SubsystemContext<
Signal = OverseerSignal,
Error = SubsystemError,
AllMessages = AllMessages,
>,
{
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Err(_) => return Ok(()),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(overseer_msg) => {
tracing::debug!(
target: "dummy-subsystem",
"Discarding a message sent from overseer {:?}",
overseer_msg
);
continue
},
}
}
});
SpawnedSubsystem { name: "dummy-subsystem", future }
}
}
use polkadot_overseer_gen::MapSubsystem;
/// This struct is passed as an argument to create a new instance of an [`Overseer`].
///
+55 -19
View File
@@ -32,7 +32,7 @@ use polkadot_primitives::v1::{
ValidatorIndex,
};
use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer};
use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer, OverseerConnector};
use metered_channel as metered;
use assert_matches::assert_matches;
@@ -164,9 +164,16 @@ fn overseer_works() {
.replace_candidate_validation(move |_| TestSubsystem1(s1_tx))
.replace_candidate_backing(move |_| TestSubsystem2(s2_tx));
let (overseer, handle) =
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
let mut handle = Handle::Connected(handle);
let (overseer, handle) = Overseer::new(
vec![],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
@@ -227,9 +234,10 @@ fn overseer_metrics_work() {
Some(&registry),
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::Connected(handle);
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
@@ -280,8 +288,15 @@ fn overseer_ends_on_subsystem_exit() {
executor::block_on(async move {
let all_subsystems =
AllSubsystems::<()>::dummy().replace_candidate_backing(|_| ReturnOnStart);
let (overseer, _handle) =
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
overseer.run().await.unwrap();
})
@@ -382,10 +397,16 @@ fn overseer_start_stop_works() {
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6));
let (overseer, handle) =
Overseer::new(vec![first_block], all_subsystems, None, MockSupportsParachains, spawner)
.unwrap();
let mut handle = Handle::Connected(handle);
let (overseer, handle) = Overseer::new(
vec![first_block],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
@@ -486,9 +507,10 @@ fn overseer_finalize_works() {
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::Connected(handle);
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
@@ -573,10 +595,16 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
let all_subsystems =
AllSubsystems::<()>::dummy().replace_candidate_backing(move |_| TestSubsystem6(tx_5));
let (overseer, handle) =
Overseer::new(Vec::new(), all_subsystems, None, MockSupportsParachains, spawner)
.unwrap();
let mut handle = Handle::Connected(handle);
let (overseer, handle) = Overseer::new(
Vec::new(),
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
@@ -849,9 +877,17 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
dispute_distribution: subsystem.clone(),
chain_selection: subsystem.clone(),
};
let (overseer, handle) =
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
let mut handle = Handle::Connected(handle);
let (overseer, handle) = Overseer::new(
vec![],
all_subsystems,
None,
MockSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::new(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
+1
View File
@@ -68,6 +68,7 @@ thiserror = "1.0.26"
kvdb = "0.10.0"
kvdb-rocksdb = { version = "0.14.0", optional = true }
async-trait = "0.1.51"
lru = "0.6"
# Polkadot
polkadot-node-core-parachains-inherent = { path = "../core/parachains-inherent" }
+178 -111
View File
@@ -54,7 +54,7 @@ use {
pub use sp_core::traits::SpawnNamed;
#[cfg(feature = "full-node")]
pub use {
polkadot_overseer::{Handle, Overseer, OverseerHandle},
polkadot_overseer::{Handle, Overseer, OverseerConnector, OverseerHandle},
polkadot_primitives::v1::ParachainHost,
sc_client_api::AuxStore,
sp_authority_discovery::AuthorityDiscoveryApi,
@@ -68,6 +68,8 @@ use polkadot_subsystem::jaeger;
use std::{sync::Arc, time::Duration};
use prometheus_endpoint::Registry;
#[cfg(feature = "full-node")]
use service::KeystoreContainer;
use service::RpcHandlers;
use telemetry::TelemetryWorker;
#[cfg(feature = "full-node")]
@@ -302,14 +304,15 @@ fn jaeger_launch_collector_with_agent(
}
#[cfg(feature = "full-node")]
type FullSelectChain = relay_chain_selection::SelectRelayChainWithFallback<FullBackend>;
type FullSelectChain = relay_chain_selection::SelectRelayChain<FullBackend>;
#[cfg(feature = "full-node")]
type FullGrandpaBlockImport<RuntimeApi, ExecutorDispatch> = grandpa::GrandpaBlockImport<
FullBackend,
Block,
FullClient<RuntimeApi, ExecutorDispatch>,
FullSelectChain,
>;
type FullGrandpaBlockImport<RuntimeApi, ExecutorDispatch, ChainSelection = FullSelectChain> =
grandpa::GrandpaBlockImport<
FullBackend,
Block,
FullClient<RuntimeApi, ExecutorDispatch>,
ChainSelection,
>;
#[cfg(feature = "light-node")]
type LightBackend = service::TLightBackendWithHash<Block, sp_runtime::traits::BlakeTwo256>;
@@ -319,36 +322,29 @@ type LightClient<RuntimeApi, ExecutorDispatch> =
service::TLightClientWithBackend<Block, RuntimeApi, ExecutorDispatch, LightBackend>;
#[cfg(feature = "full-node")]
fn new_partial<RuntimeApi, ExecutorDispatch>(
struct Basics<RuntimeApi, ExecutorDispatch>
where
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
+ Send
+ Sync
+ 'static,
RuntimeApi::RuntimeApi:
RuntimeApiCollection<StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>,
ExecutorDispatch: NativeExecutionDispatch + 'static,
{
task_manager: TaskManager,
client: Arc<FullClient<RuntimeApi, ExecutorDispatch>>,
backend: Arc<FullBackend>,
keystore_container: KeystoreContainer,
telemetry: Option<Telemetry>,
}
#[cfg(feature = "full-node")]
fn new_partial_basics<RuntimeApi, ExecutorDispatch>(
config: &mut Configuration,
jaeger_agent: Option<std::net::SocketAddr>,
telemetry_worker_handle: Option<TelemetryWorkerHandle>,
) -> Result<
service::PartialComponents<
FullClient<RuntimeApi, ExecutorDispatch>,
FullBackend,
FullSelectChain,
sc_consensus::DefaultImportQueue<Block, FullClient<RuntimeApi, ExecutorDispatch>>,
sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi, ExecutorDispatch>>,
(
impl service::RpcExtensionBuilder,
(
babe::BabeBlockImport<
Block,
FullClient<RuntimeApi, ExecutorDispatch>,
FullGrandpaBlockImport<RuntimeApi, ExecutorDispatch>,
>,
grandpa::LinkHalf<Block, FullClient<RuntimeApi, ExecutorDispatch>, FullSelectChain>,
babe::BabeLink<Block>,
beefy_gadget::notification::BeefySignedCommitmentSender<Block>,
),
grandpa::SharedVoterState,
std::time::Duration, // slot-duration
Option<Telemetry>,
),
>,
Error,
>
) -> Result<Basics<RuntimeApi, ExecutorDispatch>, Error>
where
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
+ Send
@@ -400,12 +396,53 @@ where
jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?;
let select_chain = relay_chain_selection::SelectRelayChainWithFallback::new(
backend.clone(),
Handle::new_disconnected(),
polkadot_node_subsystem_util::metrics::Metrics::register(config.prometheus_registry())?,
);
Ok(Basics { task_manager, client, backend, keystore_container, telemetry })
}
#[cfg(feature = "full-node")]
fn new_partial<RuntimeApi, ExecutorDispatch, ChainSelection>(
config: &mut Configuration,
Basics { task_manager, backend, client, keystore_container, telemetry }: Basics<
RuntimeApi,
ExecutorDispatch,
>,
select_chain: ChainSelection,
) -> Result<
service::PartialComponents<
FullClient<RuntimeApi, ExecutorDispatch>,
FullBackend,
ChainSelection,
sc_consensus::DefaultImportQueue<Block, FullClient<RuntimeApi, ExecutorDispatch>>,
sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi, ExecutorDispatch>>,
(
impl service::RpcExtensionBuilder,
(
babe::BabeBlockImport<
Block,
FullClient<RuntimeApi, ExecutorDispatch>,
FullGrandpaBlockImport<RuntimeApi, ExecutorDispatch, ChainSelection>,
>,
grandpa::LinkHalf<Block, FullClient<RuntimeApi, ExecutorDispatch>, ChainSelection>,
babe::BabeLink<Block>,
beefy_gadget::notification::BeefySignedCommitmentSender<Block>,
),
grandpa::SharedVoterState,
std::time::Duration, // slot-duration
Option<Telemetry>,
),
>,
Error,
>
where
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
+ Send
+ Sync
+ 'static,
RuntimeApi::RuntimeApi:
RuntimeApiCollection<StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>,
ExecutorDispatch: NativeExecutionDispatch + 'static,
ChainSelection: 'static + SelectChain<Block>,
{
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
@@ -674,16 +711,7 @@ where
let disable_grandpa = config.disable_grandpa;
let name = config.network.node_name.clone();
let service::PartialComponents {
client,
backend,
mut task_manager,
keystore_container,
mut select_chain,
import_queue,
transaction_pool,
other: (rpc_extensions_builder, import_setup, rpc_setup, slot_duration, mut telemetry),
} = new_partial::<RuntimeApi, ExecutorDispatch>(
let basics = new_partial_basics::<RuntimeApi, ExecutorDispatch>(
&mut config,
jaeger_agent,
telemetry_worker_handle,
@@ -691,6 +719,46 @@ where
let prometheus_registry = config.prometheus_registry().cloned();
let overseer_connector = OverseerConnector::default();
let overseer_handle = Handle::new(overseer_connector.handle());
let chain_spec = config.chain_spec.cloned_box();
// we should remove this check before we deploy parachains on polkadot
// TODO: https://github.com/paritytech/polkadot/issues/3326
let is_relay_chain = chain_spec.is_kusama() ||
chain_spec.is_westend() ||
chain_spec.is_rococo() ||
chain_spec.is_wococo();
let local_keystore = basics.keystore_container.local_keystore();
let requires_overseer_for_chain_sel = local_keystore.is_some() &&
is_relay_chain &&
(role.is_authority() || is_collator.is_collator());
use relay_chain_selection::SelectRelayChain;
let select_chain = SelectRelayChain::new(
basics.backend.clone(),
overseer_handle.clone(),
requires_overseer_for_chain_sel,
polkadot_node_subsystem_util::metrics::Metrics::register(prometheus_registry.as_ref())?,
);
let service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> {
client,
backend,
mut task_manager,
keystore_container,
select_chain,
import_queue,
transaction_pool,
other: (rpc_extensions_builder, import_setup, rpc_setup, slot_duration, mut telemetry),
} = new_partial::<RuntimeApi, ExecutorDispatch, SelectRelayChain<_>>(
&mut config,
basics,
select_chain,
)?;
let shared_voter_state = rpc_setup;
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
@@ -699,7 +767,7 @@ where
// Substrate nodes.
config.network.extra_sets.push(grandpa::grandpa_peers_set_config());
if config.chain_spec.is_rococo() || config.chain_spec.is_wococo() {
if chain_spec.is_rococo() || chain_spec.is_wococo() {
config.network.extra_sets.push(beefy_gadget::beefy_peers_set_config());
}
@@ -784,7 +852,6 @@ where
col_data: crate::parachains_db::REAL_COLUMNS.col_dispute_coordinator_data,
};
let chain_spec = config.chain_spec.cloned_box();
let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams {
config,
backend: backend.clone(),
@@ -804,7 +871,10 @@ where
let overseer_client = client.clone();
let spawner = task_manager.spawn_handle();
let active_leaves = futures::executor::block_on(active_leaves(&select_chain, &*client))?;
// Cannot use the `RelayChainSelection`, since that'd require a setup _and running_ overseer
// which we are about to setup.
let active_leaves =
futures::executor::block_on(active_leaves(select_chain.as_longest_chain(), &*client))?;
let authority_discovery_service = if role.is_authority() || is_collator.is_collator() {
use futures::StreamExt;
@@ -841,7 +911,6 @@ where
None
};
let local_keystore = keystore_container.local_keystore();
if local_keystore.is_none() {
tracing::info!("Cannot run as validator without local keystore.");
}
@@ -852,6 +921,7 @@ where
let overseer_handle = if let Some((authority_discovery_service, keystore)) = maybe_params {
let (overseer, overseer_handle) = overseer_gen
.generate::<service::SpawnTaskHandle, FullClient<RuntimeApi, ExecutorDispatch>>(
overseer_connector,
OverseerGenArgs {
leaves: active_leaves,
keystore,
@@ -875,43 +945,37 @@ where
dispute_coordinator_config,
},
)?;
let handle = Handle::Connected(overseer_handle.clone());
let handle_clone = handle.clone();
let handle = Handle::new(overseer_handle.clone());
task_manager.spawn_essential_handle().spawn_blocking(
"overseer",
Box::pin(async move {
use futures::{pin_mut, select, FutureExt};
{
let handle = handle.clone();
task_manager.spawn_essential_handle().spawn_blocking(
"overseer",
Box::pin(async move {
use futures::{pin_mut, select, FutureExt};
let forward = polkadot_overseer::forward_events(overseer_client, handle_clone);
let forward = polkadot_overseer::forward_events(overseer_client, handle);
let forward = forward.fuse();
let overseer_fut = overseer.run().fuse();
let forward = forward.fuse();
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
pin_mut!(forward);
pin_mut!(overseer_fut);
pin_mut!(forward);
select! {
_ = forward => (),
_ = overseer_fut => (),
complete => (),
}
}),
);
// we should remove this check before we deploy parachains on polkadot
// TODO: https://github.com/paritytech/polkadot/issues/3326
let should_connect_overseer = chain_spec.is_kusama() ||
chain_spec.is_westend() ||
chain_spec.is_rococo() ||
chain_spec.is_wococo();
if should_connect_overseer {
select_chain.connect_to_overseer(overseer_handle.clone());
} else {
tracing::info!("Overseer is running in the disconnected state");
select! {
_ = forward => (),
_ = overseer_fut => (),
complete => (),
}
}),
);
}
Some(handle)
} else {
assert!(
!requires_overseer_for_chain_sel,
"Precondition congruence (false) is guaranteed by manual checking. qed"
);
None
};
@@ -1228,6 +1292,31 @@ where
Ok((task_manager, rpc_handlers))
}
macro_rules! chain_ops {
($config:expr, $jaeger_agent:expr, $telemetry_worker_handle:expr; $scope:ident, $executor:ident, $variant:ident) => {{
let telemetry_worker_handle = $telemetry_worker_handle;
let jaeger_agent = $jaeger_agent;
let mut config = $config;
let basics = new_partial_basics::<$scope::RuntimeApi, $executor>(
config,
jaeger_agent,
telemetry_worker_handle,
)?;
use ::sc_consensus::LongestChain;
// use the longest chain selection, since there is no overseer available
let chain_selection = LongestChain::new(basics.backend.clone());
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
new_partial::<$scope::RuntimeApi, $executor, LongestChain<_, Block>>(
&mut config,
basics,
chain_selection,
)?;
Ok((Arc::new(Client::$variant(client)), backend, import_queue, task_manager))
}};
}
/// Builds a new object suitable for chain operations.
#[cfg(feature = "full-node")]
pub fn new_chain_ops(
@@ -1244,48 +1333,26 @@ pub fn new_chain_ops(
> {
config.keystore = service::config::KeystoreConfig::InMemory;
let telemetry_worker_handle = None;
#[cfg(feature = "rococo-native")]
if config.chain_spec.is_rococo() || config.chain_spec.is_wococo() {
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
new_partial::<rococo_runtime::RuntimeApi, RococoExecutorDispatch>(
config,
jaeger_agent,
None,
)?;
return Ok((Arc::new(Client::Rococo(client)), backend, import_queue, task_manager))
return chain_ops!(config, jaeger_agent, telemetry_worker_handle; rococo_runtime, RococoExecutorDispatch, Rococo)
}
#[cfg(feature = "kusama-native")]
if config.chain_spec.is_kusama() {
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
new_partial::<kusama_runtime::RuntimeApi, KusamaExecutorDispatch>(
config,
jaeger_agent,
None,
)?;
return Ok((Arc::new(Client::Kusama(client)), backend, import_queue, task_manager))
return chain_ops!(config, jaeger_agent, telemetry_worker_handle; kusama_runtime, KusamaExecutorDispatch, Kusama)
}
#[cfg(feature = "westend-native")]
if config.chain_spec.is_westend() {
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
new_partial::<westend_runtime::RuntimeApi, WestendExecutorDispatch>(
config,
jaeger_agent,
None,
)?;
return Ok((Arc::new(Client::Westend(client)), backend, import_queue, task_manager))
return chain_ops!(config, jaeger_agent, telemetry_worker_handle; westend_runtime, WestendExecutorDispatch, Westend)
}
#[cfg(feature = "polkadot-native")]
{
let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
new_partial::<polkadot_runtime::RuntimeApi, PolkadotExecutorDispatch>(
config,
jaeger_agent,
None,
)?;
return Ok((Arc::new(Client::Polkadot(client)), backend, import_queue, task_manager))
return chain_ops!(config, jaeger_agent, telemetry_worker_handle; polkadot_runtime, PolkadotExecutorDispatch, Polkadot)
}
#[cfg(not(feature = "polkadot-native"))]
+185 -10
View File
@@ -15,6 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::{AuthorityDiscoveryApi, Block, Error, Hash, IsCollator, Registry, SpawnNamed};
use lru::LruCache;
use polkadot_availability_distribution::IncomingRequestReceivers;
use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
use polkadot_node_core_av_store::Config as AvailabilityConfig;
@@ -22,7 +23,13 @@ use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig
use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver};
use polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandle};
#[cfg(any(feature = "malus", test))]
pub use polkadot_overseer::dummy::DummySubsystem;
pub use polkadot_overseer::{
metrics::Metrics, AllSubsystems, BlockInfo, HeadSupportsParachains, MetricsTrait, Overseer,
OverseerBuilder, OverseerConnector, OverseerHandle,
};
use polkadot_primitives::v1::ParachainHost;
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_client_api::AuxStore;
@@ -258,6 +265,176 @@ where
Ok(all_subsystems)
}
/// Obtain a prepared `OverseerBuilder`, that is initialized
/// with all default values.
pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
OverseerGenArgs {
leaves,
keystore,
runtime_client,
parachains_db,
network_service,
authority_discovery_service,
pov_req_receiver,
chunk_req_receiver,
collation_req_receiver,
available_data_req_receiver,
statement_req_receiver,
dispute_req_receiver,
registry,
spawner,
is_collator,
approval_voting_config,
availability_config,
candidate_validation_config,
chain_selection_config,
dispute_coordinator_config,
}: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<
OverseerBuilder<
Spawner,
Arc<RuntimeClient>,
CandidateValidationSubsystem,
CandidateBackingSubsystem<Spawner>,
StatementDistributionSubsystem,
AvailabilityDistributionSubsystem,
AvailabilityRecoverySubsystem,
BitfieldSigningSubsystem<Spawner>,
BitfieldDistributionSubsystem,
ProvisionerSubsystem<Spawner>,
RuntimeApiSubsystem<RuntimeClient>,
AvailabilityStoreSubsystem,
NetworkBridgeSubsystem<
Arc<sc_network::NetworkService<Block, Hash>>,
AuthorityDiscoveryService,
>,
ChainApiSubsystem<RuntimeClient>,
CollationGenerationSubsystem,
CollatorProtocolSubsystem,
ApprovalDistributionSubsystem,
ApprovalVotingSubsystem,
GossipSupportSubsystem,
DisputeCoordinatorSubsystem,
DisputeParticipationSubsystem,
DisputeDistributionSubsystem<AuthorityDiscoveryService>,
ChainSelectionSubsystem,
>,
Error,
>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
use polkadot_node_subsystem_util::metrics::Metrics;
use std::iter::FromIterator;
let metrics = <polkadot_overseer::metrics::Metrics as MetricsTrait>::register(registry)?;
let builder = Overseer::builder()
.availability_distribution(AvailabilityDistributionSubsystem::new(
keystore.clone(),
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
Metrics::register(registry)?,
))
.availability_recovery(AvailabilityRecoverySubsystem::with_chunks_only(
available_data_req_receiver,
Metrics::register(registry)?,
))
.availability_store(AvailabilityStoreSubsystem::new(
parachains_db.clone(),
availability_config,
Metrics::register(registry)?,
))
.bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
.bitfield_signing(BitfieldSigningSubsystem::new(
spawner.clone(),
keystore.clone(),
Metrics::register(registry)?,
))
.candidate_backing(CandidateBackingSubsystem::new(
spawner.clone(),
keystore.clone(),
Metrics::register(registry)?,
))
.candidate_validation(CandidateValidationSubsystem::with_config(
candidate_validation_config,
Metrics::register(registry)?, // candidate-validation metrics
Metrics::register(registry)?, // validation host metrics
))
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
.collator_protocol({
let side = match is_collator {
IsCollator::Yes(collator_pair) => ProtocolSide::Collator(
network_service.local_peer_id().clone(),
collator_pair,
collation_req_receiver,
Metrics::register(registry)?,
),
IsCollator::No => ProtocolSide::Validator {
keystore: keystore.clone(),
eviction_policy: Default::default(),
metrics: Metrics::register(registry)?,
},
};
CollatorProtocolSubsystem::new(side)
})
.network_bridge(NetworkBridgeSubsystem::new(
network_service.clone(),
authority_discovery_service.clone(),
Box::new(network_service.clone()),
Metrics::register(registry)?,
))
.provisioner(ProvisionerSubsystem::new(spawner.clone(), (), Metrics::register(registry)?))
.runtime_api(RuntimeApiSubsystem::new(
runtime_client.clone(),
Metrics::register(registry)?,
spawner.clone(),
))
.statement_distribution(StatementDistributionSubsystem::new(
keystore.clone(),
statement_req_receiver,
Metrics::register(registry)?,
))
.approval_distribution(ApprovalDistributionSubsystem::new(Metrics::register(registry)?))
.approval_voting(ApprovalVotingSubsystem::with_config(
approval_voting_config,
parachains_db.clone(),
keystore.clone(),
Box::new(network_service.clone()),
Metrics::register(registry)?,
))
.gossip_support(GossipSupportSubsystem::new(keystore.clone()))
.dispute_coordinator(DisputeCoordinatorSubsystem::new(
parachains_db.clone(),
dispute_coordinator_config,
keystore.clone(),
Metrics::register(registry)?,
))
.dispute_participation(DisputeParticipationSubsystem::new())
.dispute_distribution(DisputeDistributionSubsystem::new(
keystore.clone(),
dispute_req_receiver,
authority_discovery_service.clone(),
Metrics::register(registry)?,
))
.chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
.leaves(Vec::from_iter(
leaves
.into_iter()
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
))
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.supports_parachains(runtime_client)
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.metrics(metrics)
.spawner(spawner);
Ok(builder)
}
/// Trait for the `fn` generating the overseer.
///
/// Default behavior is to create an unmodified overseer, as `RealOverseerGen`
@@ -266,6 +443,7 @@ pub trait OverseerGen {
/// Overwrite the full generation of the overseer, including the subsystems.
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where
@@ -274,19 +452,22 @@ pub trait OverseerGen {
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
let gen = RealOverseerGen;
RealOverseerGen::generate::<Spawner, RuntimeClient>(&gen, args)
RealOverseerGen::generate::<Spawner, RuntimeClient>(&gen, connector, args)
}
// It would be nice to make `create_subsystems` part of this trait,
// but the amount of generic arguments that would be required as
// as consequence make this rather annoying to implement and use.
}
use polkadot_overseer::KNOWN_LEAVES_CACHE_SIZE;
/// The regular set of subsystems.
pub struct RealOverseerGen;
impl OverseerGen for RealOverseerGen {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where
@@ -294,14 +475,8 @@ impl OverseerGen for RealOverseerGen {
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
let spawner = args.spawner.clone();
let leaves = args.leaves.clone();
let runtime_client = args.runtime_client.clone();
let registry = args.registry.clone();
let all_subsystems = create_default_subsystems::<Spawner, RuntimeClient>(args)?;
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
prepared_overseer_builder(args)?
.build_with_connector(connector)
.map_err(|e| e.into())
}
}
@@ -39,7 +39,7 @@ use super::{HeaderProvider, HeaderProviderProvider};
use consensus_common::{Error as ConsensusError, SelectChain};
use futures::channel::oneshot;
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_overseer::{AllMessages, Handle, OverseerHandle};
use polkadot_overseer::{AllMessages, Handle};
use polkadot_primitives::v1::{
Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader,
};
@@ -109,66 +109,62 @@ impl Metrics {
}
/// A chain-selection implementation which provides safety for relay chains.
pub struct SelectRelayChainWithFallback<B: sc_client_api::Backend<PolkadotBlock>> {
// A fallback to use in case the overseer is disconnected.
//
// This is used on relay chains which have not yet enabled
// parachains as well as situations where the node is offline.
fallback: sc_consensus::LongestChain<B, PolkadotBlock>,
selection: SelectRelayChain<B, Handle>,
pub struct SelectRelayChain<B: sc_client_api::Backend<PolkadotBlock>> {
is_relay_chain: bool,
longest_chain: sc_consensus::LongestChain<B, PolkadotBlock>,
selection: SelectRelayChainInner<B, Handle>,
}
impl<B> Clone for SelectRelayChainWithFallback<B>
impl<B> Clone for SelectRelayChain<B>
where
B: sc_client_api::Backend<PolkadotBlock>,
SelectRelayChain<B, Handle>: Clone,
SelectRelayChainInner<B, Handle>: Clone,
{
fn clone(&self) -> Self {
Self { fallback: self.fallback.clone(), selection: self.selection.clone() }
}
}
impl<B> SelectRelayChainWithFallback<B>
where
B: sc_client_api::Backend<PolkadotBlock> + 'static,
{
/// Create a new [`SelectRelayChainWithFallback`] wrapping the given chain backend
/// and a handle to the overseer.
pub fn new(backend: Arc<B>, overseer: Handle, metrics: Metrics) -> Self {
SelectRelayChainWithFallback {
fallback: sc_consensus::LongestChain::new(backend.clone()),
selection: SelectRelayChain::new(backend, overseer, metrics),
Self {
longest_chain: self.longest_chain.clone(),
is_relay_chain: self.is_relay_chain,
selection: self.selection.clone(),
}
}
}
impl<B> SelectRelayChainWithFallback<B>
impl<B> SelectRelayChain<B>
where
B: sc_client_api::Backend<PolkadotBlock> + 'static,
{
/// Given an overseer handle, this connects the [`SelectRelayChainWithFallback`]'s
/// internal handle and its clones to the same overseer.
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
self.selection.overseer.connect_to_overseer(handle);
/// Create a new [`SelectRelayChain`] wrapping the given chain backend
/// and a handle to the overseer.
pub fn new(backend: Arc<B>, overseer: Handle, is_relay_chain: bool, metrics: Metrics) -> Self {
SelectRelayChain {
longest_chain: sc_consensus::LongestChain::new(backend.clone()),
selection: SelectRelayChainInner::new(backend, overseer, metrics),
is_relay_chain,
}
}
/// Allow access to the inner chain, for usage during the node setup.
pub fn as_longest_chain(&self) -> &sc_consensus::LongestChain<B, PolkadotBlock> {
&self.longest_chain
}
}
#[async_trait::async_trait]
impl<B> SelectChain<PolkadotBlock> for SelectRelayChainWithFallback<B>
impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>
where
B: sc_client_api::Backend<PolkadotBlock> + 'static,
{
async fn leaves(&self) -> Result<Vec<Hash>, ConsensusError> {
if self.selection.overseer.is_disconnected() {
return self.fallback.leaves().await
if !self.is_relay_chain {
return self.longest_chain.leaves().await
}
self.selection.leaves().await
}
async fn best_chain(&self) -> Result<PolkadotHeader, ConsensusError> {
if self.selection.overseer.is_disconnected() {
return self.fallback.best_chain().await
if !self.is_relay_chain {
return self.longest_chain.best_chain().await
}
self.selection.best_chain().await
}
@@ -179,34 +175,34 @@ where
maybe_max_number: Option<BlockNumber>,
) -> Result<Option<Hash>, ConsensusError> {
let longest_chain_best =
self.fallback.finality_target(target_hash, maybe_max_number).await?;
self.longest_chain.finality_target(target_hash, maybe_max_number).await?;
if self.selection.overseer.is_disconnected() {
if !self.is_relay_chain {
return Ok(longest_chain_best)
}
self.selection
.finality_target_with_fallback(target_hash, longest_chain_best, maybe_max_number)
.finality_target_with_longest_chain(target_hash, longest_chain_best, maybe_max_number)
.await
}
}
/// A chain-selection implementation which provides safety for relay chains
/// but does not handle situations where the overseer is not yet connected.
pub struct SelectRelayChain<B, OH> {
pub struct SelectRelayChainInner<B, OH> {
backend: Arc<B>,
overseer: OH,
metrics: Metrics,
}
impl<B, OH> SelectRelayChain<B, OH>
impl<B, OH> SelectRelayChainInner<B, OH>
where
B: HeaderProviderProvider<PolkadotBlock>,
OH: OverseerHandleT,
{
/// Create a new [`SelectRelayChain`] wrapping the given chain backend
/// Create a new [`SelectRelayChainInner`] wrapping the given chain backend
/// and a handle to the overseer.
pub fn new(backend: Arc<B>, overseer: OH, metrics: Metrics) -> Self {
SelectRelayChain { backend, overseer, metrics }
SelectRelayChainInner { backend, overseer, metrics }
}
fn block_header(&self, hash: Hash) -> Result<PolkadotHeader, ConsensusError> {
@@ -234,13 +230,13 @@ where
}
}
impl<B, OH> Clone for SelectRelayChain<B, OH>
impl<B, OH> Clone for SelectRelayChainInner<B, OH>
where
B: HeaderProviderProvider<PolkadotBlock> + Send + Sync,
OH: OverseerHandleT,
{
fn clone(&self) -> Self {
SelectRelayChain {
SelectRelayChainInner {
backend: self.backend.clone(),
overseer: self.overseer.clone(),
metrics: self.metrics.clone(),
@@ -273,7 +269,7 @@ impl OverseerHandleT for Handle {
}
}
impl<B, OH> SelectRelayChain<B, OH>
impl<B, OH> SelectRelayChainInner<B, OH>
where
B: HeaderProviderProvider<PolkadotBlock>,
OH: OverseerHandleT,
@@ -317,7 +313,7 @@ where
///
/// It will also constrain the chain to only chains which are fully
/// approved, and chains which contain no disputes.
pub(crate) async fn finality_target_with_fallback(
pub(crate) async fn finality_target_with_longest_chain(
&self,
target_hash: Hash,
best_leaf: Option<Hash>,
+1 -1
View File
@@ -79,7 +79,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
let (finality_target_tx, finality_target_rx) = oneshot::channel::<Option<Hash>>();
let select_relay_chain = SelectRelayChain::<TestChainStorage, TestSubsystemSender>::new(
let select_relay_chain = SelectRelayChainInner::<TestChainStorage, TestSubsystemSender>::new(
Arc::new(case_vars.chain.clone()),
context.sender().clone(),
Default::default(),
@@ -372,7 +372,9 @@ mod tests {
use super::*;
use futures::executor::block_on;
use polkadot_node_subsystem::messages::CollatorProtocolMessage;
use polkadot_overseer::{AllSubsystems, Handle, HeadSupportsParachains, Overseer};
use polkadot_overseer::{
AllSubsystems, Handle, HeadSupportsParachains, Overseer, OverseerConnector,
};
use polkadot_primitives::v1::Hash;
struct AlwaysSupportsParachains;
@@ -394,9 +396,10 @@ mod tests {
None,
AlwaysSupportsParachains,
spawner.clone(),
OverseerConnector::default(),
)
.unwrap();
let mut handle = Handle::Connected(handle);
let mut handle = Handle::new(handle);
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
+3 -1
View File
@@ -24,7 +24,9 @@
pub use jaeger::*;
pub use polkadot_node_jaeger as jaeger;
pub use polkadot_overseer::{self as overseer, ActiveLeavesUpdate, DummySubsystem, OverseerSignal};
pub use polkadot_overseer::{
self as overseer, ActiveLeavesUpdate, OverseerConnector, OverseerSignal,
};
pub use polkadot_node_subsystem_types::{
errors::{self, *},