mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-18 10:41:01 +00:00
enable disputes (#3478)
* initial integration and migration code * fix tests * fix counting test * assume the current version on missing file * use SelectRelayChain * remove duplicate metric * Update node/service/src/lib.rs Co-authored-by: Robert Habermeier <rphmeier@gmail.com> * remove ApprovalCheckingVotingRule * address my concern * never mode for StagnantCheckInterval * REVERTME: some logs * w00t * it's ugly but it works * Revert "REVERTME: some logs" This reverts commit e210505a2e83e31c381394924500b69277bb042e. * it's handle, not handler * fix a few typos Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
This commit is contained in:
@@ -73,6 +73,7 @@ use futures::{
|
||||
Future, FutureExt, StreamExt,
|
||||
};
|
||||
use lru::LruCache;
|
||||
use parking_lot::RwLock;
|
||||
|
||||
use polkadot_primitives::v1::{Block, BlockId,BlockNumber, Hash, ParachainHost};
|
||||
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
|
||||
@@ -159,13 +160,24 @@ impl<Client> HeadSupportsParachains for Arc<Client> where
|
||||
}
|
||||
|
||||
|
||||
/// A handler used to communicate with the [`Overseer`].
|
||||
/// A handle used to communicate with the [`Overseer`].
|
||||
///
|
||||
/// [`Overseer`]: struct.Overseer.html
|
||||
#[derive(Clone)]
|
||||
pub struct Handle(pub OverseerHandle);
|
||||
pub enum Handle {
|
||||
/// Used only at initialization to break the cyclic dependency.
|
||||
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
|
||||
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
|
||||
/// A handle to the overseer.
|
||||
Connected(OverseerHandle),
|
||||
}
|
||||
|
||||
impl Handle {
|
||||
/// Create a new disconnected [`Handle`].
|
||||
pub fn new_disconnected() -> Self {
|
||||
Self::Disconnected(Arc::new(RwLock::new(None)))
|
||||
}
|
||||
|
||||
/// Inform the `Overseer` that that some block was imported.
|
||||
pub async fn block_imported(&mut self, block: BlockInfo) {
|
||||
self.send_and_log_error(Event::BlockImported(block)).await
|
||||
@@ -207,25 +219,59 @@ impl Handle {
|
||||
|
||||
/// Most basic operation, to stop a server.
|
||||
async fn send_and_log_error(&mut self, event: Event) {
|
||||
if self.0.send(event).await.is_err() {
|
||||
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
|
||||
self.try_connect();
|
||||
if let Self::Connected(ref mut handle) = self {
|
||||
if handle.send(event).await.is_err() {
|
||||
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
|
||||
}
|
||||
} else {
|
||||
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether the overseer handler is connected to an overseer.
|
||||
pub fn is_connected(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// Whether the handler is disconnected.
|
||||
/// Whether the handle is disconnected.
|
||||
pub fn is_disconnected(&self) -> bool {
|
||||
false
|
||||
match self {
|
||||
Self::Disconnected(ref x) => x.read().is_none(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Using this handler, connect another handler to the same
|
||||
/// overseer, if any.
|
||||
pub fn connect_other(&self, other: &mut Handle) {
|
||||
*other = self.clone();
|
||||
/// Connect this handle and all disconnected clones of it to the overseer.
|
||||
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
|
||||
match self {
|
||||
Self::Disconnected(ref mut x) => {
|
||||
let mut maybe_handle = x.write();
|
||||
if maybe_handle.is_none() {
|
||||
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
|
||||
*maybe_handle = Some(handle);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Attempting to connect a clone of a connected Handle",
|
||||
);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Attempting to connect an already connected Handle",
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
|
||||
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
|
||||
fn try_connect(&mut self) {
|
||||
if let Self::Disconnected(ref mut x) = self {
|
||||
let guard = x.write();
|
||||
if let Some(ref h) = *guard {
|
||||
let handle = h.clone();
|
||||
drop(guard);
|
||||
*self = Self::Connected(handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,7 +347,7 @@ pub enum ExternalRequest {
|
||||
/// import and finality notifications into the [`OverseerHandle`].
|
||||
pub async fn forward_events<P: BlockchainEvents<Block>>(
|
||||
client: Arc<P>,
|
||||
mut handler: Handle,
|
||||
mut handle: Handle,
|
||||
) {
|
||||
let mut finality = client.finality_notification_stream();
|
||||
let mut imports = client.import_notification_stream();
|
||||
@@ -311,7 +357,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
|
||||
f = finality.next() => {
|
||||
match f {
|
||||
Some(block) => {
|
||||
handler.block_finalized(block.into()).await;
|
||||
handle.block_finalized(block.into()).await;
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
@@ -319,7 +365,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
|
||||
i = imports.next() => {
|
||||
match i {
|
||||
Some(block) => {
|
||||
handler.block_imported(block.into()).await;
|
||||
handle.block_imported(block.into()).await;
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
@@ -338,7 +384,6 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
|
||||
network=NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
|
||||
)]
|
||||
pub struct Overseer<SupportsParachains> {
|
||||
|
||||
#[subsystem(no_dispatch, CandidateValidationMessage)]
|
||||
candidate_validation: CandidateValidation,
|
||||
|
||||
@@ -390,16 +435,16 @@ pub struct Overseer<SupportsParachains> {
|
||||
#[subsystem(no_dispatch, GossipSupportMessage)]
|
||||
gossip_support: GossipSupport,
|
||||
|
||||
#[subsystem(no_dispatch, wip, DisputeCoordinatorMessage)]
|
||||
dipute_coordinator: DisputeCoordinator,
|
||||
#[subsystem(no_dispatch, DisputeCoordinatorMessage)]
|
||||
dispute_coordinator: DisputeCoordinator,
|
||||
|
||||
#[subsystem(no_dispatch, wip, DisputeParticipationMessage)]
|
||||
#[subsystem(no_dispatch, DisputeParticipationMessage)]
|
||||
dispute_participation: DisputeParticipation,
|
||||
|
||||
#[subsystem(no_dispatch, wip, DisputeDistributionMessage)]
|
||||
dipute_distribution: DisputeDistribution,
|
||||
#[subsystem(no_dispatch, DisputeDistributionMessage)]
|
||||
dispute_distribution: DisputeDistribution,
|
||||
|
||||
#[subsystem(no_dispatch, wip, ChainSelectionMessage)]
|
||||
#[subsystem(no_dispatch, ChainSelectionMessage)]
|
||||
chain_selection: ChainSelection,
|
||||
|
||||
/// External listeners waiting for a hash to be in the active-leave set.
|
||||
@@ -436,7 +481,7 @@ where
|
||||
/// This returns the overseer along with an [`OverseerHandle`] which can
|
||||
/// be used to send messages from external parts of the codebase.
|
||||
///
|
||||
/// The [`OverseerHandler`] returned from this function is connected to
|
||||
/// The [`OverseerHandle`] returned from this function is connected to
|
||||
/// the returned [`Overseer`].
|
||||
///
|
||||
/// ```text
|
||||
@@ -527,7 +572,7 @@ where
|
||||
/// let spawner = sp_core::testing::TaskExecutor::new();
|
||||
/// let all_subsystems = AllSubsystems::<()>::dummy()
|
||||
/// .replace_candidate_validation(ValidationSubsystem);
|
||||
/// let (overseer, _handler) = Overseer::new(
|
||||
/// let (overseer, _handle) = Overseer::new(
|
||||
/// vec![],
|
||||
/// all_subsystems,
|
||||
/// None,
|
||||
@@ -549,13 +594,13 @@ where
|
||||
/// # });
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>(
|
||||
pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>(
|
||||
leaves: impl IntoIterator<Item = BlockInfo>,
|
||||
all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>,
|
||||
all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>,
|
||||
prometheus_registry: Option<&prometheus::Registry>,
|
||||
supports_parachains: SupportsParachains,
|
||||
s: S,
|
||||
) -> SubsystemResult<(Self, Handle)>
|
||||
) -> SubsystemResult<(Self, OverseerHandle)>
|
||||
where
|
||||
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
|
||||
CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>, SubsystemError> + Send,
|
||||
@@ -574,11 +619,15 @@ where
|
||||
ApD: Subsystem<OverseerSubsystemContext<ApprovalDistributionMessage>, SubsystemError> + Send,
|
||||
ApV: Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>, SubsystemError> + Send,
|
||||
GS: Subsystem<OverseerSubsystemContext<GossipSupportMessage>, SubsystemError> + Send,
|
||||
DC: Subsystem<OverseerSubsystemContext<DisputeCoordinatorMessage>, SubsystemError> + Send,
|
||||
DP: Subsystem<OverseerSubsystemContext<DisputeParticipationMessage>, SubsystemError> + Send,
|
||||
DD: Subsystem<OverseerSubsystemContext<DisputeDistributionMessage>, SubsystemError> + Send,
|
||||
CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send,
|
||||
S: SpawnNamed,
|
||||
{
|
||||
let metrics: Metrics = <Metrics as MetricsTrait>::register(prometheus_registry)?;
|
||||
|
||||
let (mut overseer, handler) = Self::builder()
|
||||
let (mut overseer, handle) = Self::builder()
|
||||
.candidate_validation(all_subsystems.candidate_validation)
|
||||
.candidate_backing(all_subsystems.candidate_backing)
|
||||
.statement_distribution(all_subsystems.statement_distribution)
|
||||
@@ -596,6 +645,10 @@ where
|
||||
.approval_distribution(all_subsystems.approval_distribution)
|
||||
.approval_voting(all_subsystems.approval_voting)
|
||||
.gossip_support(all_subsystems.gossip_support)
|
||||
.dispute_coordinator(all_subsystems.dispute_coordinator)
|
||||
.dispute_participation(all_subsystems.dispute_participation)
|
||||
.dispute_distribution(all_subsystems.dispute_distribution)
|
||||
.chain_selection(all_subsystems.chain_selection)
|
||||
.leaves(Vec::from_iter(
|
||||
leaves.into_iter().map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
|
||||
))
|
||||
@@ -647,7 +700,7 @@ where
|
||||
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
|
||||
}
|
||||
|
||||
Ok((overseer, Handle(handler)))
|
||||
Ok((overseer, handle))
|
||||
}
|
||||
|
||||
/// Stop the overseer.
|
||||
|
||||
@@ -77,7 +77,7 @@ where
|
||||
pub struct AllSubsystems<
|
||||
CV = (), CB = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (),
|
||||
RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (),
|
||||
GS = (),
|
||||
GS = (), DC = (), DP = (), DD = (), CS = (),
|
||||
> {
|
||||
/// A candidate validation subsystem.
|
||||
pub candidate_validation: CV,
|
||||
@@ -113,10 +113,18 @@ pub struct AllSubsystems<
|
||||
pub approval_voting: ApV,
|
||||
/// A Connection Request Issuer subsystem.
|
||||
pub gossip_support: GS,
|
||||
/// A Dispute Coordinator subsystem.
|
||||
pub dispute_coordinator: DC,
|
||||
/// A Dispute Participation subsystem.
|
||||
pub dispute_participation: DP,
|
||||
/// A Dispute Distribution subsystem.
|
||||
pub dispute_distribution: DD,
|
||||
/// A Chain Selection subsystem.
|
||||
pub chain_selection: CS,
|
||||
}
|
||||
|
||||
impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>
|
||||
AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>
|
||||
{
|
||||
/// Create a new instance of [`AllSubsystems`].
|
||||
///
|
||||
@@ -148,6 +156,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
> {
|
||||
AllSubsystems {
|
||||
candidate_validation: DummySubsystem,
|
||||
@@ -167,11 +179,15 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
approval_distribution: DummySubsystem,
|
||||
approval_voting: DummySubsystem,
|
||||
gossip_support: DummySubsystem,
|
||||
dispute_coordinator: DummySubsystem,
|
||||
dispute_participation: DummySubsystem,
|
||||
dispute_distribution: DummySubsystem,
|
||||
chain_selection: DummySubsystem,
|
||||
}
|
||||
}
|
||||
|
||||
/// Reference every individual subsystem.
|
||||
pub fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> {
|
||||
pub fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS, &'_ DC, &'_ DP, &'_ DD, &'_ CS> {
|
||||
AllSubsystems {
|
||||
candidate_validation: &self.candidate_validation,
|
||||
candidate_backing: &self.candidate_backing,
|
||||
@@ -190,6 +206,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
approval_distribution: &self.approval_distribution,
|
||||
approval_voting: &self.approval_voting,
|
||||
gossip_support: &self.gossip_support,
|
||||
dispute_coordinator: &self.dispute_coordinator,
|
||||
dispute_participation: &self.dispute_participation,
|
||||
dispute_distribution: &self.dispute_distribution,
|
||||
chain_selection: &self.chain_selection,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -213,6 +233,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
<Mapper as MapSubsystem<ApD>>::Output,
|
||||
<Mapper as MapSubsystem<ApV>>::Output,
|
||||
<Mapper as MapSubsystem<GS>>::Output,
|
||||
<Mapper as MapSubsystem<DC>>::Output,
|
||||
<Mapper as MapSubsystem<DP>>::Output,
|
||||
<Mapper as MapSubsystem<DD>>::Output,
|
||||
<Mapper as MapSubsystem<CS>>::Output,
|
||||
>
|
||||
where
|
||||
Mapper: MapSubsystem<CV>,
|
||||
@@ -232,6 +256,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
Mapper: MapSubsystem<ApD>,
|
||||
Mapper: MapSubsystem<ApV>,
|
||||
Mapper: MapSubsystem<GS>,
|
||||
Mapper: MapSubsystem<DC>,
|
||||
Mapper: MapSubsystem<DP>,
|
||||
Mapper: MapSubsystem<DD>,
|
||||
Mapper: MapSubsystem<CS>,
|
||||
{
|
||||
AllSubsystems {
|
||||
candidate_validation: <Mapper as MapSubsystem<CV>>::map_subsystem(&mapper, self.candidate_validation),
|
||||
@@ -251,6 +279,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
approval_distribution: <Mapper as MapSubsystem<ApD>>::map_subsystem(&mapper, self.approval_distribution),
|
||||
approval_voting: <Mapper as MapSubsystem<ApV>>::map_subsystem(&mapper, self.approval_voting),
|
||||
gossip_support: <Mapper as MapSubsystem<GS>>::map_subsystem(&mapper, self.gossip_support),
|
||||
dispute_coordinator: <Mapper as MapSubsystem<DC>>::map_subsystem(&mapper, self.dispute_coordinator),
|
||||
dispute_participation: <Mapper as MapSubsystem<DP>>::map_subsystem(&mapper, self.dispute_participation),
|
||||
dispute_distribution: <Mapper as MapSubsystem<DD>>::map_subsystem(&mapper, self.dispute_distribution),
|
||||
chain_selection: <Mapper as MapSubsystem<CS>>::map_subsystem(&mapper, self.chain_selection),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::atomic;
|
||||
use std::collections::HashMap;
|
||||
use std::task::{Poll};
|
||||
use futures::{executor, pin_mut, select, FutureExt, pending, poll, stream};
|
||||
use futures::channel::mpsc;
|
||||
|
||||
use polkadot_primitives::v1::{CollatorPair, CandidateHash};
|
||||
use polkadot_node_primitives::{CollationResult, CollationGenerationConfig, PoV, BlockData};
|
||||
@@ -166,13 +167,14 @@ fn overseer_works() {
|
||||
.replace_candidate_validation(TestSubsystem1(s1_tx))
|
||||
.replace_candidate_backing(TestSubsystem2(s2_tx));
|
||||
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -188,7 +190,7 @@ fn overseer_works() {
|
||||
Some(msg) => {
|
||||
s1_results.push(msg);
|
||||
if s1_results.len() == 10 {
|
||||
handler.stop().await;
|
||||
handle.stop().await;
|
||||
}
|
||||
}
|
||||
None => break,
|
||||
@@ -236,21 +238,22 @@ fn overseer_metrics_work() {
|
||||
|
||||
let all_subsystems = AllSubsystems::<()>::dummy();
|
||||
let registry = prometheus::Registry::new();
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![first_block],
|
||||
all_subsystems,
|
||||
Some(®istry),
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
handler.block_imported(second_block).await;
|
||||
handler.block_imported(third_block).await;
|
||||
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handler.stop().await;
|
||||
handle.block_imported(second_block).await;
|
||||
handle.block_imported(third_block).await;
|
||||
handle.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handle.stop().await;
|
||||
|
||||
select! {
|
||||
res = overseer_fut => {
|
||||
@@ -398,13 +401,14 @@ fn overseer_start_stop_works() {
|
||||
let all_subsystems = AllSubsystems::<()>::dummy()
|
||||
.replace_candidate_validation(TestSubsystem5(tx_5))
|
||||
.replace_candidate_backing(TestSubsystem6(tx_6));
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![first_block],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -412,8 +416,8 @@ fn overseer_start_stop_works() {
|
||||
let mut ss5_results = Vec::new();
|
||||
let mut ss6_results = Vec::new();
|
||||
|
||||
handler.block_imported(second_block).await;
|
||||
handler.block_imported(third_block).await;
|
||||
handle.block_imported(second_block).await;
|
||||
handle.block_imported(third_block).await;
|
||||
|
||||
let expected_heartbeats = vec![
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
@@ -463,7 +467,7 @@ fn overseer_start_stop_works() {
|
||||
|
||||
if ss5_results.len() == expected_heartbeats.len() &&
|
||||
ss6_results.len() == expected_heartbeats.len() {
|
||||
handler.stop().await;
|
||||
handle.stop().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -507,13 +511,14 @@ fn overseer_finalize_works() {
|
||||
.replace_candidate_backing(TestSubsystem6(tx_6));
|
||||
|
||||
// start with two forks of different height.
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![first_block, second_block],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
@@ -522,7 +527,7 @@ fn overseer_finalize_works() {
|
||||
let mut ss6_results = Vec::new();
|
||||
|
||||
// this should stop work on both forks we started with earlier.
|
||||
handler.block_finalized(third_block).await;
|
||||
handle.block_finalized(third_block).await;
|
||||
|
||||
let expected_heartbeats = vec![
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
@@ -569,7 +574,7 @@ fn overseer_finalize_works() {
|
||||
}
|
||||
|
||||
if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() {
|
||||
handler.stop().await;
|
||||
handle.stop().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -607,21 +612,22 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
|
||||
let all_subsystems = AllSubsystems::<()>::dummy()
|
||||
.replace_candidate_backing(TestSubsystem6(tx_5));
|
||||
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
let (overseer, handle) = Overseer::new(
|
||||
Vec::new(),
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
let mut ss5_results = Vec::new();
|
||||
|
||||
handler.block_finalized(finalized_block.clone()).await;
|
||||
handler.block_imported(imported_block.clone()).await;
|
||||
handle.block_finalized(finalized_block.clone()).await;
|
||||
handle.block_imported(imported_block.clone()).await;
|
||||
|
||||
let expected_heartbeats = vec![
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
@@ -652,7 +658,7 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
|
||||
}
|
||||
|
||||
if ss5_results.len() == expected_heartbeats.len() {
|
||||
handler.stop().await;
|
||||
handle.stop().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -809,10 +815,35 @@ fn test_approval_voting_msg() -> ApprovalVotingMessage {
|
||||
ApprovalVotingMessage::ApprovedAncestor(Default::default(), 0, sender)
|
||||
}
|
||||
|
||||
fn test_dispute_coordinator_msg() -> DisputeCoordinatorMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
DisputeCoordinatorMessage::RecentDisputes(sender)
|
||||
}
|
||||
|
||||
fn test_dispute_participation_msg() -> DisputeParticipationMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
DisputeParticipationMessage::Participate {
|
||||
candidate_hash: Default::default(),
|
||||
candidate_receipt: Default::default(),
|
||||
session: 0,
|
||||
n_validators: 0,
|
||||
report_availability: sender,
|
||||
}
|
||||
}
|
||||
|
||||
fn test_dispute_distribution_msg() -> DisputeDistributionMessage {
|
||||
let (_, receiver) = mpsc::channel(1);
|
||||
DisputeDistributionMessage::DisputeSendingReceiver(receiver)
|
||||
}
|
||||
|
||||
fn test_chain_selection_msg() -> ChainSelectionMessage {
|
||||
ChainSelectionMessage::Approved(Default::default())
|
||||
}
|
||||
|
||||
// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
|
||||
#[test]
|
||||
fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
const NUM_SUBSYSTEMS: usize = 17;
|
||||
const NUM_SUBSYSTEMS: usize = 21;
|
||||
// -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution
|
||||
const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 3;
|
||||
|
||||
@@ -846,20 +877,25 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
approval_distribution: subsystem.clone(),
|
||||
approval_voting: subsystem.clone(),
|
||||
gossip_support: subsystem.clone(),
|
||||
dispute_coordinator: subsystem.clone(),
|
||||
dispute_participation: subsystem.clone(),
|
||||
dispute_distribution: subsystem.clone(),
|
||||
chain_selection: subsystem.clone(),
|
||||
};
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
let (overseer, handle) = Overseer::new(
|
||||
vec![],
|
||||
all_subsystems,
|
||||
None,
|
||||
MockSupportsParachains,
|
||||
spawner,
|
||||
).unwrap();
|
||||
let mut handle = Handle::Connected(handle);
|
||||
let overseer_fut = overseer.run().fuse();
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
// send a signal to each subsystem
|
||||
handler.block_imported(BlockInfo {
|
||||
handle.block_imported(BlockInfo {
|
||||
hash: Default::default(),
|
||||
parent_hash: Default::default(),
|
||||
number: Default::default(),
|
||||
@@ -867,22 +903,26 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
|
||||
// send a msg to each subsystem
|
||||
// except for BitfieldSigning and GossipSupport as the messages are not instantiable
|
||||
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
|
||||
// handler.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
|
||||
// handler.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
|
||||
handler.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
|
||||
// handle.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
|
||||
// handle.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::DisputeCoordinator(test_dispute_coordinator_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::DisputeParticipation(test_dispute_participation_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::DisputeDistribution(test_dispute_distribution_msg())).await;
|
||||
handle.send_msg_anon(AllMessages::ChainSelection(test_chain_selection_msg())).await;
|
||||
|
||||
// Wait until all subsystems have received. Otherwise the messages might race against
|
||||
// the conclude signal.
|
||||
@@ -903,7 +943,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
}
|
||||
|
||||
// send a stop signal to each subsystems
|
||||
handler.stop().await;
|
||||
handle.stop().await;
|
||||
|
||||
let res = overseer_fut.await;
|
||||
assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
|
||||
@@ -933,6 +973,10 @@ fn context_holds_onto_message_until_enough_signals_received() {
|
||||
let (approval_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (approval_voting_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (gossip_support_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (dispute_coordinator_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (dispute_participation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (dispute_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (chain_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
|
||||
let (candidate_validation_unbounded_tx, _) = metered::unbounded();
|
||||
let (candidate_backing_unbounded_tx, _) = metered::unbounded();
|
||||
@@ -951,6 +995,10 @@ fn context_holds_onto_message_until_enough_signals_received() {
|
||||
let (approval_distribution_unbounded_tx, _) = metered::unbounded();
|
||||
let (approval_voting_unbounded_tx, _) = metered::unbounded();
|
||||
let (gossip_support_unbounded_tx, _) = metered::unbounded();
|
||||
let (dispute_coordinator_unbounded_tx, _) = metered::unbounded();
|
||||
let (dispute_participation_unbounded_tx, _) = metered::unbounded();
|
||||
let (dispute_distribution_unbounded_tx, _) = metered::unbounded();
|
||||
let (chain_selection_unbounded_tx, _) = metered::unbounded();
|
||||
|
||||
let channels_out = ChannelsOut {
|
||||
candidate_validation: candidate_validation_bounded_tx.clone(),
|
||||
@@ -970,6 +1018,10 @@ fn context_holds_onto_message_until_enough_signals_received() {
|
||||
approval_distribution: approval_distribution_bounded_tx.clone(),
|
||||
approval_voting: approval_voting_bounded_tx.clone(),
|
||||
gossip_support: gossip_support_bounded_tx.clone(),
|
||||
dispute_coordinator: dispute_coordinator_bounded_tx.clone(),
|
||||
dispute_participation: dispute_participation_bounded_tx.clone(),
|
||||
dispute_distribution: dispute_distribution_bounded_tx.clone(),
|
||||
chain_selection: chain_selection_bounded_tx.clone(),
|
||||
|
||||
candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(),
|
||||
candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(),
|
||||
@@ -988,6 +1040,10 @@ fn context_holds_onto_message_until_enough_signals_received() {
|
||||
approval_distribution_unbounded: approval_distribution_unbounded_tx.clone(),
|
||||
approval_voting_unbounded: approval_voting_unbounded_tx.clone(),
|
||||
gossip_support_unbounded: gossip_support_unbounded_tx.clone(),
|
||||
dispute_coordinator_unbounded: dispute_coordinator_unbounded_tx.clone(),
|
||||
dispute_participation_unbounded: dispute_participation_unbounded_tx.clone(),
|
||||
dispute_distribution_unbounded: dispute_distribution_unbounded_tx.clone(),
|
||||
chain_selection_unbounded: chain_selection_unbounded_tx.clone(),
|
||||
};
|
||||
|
||||
let (mut signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY);
|
||||
|
||||
Reference in New Issue
Block a user