mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 08:41:07 +00:00
overseer: AllSubsystems magic and subsystem channel sizes metrics (#2711)
* overseer: AllSubsystems magic and report subsystem channel sizes to prometheus * fix tests
This commit is contained in:
committed by
GitHub
parent
485e406058
commit
2388141b25
+272
-158
@@ -511,62 +511,28 @@ impl<M> OverseenSubsystem<M> {
|
||||
|
||||
/// The `Overseer` itself.
|
||||
pub struct Overseer<S> {
|
||||
/// A candidate validation subsystem.
|
||||
candidate_validation_subsystem: OverseenSubsystem<CandidateValidationMessage>,
|
||||
|
||||
/// A candidate backing subsystem.
|
||||
candidate_backing_subsystem: OverseenSubsystem<CandidateBackingMessage>,
|
||||
|
||||
/// A candidate selection subsystem.
|
||||
candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>,
|
||||
|
||||
/// A statement distribution subsystem.
|
||||
statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>,
|
||||
|
||||
/// An availability distribution subsystem.
|
||||
availability_distribution_subsystem: OverseenSubsystem<AvailabilityDistributionMessage>,
|
||||
|
||||
/// An availability recovery subsystem.
|
||||
availability_recovery_subsystem: OverseenSubsystem<AvailabilityRecoveryMessage>,
|
||||
|
||||
/// A bitfield signing subsystem.
|
||||
bitfield_signing_subsystem: OverseenSubsystem<BitfieldSigningMessage>,
|
||||
|
||||
/// A bitfield distribution subsystem.
|
||||
bitfield_distribution_subsystem: OverseenSubsystem<BitfieldDistributionMessage>,
|
||||
|
||||
/// A provisioner subsystem.
|
||||
provisioner_subsystem: OverseenSubsystem<ProvisionerMessage>,
|
||||
|
||||
/// A PoV distribution subsystem.
|
||||
pov_distribution_subsystem: OverseenSubsystem<PoVDistributionMessage>,
|
||||
|
||||
/// A runtime API subsystem.
|
||||
runtime_api_subsystem: OverseenSubsystem<RuntimeApiMessage>,
|
||||
|
||||
/// An availability store subsystem.
|
||||
availability_store_subsystem: OverseenSubsystem<AvailabilityStoreMessage>,
|
||||
|
||||
/// A network bridge subsystem.
|
||||
network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,
|
||||
|
||||
/// A Chain API subsystem.
|
||||
chain_api_subsystem: OverseenSubsystem<ChainApiMessage>,
|
||||
|
||||
/// A Collation Generation subsystem.
|
||||
collation_generation_subsystem: OverseenSubsystem<CollationGenerationMessage>,
|
||||
|
||||
/// A Collator Protocol subsystem.
|
||||
collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
|
||||
|
||||
/// An Approval Distribution subsystem.
|
||||
approval_distribution_subsystem: OverseenSubsystem<ApprovalDistributionMessage>,
|
||||
|
||||
/// An Approval Voting subsystem.
|
||||
approval_voting_subsystem: OverseenSubsystem<ApprovalVotingMessage>,
|
||||
|
||||
/// A Gossip Support subsystem.
|
||||
gossip_support_subsystem: OverseenSubsystem<GossipSupportMessage>,
|
||||
/// Handles to all subsystems.
|
||||
subsystems: AllSubsystems<
|
||||
OverseenSubsystem<CandidateValidationMessage>,
|
||||
OverseenSubsystem<CandidateBackingMessage>,
|
||||
OverseenSubsystem<CandidateSelectionMessage>,
|
||||
OverseenSubsystem<StatementDistributionMessage>,
|
||||
OverseenSubsystem<AvailabilityDistributionMessage>,
|
||||
OverseenSubsystem<AvailabilityRecoveryMessage>,
|
||||
OverseenSubsystem<BitfieldSigningMessage>,
|
||||
OverseenSubsystem<BitfieldDistributionMessage>,
|
||||
OverseenSubsystem<ProvisionerMessage>,
|
||||
OverseenSubsystem<PoVDistributionMessage>,
|
||||
OverseenSubsystem<RuntimeApiMessage>,
|
||||
OverseenSubsystem<AvailabilityStoreMessage>,
|
||||
OverseenSubsystem<NetworkBridgeMessage>,
|
||||
OverseenSubsystem<ChainApiMessage>,
|
||||
OverseenSubsystem<CollationGenerationMessage>,
|
||||
OverseenSubsystem<CollatorProtocolMessage>,
|
||||
OverseenSubsystem<ApprovalDistributionMessage>,
|
||||
OverseenSubsystem<ApprovalVotingMessage>,
|
||||
OverseenSubsystem<GossipSupportMessage>,
|
||||
>,
|
||||
|
||||
/// Spawner to spawn tasks to.
|
||||
s: S,
|
||||
@@ -598,6 +564,20 @@ pub struct Overseer<S> {
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
trait MapSubsystem<T> {
|
||||
type Output;
|
||||
|
||||
fn map_subsystem(&self, sub: T) -> Self::Output;
|
||||
}
|
||||
|
||||
impl<F, T, U> MapSubsystem<T> for F where F: Fn(T) -> U {
|
||||
type Output = U;
|
||||
|
||||
fn map_subsystem(&self, sub: T) -> U {
|
||||
(self)(sub)
|
||||
}
|
||||
}
|
||||
|
||||
/// This struct is passed as an argument to create a new instance of an [`Overseer`].
|
||||
///
|
||||
/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
|
||||
@@ -1241,8 +1221,105 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV,
|
||||
gossip_support,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ CS, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ PoVD, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> {
|
||||
AllSubsystems {
|
||||
candidate_validation: &self.candidate_validation,
|
||||
candidate_backing: &self.candidate_backing,
|
||||
candidate_selection: &self.candidate_selection,
|
||||
statement_distribution: &self.statement_distribution,
|
||||
availability_distribution: &self.availability_distribution,
|
||||
availability_recovery: &self.availability_recovery,
|
||||
bitfield_signing: &self.bitfield_signing,
|
||||
bitfield_distribution: &self.bitfield_distribution,
|
||||
provisioner: &self.provisioner,
|
||||
pov_distribution: &self.pov_distribution,
|
||||
runtime_api: &self.runtime_api,
|
||||
availability_store: &self.availability_store,
|
||||
network_bridge: &self.network_bridge,
|
||||
chain_api: &self.chain_api,
|
||||
collation_generation: &self.collation_generation,
|
||||
collator_protocol: &self.collator_protocol,
|
||||
approval_distribution: &self.approval_distribution,
|
||||
approval_voting: &self.approval_voting,
|
||||
gossip_support: &self.gossip_support,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_subsystems<M>(self, m: M)
|
||||
-> AllSubsystems<
|
||||
<M as MapSubsystem<CV>>::Output,
|
||||
<M as MapSubsystem<CB>>::Output,
|
||||
<M as MapSubsystem<CS>>::Output,
|
||||
<M as MapSubsystem<SD>>::Output,
|
||||
<M as MapSubsystem<AD>>::Output,
|
||||
<M as MapSubsystem<AR>>::Output,
|
||||
<M as MapSubsystem<BS>>::Output,
|
||||
<M as MapSubsystem<BD>>::Output,
|
||||
<M as MapSubsystem<P>>::Output,
|
||||
<M as MapSubsystem<PoVD>>::Output,
|
||||
<M as MapSubsystem<RA>>::Output,
|
||||
<M as MapSubsystem<AS>>::Output,
|
||||
<M as MapSubsystem<NB>>::Output,
|
||||
<M as MapSubsystem<CA>>::Output,
|
||||
<M as MapSubsystem<CG>>::Output,
|
||||
<M as MapSubsystem<CP>>::Output,
|
||||
<M as MapSubsystem<ApD>>::Output,
|
||||
<M as MapSubsystem<ApV>>::Output,
|
||||
<M as MapSubsystem<GS>>::Output,
|
||||
>
|
||||
where
|
||||
M: MapSubsystem<CV>,
|
||||
M: MapSubsystem<CB>,
|
||||
M: MapSubsystem<CS>,
|
||||
M: MapSubsystem<SD>,
|
||||
M: MapSubsystem<AD>,
|
||||
M: MapSubsystem<AR>,
|
||||
M: MapSubsystem<BS>,
|
||||
M: MapSubsystem<BD>,
|
||||
M: MapSubsystem<P>,
|
||||
M: MapSubsystem<PoVD>,
|
||||
M: MapSubsystem<RA>,
|
||||
M: MapSubsystem<AS>,
|
||||
M: MapSubsystem<NB>,
|
||||
M: MapSubsystem<CA>,
|
||||
M: MapSubsystem<CG>,
|
||||
M: MapSubsystem<CP>,
|
||||
M: MapSubsystem<ApD>,
|
||||
M: MapSubsystem<ApV>,
|
||||
M: MapSubsystem<GS>,
|
||||
{
|
||||
AllSubsystems {
|
||||
candidate_validation: m.map_subsystem(self.candidate_validation),
|
||||
candidate_backing: m.map_subsystem(self.candidate_backing),
|
||||
candidate_selection: m.map_subsystem(self.candidate_selection),
|
||||
statement_distribution: m.map_subsystem(self.statement_distribution),
|
||||
availability_distribution: m.map_subsystem(self.availability_distribution),
|
||||
availability_recovery: m.map_subsystem(self.availability_recovery),
|
||||
bitfield_signing: m.map_subsystem(self.bitfield_signing),
|
||||
bitfield_distribution: m.map_subsystem(self.bitfield_distribution),
|
||||
provisioner: m.map_subsystem(self.provisioner),
|
||||
pov_distribution: m.map_subsystem(self.pov_distribution),
|
||||
runtime_api: m.map_subsystem(self.runtime_api),
|
||||
availability_store: m.map_subsystem(self.availability_store),
|
||||
network_bridge: m.map_subsystem(self.network_bridge),
|
||||
chain_api: m.map_subsystem(self.chain_api),
|
||||
collation_generation: m.map_subsystem(self.collation_generation),
|
||||
collator_protocol: m.map_subsystem(self.collator_protocol),
|
||||
approval_distribution: m.map_subsystem(self.approval_distribution),
|
||||
approval_voting: m.map_subsystem(self.approval_voting),
|
||||
gossip_support: m.map_subsystem(self.gossip_support),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type AllSubsystemsSame<T> = AllSubsystems<
|
||||
T, T, T, T, T,
|
||||
T, T, T, T, T,
|
||||
T, T, T, T, T,
|
||||
T, T, T, T,
|
||||
>;
|
||||
|
||||
/// Overseer Prometheus metrics.
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
@@ -1251,7 +1328,7 @@ struct MetricsInner {
|
||||
messages_relayed_total: prometheus::Counter<prometheus::U64>,
|
||||
message_relay_timings: prometheus::Histogram,
|
||||
to_overseer_channel_queue_size: prometheus::Gauge<prometheus::U64>,
|
||||
from_overseer_channel_queue_size: prometheus::Gauge<prometheus::U64>,
|
||||
from_overseer_channel_queue_size: prometheus::GaugeVec<prometheus::U64>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
@@ -1281,9 +1358,17 @@ impl Metrics {
|
||||
self.0.as_ref().map(|metrics| metrics.message_relay_timings.start_timer())
|
||||
}
|
||||
|
||||
fn channel_fill_level_snapshot(&self, from_overseer: usize, to_overseer: usize) {
|
||||
fn channel_fill_level_snapshot(
|
||||
&self,
|
||||
from_overseer: AllSubsystemsSame<(&'static str, usize)>,
|
||||
to_overseer: usize,
|
||||
) {
|
||||
self.0.as_ref().map(|metrics| {
|
||||
from_overseer.map_subsystems(|(name, queue_size): (_, usize)| {
|
||||
metrics.from_overseer_channel_queue_size.with_label_values(&[name]).set(queue_size as u64);
|
||||
})
|
||||
});
|
||||
self.0.as_ref().map(|metrics| metrics.to_overseer_channel_queue_size.set(to_overseer as u64));
|
||||
self.0.as_ref().map(|metrics| metrics.from_overseer_channel_queue_size.set(from_overseer as u64));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1334,11 +1419,14 @@ impl metrics::Metrics for Metrics {
|
||||
registry,
|
||||
)?,
|
||||
from_overseer_channel_queue_size: prometheus::register(
|
||||
prometheus::Gauge::<prometheus::U64>::with_opts(
|
||||
prometheus::GaugeVec::<prometheus::U64>::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_from_overseer_channel_queue_size",
|
||||
"Number of elements sitting in the channel waiting to be processed.",
|
||||
"Number of elements sitting in the channel from the overseer waiting to be processed.",
|
||||
),
|
||||
&[
|
||||
"subsystem_name",
|
||||
],
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
@@ -1346,7 +1434,7 @@ impl metrics::Metrics for Metrics {
|
||||
prometheus::Gauge::<prometheus::U64>::with_opts(
|
||||
prometheus::Opts::new(
|
||||
"parachain_to_overseer_channel_queue_size",
|
||||
"Number of elements sitting in the channel waiting to be processed.",
|
||||
"Number of elements sitting in the channel to the overseer waiting to be processed.",
|
||||
),
|
||||
)?,
|
||||
registry,
|
||||
@@ -1486,21 +1574,6 @@ where
|
||||
|
||||
let (to_overseer_tx, to_overseer_rx) = metered::unbounded("to_overseer");
|
||||
|
||||
{
|
||||
let meter_from_overseer = events_rx.meter().clone();
|
||||
let meter_to_overseer = to_overseer_rx.meter().clone();
|
||||
let metronome_metrics = metrics.clone();
|
||||
let metronome = Metronome::new(std::time::Duration::from_millis(950))
|
||||
.for_each(move |_| {
|
||||
metronome_metrics.channel_fill_level_snapshot(meter_from_overseer.queue_count(), meter_to_overseer.queue_count());
|
||||
|
||||
async move {
|
||||
()
|
||||
}
|
||||
});
|
||||
s.spawn("metrics_metronome", Box::pin(metronome));
|
||||
}
|
||||
|
||||
let mut running_subsystems = FuturesUnordered::new();
|
||||
|
||||
let mut seed = 0x533d; // arbitrary
|
||||
@@ -1704,26 +1777,68 @@ where
|
||||
let active_leaves = HashMap::new();
|
||||
let activation_external_listeners = HashMap::new();
|
||||
|
||||
let subsystems = AllSubsystems {
|
||||
candidate_validation: candidate_validation_subsystem,
|
||||
candidate_backing: candidate_backing_subsystem,
|
||||
candidate_selection: candidate_selection_subsystem,
|
||||
statement_distribution: statement_distribution_subsystem,
|
||||
availability_distribution: availability_distribution_subsystem,
|
||||
availability_recovery: availability_recovery_subsystem,
|
||||
bitfield_signing: bitfield_signing_subsystem,
|
||||
bitfield_distribution: bitfield_distribution_subsystem,
|
||||
provisioner: provisioner_subsystem,
|
||||
pov_distribution: pov_distribution_subsystem,
|
||||
runtime_api: runtime_api_subsystem,
|
||||
availability_store: availability_store_subsystem,
|
||||
network_bridge: network_bridge_subsystem,
|
||||
chain_api: chain_api_subsystem,
|
||||
collation_generation: collation_generation_subsystem,
|
||||
collator_protocol: collator_protocol_subsystem,
|
||||
approval_distribution: approval_distribution_subsystem,
|
||||
approval_voting: approval_voting_subsystem,
|
||||
gossip_support: gossip_support_subsystem,
|
||||
};
|
||||
|
||||
{
|
||||
struct ExtractNameAndMeter;
|
||||
impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem<T>> for ExtractNameAndMeter {
|
||||
type Output = (&'static str, metered::Meter);
|
||||
|
||||
fn map_subsystem(&self, subsystem: &'a OverseenSubsystem<T>) -> Self::Output {
|
||||
let instance = subsystem.instance.as_ref()
|
||||
.expect("Extraction is done directly after spawning when subsystems\
|
||||
have not concluded; qed");
|
||||
|
||||
(instance.name, instance.tx.meter().clone())
|
||||
}
|
||||
}
|
||||
|
||||
let meter_external_to_overseer = events_rx.meter().clone();
|
||||
let meter_subsystem_to_overseer = to_overseer_rx.meter().clone();
|
||||
let subsystem_meters = subsystems.as_ref().map_subsystems(ExtractNameAndMeter);
|
||||
let metronome_metrics = metrics.clone();
|
||||
let metronome = Metronome::new(std::time::Duration::from_millis(950))
|
||||
.for_each(move |_| {
|
||||
let to_subsystem_counts = subsystem_meters.as_ref()
|
||||
.map_subsystems(|&(name, ref meter): &(_, metered::Meter)| (name, meter.queue_count()));
|
||||
|
||||
// We combine the amount of messages from subsystems to the overseer
|
||||
// as well as the amount of messages from external sources to the overseer
|
||||
// into one to_overseer value.
|
||||
metronome_metrics.channel_fill_level_snapshot(
|
||||
to_subsystem_counts,
|
||||
meter_subsystem_to_overseer.queue_count() + meter_external_to_overseer.queue_count(),
|
||||
);
|
||||
|
||||
async move {
|
||||
()
|
||||
}
|
||||
});
|
||||
s.spawn("metrics_metronome", Box::pin(metronome));
|
||||
}
|
||||
|
||||
let this = Self {
|
||||
candidate_validation_subsystem,
|
||||
candidate_backing_subsystem,
|
||||
candidate_selection_subsystem,
|
||||
statement_distribution_subsystem,
|
||||
availability_distribution_subsystem,
|
||||
availability_recovery_subsystem,
|
||||
bitfield_signing_subsystem,
|
||||
bitfield_distribution_subsystem,
|
||||
provisioner_subsystem,
|
||||
pov_distribution_subsystem,
|
||||
runtime_api_subsystem,
|
||||
availability_store_subsystem,
|
||||
network_bridge_subsystem,
|
||||
chain_api_subsystem,
|
||||
collation_generation_subsystem,
|
||||
collator_protocol_subsystem,
|
||||
approval_distribution_subsystem,
|
||||
approval_voting_subsystem,
|
||||
gossip_support_subsystem,
|
||||
subsystems,
|
||||
s,
|
||||
running_subsystems,
|
||||
to_overseer_rx: to_overseer_rx.fuse(),
|
||||
@@ -1740,25 +1855,25 @@ where
|
||||
|
||||
// Stop the overseer.
|
||||
async fn stop(mut self) {
|
||||
let _ = self.candidate_validation_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.candidate_backing_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.candidate_selection_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.statement_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.availability_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.availability_recovery_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.bitfield_signing_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.bitfield_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.provisioner_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.pov_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.runtime_api_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.availability_store_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.network_bridge_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.chain_api_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.collator_protocol_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.collation_generation_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.approval_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.approval_voting_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.gossip_support_subsystem.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.candidate_validation.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.candidate_backing.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.candidate_selection.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.statement_distribution.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.availability_distribution.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.availability_recovery.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.bitfield_signing.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.bitfield_distribution.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.provisioner.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.pov_distribution.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.runtime_api.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.availability_store.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.network_bridge.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.chain_api.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.collator_protocol.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.collation_generation.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.approval_distribution.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.approval_voting.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.gossip_support.send_signal(OverseerSignal::Conclude).await;
|
||||
|
||||
let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
|
||||
|
||||
@@ -1911,25 +2026,25 @@ where
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
|
||||
self.candidate_validation_subsystem.send_signal(signal.clone()).await?;
|
||||
self.candidate_backing_subsystem.send_signal(signal.clone()).await?;
|
||||
self.candidate_selection_subsystem.send_signal(signal.clone()).await?;
|
||||
self.statement_distribution_subsystem.send_signal(signal.clone()).await?;
|
||||
self.availability_distribution_subsystem.send_signal(signal.clone()).await?;
|
||||
self.availability_recovery_subsystem.send_signal(signal.clone()).await?;
|
||||
self.bitfield_signing_subsystem.send_signal(signal.clone()).await?;
|
||||
self.bitfield_distribution_subsystem.send_signal(signal.clone()).await?;
|
||||
self.provisioner_subsystem.send_signal(signal.clone()).await?;
|
||||
self.pov_distribution_subsystem.send_signal(signal.clone()).await?;
|
||||
self.runtime_api_subsystem.send_signal(signal.clone()).await?;
|
||||
self.availability_store_subsystem.send_signal(signal.clone()).await?;
|
||||
self.network_bridge_subsystem.send_signal(signal.clone()).await?;
|
||||
self.chain_api_subsystem.send_signal(signal.clone()).await?;
|
||||
self.collator_protocol_subsystem.send_signal(signal.clone()).await?;
|
||||
self.collation_generation_subsystem.send_signal(signal.clone()).await?;
|
||||
self.approval_distribution_subsystem.send_signal(signal.clone()).await?;
|
||||
self.approval_voting_subsystem.send_signal(signal.clone()).await?;
|
||||
self.gossip_support_subsystem.send_signal(signal).await?;
|
||||
self.subsystems.candidate_validation.send_signal(signal.clone()).await?;
|
||||
self.subsystems.candidate_backing.send_signal(signal.clone()).await?;
|
||||
self.subsystems.candidate_selection.send_signal(signal.clone()).await?;
|
||||
self.subsystems.statement_distribution.send_signal(signal.clone()).await?;
|
||||
self.subsystems.availability_distribution.send_signal(signal.clone()).await?;
|
||||
self.subsystems.availability_recovery.send_signal(signal.clone()).await?;
|
||||
self.subsystems.bitfield_signing.send_signal(signal.clone()).await?;
|
||||
self.subsystems.bitfield_distribution.send_signal(signal.clone()).await?;
|
||||
self.subsystems.provisioner.send_signal(signal.clone()).await?;
|
||||
self.subsystems.pov_distribution.send_signal(signal.clone()).await?;
|
||||
self.subsystems.runtime_api.send_signal(signal.clone()).await?;
|
||||
self.subsystems.availability_store.send_signal(signal.clone()).await?;
|
||||
self.subsystems.network_bridge.send_signal(signal.clone()).await?;
|
||||
self.subsystems.chain_api.send_signal(signal.clone()).await?;
|
||||
self.subsystems.collator_protocol.send_signal(signal.clone()).await?;
|
||||
self.subsystems.collation_generation.send_signal(signal.clone()).await?;
|
||||
self.subsystems.approval_distribution.send_signal(signal.clone()).await?;
|
||||
self.subsystems.approval_voting.send_signal(signal.clone()).await?;
|
||||
self.subsystems.gossip_support.send_signal(signal).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1940,61 +2055,61 @@ where
|
||||
self.metrics.on_message_relayed();
|
||||
match msg {
|
||||
AllMessages::CandidateValidation(msg) => {
|
||||
self.candidate_validation_subsystem.send_message(msg).await?;
|
||||
self.subsystems.candidate_validation.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::CandidateBacking(msg) => {
|
||||
self.candidate_backing_subsystem.send_message(msg).await?;
|
||||
self.subsystems.candidate_backing.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::CandidateSelection(msg) => {
|
||||
self.candidate_selection_subsystem.send_message(msg).await?;
|
||||
self.subsystems.candidate_selection.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::StatementDistribution(msg) => {
|
||||
self.statement_distribution_subsystem.send_message(msg).await?;
|
||||
self.subsystems.statement_distribution.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::AvailabilityDistribution(msg) => {
|
||||
self.availability_distribution_subsystem.send_message(msg).await?;
|
||||
self.subsystems.availability_distribution.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::AvailabilityRecovery(msg) => {
|
||||
self.availability_recovery_subsystem.send_message(msg).await?;
|
||||
self.subsystems.availability_recovery.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::BitfieldDistribution(msg) => {
|
||||
self.bitfield_distribution_subsystem.send_message(msg).await?;
|
||||
self.subsystems.bitfield_distribution.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::BitfieldSigning(msg) => {
|
||||
self.bitfield_signing_subsystem.send_message(msg).await?;
|
||||
self.subsystems.bitfield_signing.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::Provisioner(msg) => {
|
||||
self.provisioner_subsystem.send_message(msg).await?;
|
||||
self.subsystems.provisioner.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::PoVDistribution(msg) => {
|
||||
self.pov_distribution_subsystem.send_message(msg).await?;
|
||||
self.subsystems.pov_distribution.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::RuntimeApi(msg) => {
|
||||
self.runtime_api_subsystem.send_message(msg).await?;
|
||||
self.subsystems.runtime_api.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::AvailabilityStore(msg) => {
|
||||
self.availability_store_subsystem.send_message(msg).await?;
|
||||
self.subsystems.availability_store.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::NetworkBridge(msg) => {
|
||||
self.network_bridge_subsystem.send_message(msg).await?;
|
||||
self.subsystems.network_bridge.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::ChainApi(msg) => {
|
||||
self.chain_api_subsystem.send_message(msg).await?;
|
||||
self.subsystems.chain_api.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::CollationGeneration(msg) => {
|
||||
self.collation_generation_subsystem.send_message(msg).await?;
|
||||
self.subsystems.collation_generation.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::CollatorProtocol(msg) => {
|
||||
self.collator_protocol_subsystem.send_message(msg).await?;
|
||||
self.subsystems.collator_protocol.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::ApprovalDistribution(msg) => {
|
||||
self.approval_distribution_subsystem.send_message(msg).await?;
|
||||
self.subsystems.approval_distribution.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::ApprovalVoting(msg) => {
|
||||
self.approval_voting_subsystem.send_message(msg).await?;
|
||||
self.subsystems.approval_voting.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::GossipSupport(msg) => {
|
||||
self.gossip_support_subsystem.send_message(msg).await?;
|
||||
self.subsystems.gossip_support.send_message(msg).await?;
|
||||
},
|
||||
}
|
||||
|
||||
@@ -2342,13 +2457,12 @@ mod tests {
|
||||
let gather = registry.gather();
|
||||
assert_eq!(gather[0].get_name(), "parachain_activated_heads_total");
|
||||
assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total");
|
||||
assert_eq!(gather[2].get_name(), "parachain_from_overseer_channel_queue_size");
|
||||
assert_eq!(gather[3].get_name(), "parachain_messages_relayed_total");
|
||||
assert_eq!(gather[4].get_name(), "parachain_overseer_messages_relay_timings");
|
||||
assert_eq!(gather[5].get_name(), "parachain_to_overseer_channel_queue_size");
|
||||
assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total");
|
||||
assert_eq!(gather[3].get_name(), "parachain_overseer_messages_relay_timings");
|
||||
assert_eq!(gather[4].get_name(), "parachain_to_overseer_channel_queue_size");
|
||||
let activated = gather[0].get_metric()[0].get_counter().get_value() as u64;
|
||||
let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64;
|
||||
let relayed = gather[3].get_metric()[0].get_counter().get_value() as u64;
|
||||
let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64;
|
||||
let mut result = HashMap::new();
|
||||
result.insert("activated", activated);
|
||||
result.insert("deactivated", deactivated);
|
||||
|
||||
Reference in New Issue
Block a user