mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 00:01:03 +00:00
remove connected disconnected state only (#3868)
* remove connected disconnected state from overseer * foo * split new partial * fix * refactor init code to not require a `OverseerHandle` when we don't have an overseer * intermediate * fixins * X * fixup * foo * fixup * docs * conditional * Update node/service/src/lib.rs * review by ladi
This commit is contained in:
committed by
GitHub
parent
63a520b056
commit
5f637c510e
@@ -0,0 +1,54 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{AllMessages, OverseerSignal};
|
||||
use polkadot_node_subsystem_types::errors::SubsystemError;
|
||||
use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};
|
||||
|
||||
/// A dummy subsystem that implements [`Subsystem`] for all
|
||||
/// types of messages. Used for tests or as a placeholder.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct DummySubsystem;
|
||||
|
||||
impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
|
||||
where
|
||||
Context: SubsystemContext<
|
||||
Signal = OverseerSignal,
|
||||
Error = SubsystemError,
|
||||
AllMessages = AllMessages,
|
||||
>,
|
||||
{
|
||||
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
|
||||
let future = Box::pin(async move {
|
||||
loop {
|
||||
match ctx.recv().await {
|
||||
Err(_) => return Ok(()),
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
|
||||
Ok(overseer_msg) => {
|
||||
tracing::debug!(
|
||||
target: "dummy-subsystem",
|
||||
"Discarding a message sent from overseer {:?}",
|
||||
overseer_msg
|
||||
);
|
||||
continue
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
SpawnedSubsystem { name: "dummy-subsystem", future }
|
||||
}
|
||||
}
|
||||
@@ -70,7 +70,6 @@ use std::{
|
||||
|
||||
use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
|
||||
use lru::LruCache;
|
||||
use parking_lot::RwLock;
|
||||
|
||||
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
|
||||
use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
|
||||
@@ -91,12 +90,17 @@ pub use polkadot_node_subsystem_types::{
|
||||
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
|
||||
};
|
||||
|
||||
/// Test helper supplements.
|
||||
pub mod dummy;
|
||||
pub use self::dummy::DummySubsystem;
|
||||
|
||||
// TODO legacy, to be deleted, left for easier integration
|
||||
// TODO https://github.com/paritytech/polkadot/issues/3427
|
||||
mod subsystems;
|
||||
pub use self::subsystems::{AllSubsystems, DummySubsystem};
|
||||
pub use self::subsystems::AllSubsystems;
|
||||
|
||||
mod metrics;
|
||||
/// Metrics re-exports of `polkadot-metrics`.
|
||||
pub mod metrics;
|
||||
use self::metrics::Metrics;
|
||||
|
||||
use polkadot_node_metrics::{
|
||||
@@ -116,7 +120,7 @@ pub use polkadot_overseer_gen::{
|
||||
|
||||
/// Store 2 days worth of blocks, not accounting for forks,
|
||||
/// in the LRU cache. Assumes a 6-second block time.
|
||||
const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
|
||||
pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@@ -142,18 +146,12 @@ where
|
||||
///
|
||||
/// [`Overseer`]: struct.Overseer.html
|
||||
#[derive(Clone)]
|
||||
pub enum Handle {
|
||||
/// Used only at initialization to break the cyclic dependency.
|
||||
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
|
||||
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
|
||||
/// A handle to the overseer.
|
||||
Connected(OverseerHandle),
|
||||
}
|
||||
pub struct Handle(pub OverseerHandle);
|
||||
|
||||
impl Handle {
|
||||
/// Create a new disconnected [`Handle`].
|
||||
pub fn new_disconnected() -> Self {
|
||||
Self::Disconnected(Arc::new(RwLock::new(None)))
|
||||
/// Create a new [`Handle`].
|
||||
pub fn new(raw: OverseerHandle) -> Self {
|
||||
Self(raw)
|
||||
}
|
||||
|
||||
/// Inform the `Overseer` that that some block was imported.
|
||||
@@ -202,58 +200,8 @@ impl Handle {
|
||||
|
||||
/// Most basic operation, to stop a server.
|
||||
async fn send_and_log_error(&mut self, event: Event) {
|
||||
self.try_connect();
|
||||
if let Self::Connected(ref mut handle) = self {
|
||||
if handle.send(event).await.is_err() {
|
||||
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
|
||||
}
|
||||
} else {
|
||||
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether the handle is disconnected.
|
||||
pub fn is_disconnected(&self) -> bool {
|
||||
match self {
|
||||
Self::Disconnected(ref x) => x.read().is_none(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Connect this handle and all disconnected clones of it to the overseer.
|
||||
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
|
||||
match self {
|
||||
Self::Disconnected(ref mut x) => {
|
||||
let mut maybe_handle = x.write();
|
||||
if maybe_handle.is_none() {
|
||||
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
|
||||
*maybe_handle = Some(handle);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Attempting to connect a clone of a connected Handle",
|
||||
);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Attempting to connect an already connected Handle",
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
|
||||
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
|
||||
fn try_connect(&mut self) {
|
||||
if let Self::Disconnected(ref mut x) = self {
|
||||
let guard = x.write();
|
||||
if let Some(ref h) = *guard {
|
||||
let handle = h.clone();
|
||||
drop(guard);
|
||||
*self = Self::Connected(handle);
|
||||
}
|
||||
if self.0.send(event).await.is_err() {
|
||||
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -491,12 +439,13 @@ where
|
||||
/// # use polkadot_primitives::v1::Hash;
|
||||
/// # use polkadot_overseer::{
|
||||
/// # self as overseer,
|
||||
/// # Overseer,
|
||||
/// # OverseerSignal,
|
||||
/// # OverseerConnector,
|
||||
/// # SubsystemSender as _,
|
||||
/// # AllMessages,
|
||||
/// # AllSubsystems,
|
||||
/// # HeadSupportsParachains,
|
||||
/// # Overseer,
|
||||
/// # SubsystemError,
|
||||
/// # gen::{
|
||||
/// # SubsystemContext,
|
||||
@@ -550,6 +499,7 @@ where
|
||||
/// None,
|
||||
/// AlwaysSupportsParachains,
|
||||
/// spawner,
|
||||
/// OverseerConnector::default(),
|
||||
/// ).unwrap();
|
||||
///
|
||||
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
|
||||
@@ -616,6 +566,7 @@ where
|
||||
prometheus_registry: Option<&prometheus::Registry>,
|
||||
supports_parachains: SupportsParachains,
|
||||
s: S,
|
||||
connector: OverseerConnector,
|
||||
) -> SubsystemResult<(Self, OverseerHandle)>
|
||||
where
|
||||
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
|
||||
@@ -680,7 +631,7 @@ where
|
||||
.supports_parachains(supports_parachains)
|
||||
.metrics(metrics.clone())
|
||||
.spawner(s)
|
||||
.build()?;
|
||||
.build_with_connector(connector)?;
|
||||
|
||||
// spawn the metrics metronome task
|
||||
{
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
//! Prometheus metrics related to the overseer and its channels.
|
||||
|
||||
use super::*;
|
||||
use polkadot_node_metrics::metrics::{self, prometheus};
|
||||
pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait};
|
||||
|
||||
#[cfg(feature = "memory-stats")]
|
||||
use polkadot_node_metrics::MemoryAllocationSnapshot;
|
||||
@@ -117,7 +117,7 @@ impl Metrics {
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
impl MetricsTrait for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
|
||||
let metrics = MetricsInner {
|
||||
activated_heads_total: prometheus::register(
|
||||
|
||||
@@ -19,47 +19,9 @@
|
||||
//! In the future, everything should be set up using the generated
|
||||
//! overseer builder pattern instead.
|
||||
|
||||
use crate::{AllMessages, OverseerSignal};
|
||||
use polkadot_node_subsystem_types::errors::SubsystemError;
|
||||
use crate::dummy::DummySubsystem;
|
||||
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
|
||||
use polkadot_overseer_gen::{
|
||||
FromOverseer, MapSubsystem, SpawnedSubsystem, Subsystem, SubsystemContext,
|
||||
};
|
||||
|
||||
/// A dummy subsystem that implements [`Subsystem`] for all
|
||||
/// types of messages. Used for tests or as a placeholder.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct DummySubsystem;
|
||||
|
||||
impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
|
||||
where
|
||||
Context: SubsystemContext<
|
||||
Signal = OverseerSignal,
|
||||
Error = SubsystemError,
|
||||
AllMessages = AllMessages,
|
||||
>,
|
||||
{
|
||||
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
|
||||
let future = Box::pin(async move {
|
||||
loop {
|
||||
match ctx.recv().await {
|
||||
Err(_) => return Ok(()),
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
|
||||
Ok(overseer_msg) => {
|
||||
tracing::debug!(
|
||||
target: "dummy-subsystem",
|
||||
"Discarding a message sent from overseer {:?}",
|
||||
overseer_msg
|
||||
);
|
||||
continue
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
SpawnedSubsystem { name: "dummy-subsystem", future }
|
||||
}
|
||||
}
|
||||
use polkadot_overseer_gen::MapSubsystem;
|
||||
|
||||
/// This struct is passed as an argument to create a new instance of an [`Overseer`].
|
||||
///
|
||||
|
||||
@@ -32,7 +32,7 @@ use polkadot_primitives::v1::{
|
||||
ValidatorIndex,
|
||||
};
|
||||
|
||||
use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer};
|
||||
use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer, OverseerConnector};
|
||||
use metered_channel as metered;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
@@ -164,9 +164,16 @@ fn overseer_works() {
|
||||
.replace_candidate_validation(move |_| TestSubsystem1(s1_tx))
|
||||
.replace_candidate_backing(move |_| TestSubsystem2(s2_tx));
|
||||
|
||||
let (overseer, handle) =
|
||||
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -227,9 +234,10 @@ fn overseer_metrics_work() {
|
||||
Some(®istry),
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let mut handle = Handle(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -280,8 +288,15 @@ fn overseer_ends_on_subsystem_exit() {
|
||||
executor::block_on(async move {
|
||||
let all_subsystems =
|
||||
AllSubsystems::<()>::dummy().replace_candidate_backing(|_| ReturnOnStart);
|
||||
let (overseer, _handle) =
|
||||
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
|
||||
let (overseer, _handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
overseer.run().await.unwrap();
|
||||
})
|
||||
@@ -382,10 +397,16 @@ fn overseer_start_stop_works() {
|
||||
let all_subsystems = AllSubsystems::<()>::dummy()
|
||||
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
|
||||
.replace_candidate_backing(move |_| TestSubsystem6(tx_6));
|
||||
let (overseer, handle) =
|
||||
Overseer::new(vec![first_block], all_subsystems, None, MockSupportsParachains, spawner)
|
||||
.unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![first_block],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -486,9 +507,10 @@ fn overseer_finalize_works() {
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let mut handle = Handle(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -573,10 +595,16 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
|
||||
let all_subsystems =
|
||||
AllSubsystems::<()>::dummy().replace_candidate_backing(move |_| TestSubsystem6(tx_5));
|
||||
|
||||
let (overseer, handle) =
|
||||
Overseer::new(Vec::new(), all_subsystems, None, MockSupportsParachains, spawner)
|
||||
.unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let (overseer, handle) = Overseer::new(
|
||||
Vec::new(),
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -849,9 +877,17 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
dispute_distribution: subsystem.clone(),
|
||||
chain_selection: subsystem.clone(),
|
||||
};
|
||||
let (overseer, handle) =
|
||||
Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
OverseerConnector::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let mut handle = Handle(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
Reference in New Issue
Block a user