mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 15:51:12 +00:00
add dispute metrics, some chores (#3842)
* rename: MsgFilter -> MessageInterceptor * feat: add dispute metrics * fixup * test fixins * fix metrics * dummysubsystem export and trait fn fix * chore: fmt * undo unwanted changes * foo * pfmt * fixup * fixup * revert * some more * Update node/malus/Cargo.toml Co-authored-by: Andronik Ordian <write@reusable.software> * Update node/core/dispute-coordinator/src/metrics.rs Co-authored-by: Andronik Ordian <write@reusable.software> * Update node/core/dispute-coordinator/src/metrics.rs Co-authored-by: Andronik Ordian <write@reusable.software> * Update node/core/dispute-coordinator/src/metrics.rs Co-authored-by: Andronik Ordian <write@reusable.software> * add license header * fix lockfile * new with opts * fmt * Update node/core/dispute-coordinator/src/metrics.rs * feature gate Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
committed by
GitHub
parent
6fa18a2339
commit
cc8b861271
Generated
+3
@@ -6977,12 +6977,15 @@ dependencies = [
|
||||
"assert_matches",
|
||||
"async-trait",
|
||||
"color-eyre",
|
||||
"futures 0.3.17",
|
||||
"parity-util-mem",
|
||||
"polkadot-cli",
|
||||
"polkadot-node-core-candidate-validation",
|
||||
"polkadot-node-core-pvf",
|
||||
"polkadot-node-subsystem",
|
||||
"polkadot-node-subsystem-test-helpers",
|
||||
"polkadot-node-subsystem-util",
|
||||
"sp-core",
|
||||
"structopt",
|
||||
]
|
||||
|
||||
|
||||
@@ -30,6 +30,8 @@ use kvdb::KeyValueDB;
|
||||
use parity_scale_codec::{Decode, Encode, Error as CodecError};
|
||||
use sc_keystore::LocalKeystore;
|
||||
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
const LOG_TARGET: &str = "parachain::dispute-coordinator";
|
||||
|
||||
/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
|
||||
@@ -52,7 +54,7 @@ pub struct DisputeCoordinatorSubsystem {}
|
||||
|
||||
impl DisputeCoordinatorSubsystem {
|
||||
/// Create a new instance of the subsystem.
|
||||
pub fn new(_: Arc<dyn KeyValueDB>, _: Config, _: Arc<LocalKeystore>) -> Self {
|
||||
pub fn new(_: Arc<dyn KeyValueDB>, _: Config, _: Arc<LocalKeystore>, _: Metrics) -> Self {
|
||||
DisputeCoordinatorSubsystem {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,8 @@
|
||||
//! another node, this will trigger the dispute participation subsystem to recover and validate the block and call
|
||||
//! back to this subsystem.
|
||||
|
||||
mod metrics;
|
||||
|
||||
#[cfg(feature = "disputes")]
|
||||
mod real;
|
||||
#[cfg(feature = "disputes")]
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
/// Number of opened disputes.
|
||||
open: prometheus::Counter<prometheus::U64>,
|
||||
/// Votes of all disputes.
|
||||
votes: prometheus::CounterVec<prometheus::U64>,
|
||||
/// Conclusion across all disputes.
|
||||
concluded: prometheus::CounterVec<prometheus::U64>,
|
||||
}
|
||||
|
||||
/// Candidate validation metrics.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Metrics(Option<MetricsInner>);
|
||||
|
||||
#[cfg(feature = "disputes")]
|
||||
impl Metrics {
|
||||
pub(crate) fn on_open(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.open.inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_valid_vote(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.votes.with_label_values(&["valid"]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_invalid_vote(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.votes.with_label_values(&["invalid"]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_concluded_valid(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.concluded.with_label_values(&["valid"]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn on_concluded_invalid(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.concluded.with_label_values(&["invalid"]).inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
|
||||
let metrics = MetricsInner {
|
||||
open: prometheus::register(
|
||||
prometheus::Counter::with_opts(prometheus::Opts::new(
|
||||
"parachain_candidate_disputes_total",
|
||||
"Total number of raised disputes.",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
concluded: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_candidate_dispute_concluded",
|
||||
"Concluded dispute votes, sorted by candidate is `valid` and `invalid`.",
|
||||
),
|
||||
&["validity"],
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
votes: prometheus::register(
|
||||
prometheus::CounterVec::new(
|
||||
prometheus::Opts::new(
|
||||
"parachain_candidate_dispute_votes",
|
||||
"Accumulated dispute votes, sorted by candidate is `valid` and `invalid`.",
|
||||
),
|
||||
&["validity"],
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
@@ -56,6 +56,7 @@ use kvdb::KeyValueDB;
|
||||
use parity_scale_codec::{Decode, Encode, Error as CodecError};
|
||||
use sc_keystore::LocalKeystore;
|
||||
|
||||
use crate::metrics::Metrics;
|
||||
use backend::{Backend, OverlayedBackend};
|
||||
use db::v1::{DbBackend, RecentDisputes};
|
||||
|
||||
@@ -116,12 +117,18 @@ pub struct DisputeCoordinatorSubsystem {
|
||||
config: Config,
|
||||
store: Arc<dyn KeyValueDB>,
|
||||
keystore: Arc<LocalKeystore>,
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
impl DisputeCoordinatorSubsystem {
|
||||
/// Create a new instance of the subsystem.
|
||||
pub fn new(store: Arc<dyn KeyValueDB>, config: Config, keystore: Arc<LocalKeystore>) -> Self {
|
||||
DisputeCoordinatorSubsystem { store, config, keystore }
|
||||
pub fn new(
|
||||
store: Arc<dyn KeyValueDB>,
|
||||
config: Config,
|
||||
keystore: Arc<LocalKeystore>,
|
||||
metrics: Metrics,
|
||||
) -> Self {
|
||||
DisputeCoordinatorSubsystem { store, config, keystore, metrics }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -329,6 +336,7 @@ where
|
||||
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
|
||||
recovery_state: Participation::Pending,
|
||||
};
|
||||
let metrics = &subsystem.metrics;
|
||||
|
||||
loop {
|
||||
let mut overlay_db = OverlayedBackend::new(backend);
|
||||
@@ -348,7 +356,8 @@ where
|
||||
},
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
|
||||
FromOverseer::Communication { msg } =>
|
||||
handle_incoming(ctx, &mut overlay_db, &mut state, msg, clock.now()).await?,
|
||||
handle_incoming(ctx, &mut overlay_db, &mut state, msg, clock.now(), &metrics)
|
||||
.await?,
|
||||
}
|
||||
|
||||
if !overlay_db.is_empty() {
|
||||
@@ -518,6 +527,7 @@ async fn handle_incoming(
|
||||
state: &mut State,
|
||||
message: DisputeCoordinatorMessage,
|
||||
now: Timestamp,
|
||||
metrics: &Metrics,
|
||||
) -> Result<(), Error> {
|
||||
match message {
|
||||
DisputeCoordinatorMessage::ImportStatements {
|
||||
@@ -537,6 +547,7 @@ async fn handle_incoming(
|
||||
statements,
|
||||
now,
|
||||
pending_confirmation,
|
||||
metrics,
|
||||
)
|
||||
.await?;
|
||||
},
|
||||
@@ -578,6 +589,7 @@ async fn handle_incoming(
|
||||
session,
|
||||
valid,
|
||||
now,
|
||||
metrics,
|
||||
)
|
||||
.await?;
|
||||
},
|
||||
@@ -635,6 +647,7 @@ async fn handle_import_statements(
|
||||
statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
|
||||
now: Timestamp,
|
||||
pending_confirmation: oneshot::Sender<ImportStatementsResult>,
|
||||
metrics: &Metrics,
|
||||
) -> Result<(), Error> {
|
||||
if state.highest_session.map_or(true, |h| session + DISPUTE_WINDOW < h) {
|
||||
// It is not valid to participate in an ancient dispute (spam?).
|
||||
@@ -694,6 +707,7 @@ async fn handle_import_statements(
|
||||
|
||||
match statement.statement().clone() {
|
||||
DisputeStatement::Valid(valid_kind) => {
|
||||
metrics.on_valid_vote();
|
||||
insert_into_statement_vec(
|
||||
&mut votes.valid,
|
||||
valid_kind,
|
||||
@@ -702,6 +716,7 @@ async fn handle_import_statements(
|
||||
);
|
||||
},
|
||||
DisputeStatement::Invalid(invalid_kind) => {
|
||||
metrics.on_invalid_vote();
|
||||
insert_into_statement_vec(
|
||||
&mut votes.invalid,
|
||||
invalid_kind,
|
||||
@@ -784,6 +799,14 @@ async fn handle_import_statements(
|
||||
);
|
||||
return Ok(())
|
||||
}
|
||||
metrics.on_open();
|
||||
|
||||
if concluded_valid {
|
||||
metrics.on_concluded_valid();
|
||||
}
|
||||
if concluded_invalid {
|
||||
metrics.on_concluded_invalid();
|
||||
}
|
||||
}
|
||||
|
||||
// Only write when updated and vote is available.
|
||||
@@ -824,6 +847,7 @@ async fn issue_local_statement(
|
||||
session: SessionIndex,
|
||||
valid: bool,
|
||||
now: Timestamp,
|
||||
metrics: &Metrics,
|
||||
) -> Result<(), Error> {
|
||||
// Load session info.
|
||||
let info = match state.rolling_session_window.session_info(session) {
|
||||
@@ -857,7 +881,6 @@ async fn issue_local_statement(
|
||||
|
||||
let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
|
||||
let controlled_indices = find_controlled_validator_indices(&state.keystore, &validators[..]);
|
||||
|
||||
for index in controlled_indices {
|
||||
if voted_indices.contains(&index) {
|
||||
continue
|
||||
@@ -914,6 +937,7 @@ async fn issue_local_statement(
|
||||
statements,
|
||||
now,
|
||||
pending_confirmation,
|
||||
metrics,
|
||||
)
|
||||
.await?;
|
||||
match rx.await {
|
||||
|
||||
@@ -286,6 +286,7 @@ impl TestState {
|
||||
self.db.clone(),
|
||||
self.config.clone(),
|
||||
self.subsystem_keystore.clone(),
|
||||
Metrics::default(),
|
||||
);
|
||||
let backend = DbBackend::new(self.db.clone(), self.config.column_config());
|
||||
let subsystem_task = run(subsystem, ctx, backend, Box::new(self.clock.clone()));
|
||||
|
||||
@@ -27,3 +27,8 @@ color-eyre = { version = "0.5.11", default-features = false }
|
||||
assert_matches = "1.5"
|
||||
structopt = "0.3.23"
|
||||
async-trait = "0.1.51"
|
||||
|
||||
[dev-dependencies]
|
||||
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
futures = { version = "0.3.17", features = ["thread-pool"] }
|
||||
|
||||
@@ -24,38 +24,52 @@ use polkadot_node_subsystem::*;
|
||||
pub use polkadot_node_subsystem::{messages::AllMessages, overseer, FromOverseer};
|
||||
use std::{future::Future, pin::Pin};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// Filter incoming and outgoing messages.
|
||||
pub trait MsgFilter: Send + Sync + Clone + 'static {
|
||||
pub trait MessageInterceptor<Sender>: Send + Sync + Clone + 'static
|
||||
where
|
||||
Sender: overseer::SubsystemSender<Self::Message> + Clone + 'static,
|
||||
{
|
||||
/// The message type the original subsystem handles incoming.
|
||||
type Message: Send + 'static;
|
||||
|
||||
/// Filter messages that are to be received by
|
||||
/// the subsystem.
|
||||
fn filter_in(&self, msg: FromOverseer<Self::Message>) -> Option<FromOverseer<Self::Message>> {
|
||||
///
|
||||
/// For non-trivial cases, the `sender` can be used to send
|
||||
/// multiple messages after doing some additional processing.
|
||||
fn intercept_incoming(
|
||||
&self,
|
||||
_sender: &mut Sender,
|
||||
msg: FromOverseer<Self::Message>,
|
||||
) -> Option<FromOverseer<Self::Message>> {
|
||||
Some(msg)
|
||||
}
|
||||
|
||||
/// Modify outgoing messages.
|
||||
fn filter_out(&self, msg: AllMessages) -> Option<AllMessages> {
|
||||
fn intercept_outgoing(&self, msg: AllMessages) -> Option<AllMessages> {
|
||||
Some(msg)
|
||||
}
|
||||
}
|
||||
|
||||
/// A sender with the outgoing messages filtered.
|
||||
#[derive(Clone)]
|
||||
pub struct FilteredSender<Sender, Fil> {
|
||||
pub struct InterceptedSender<Sender, Fil> {
|
||||
inner: Sender,
|
||||
message_filter: Fil,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<Sender, Fil> overseer::SubsystemSender<AllMessages> for FilteredSender<Sender, Fil>
|
||||
impl<Sender, Fil> overseer::SubsystemSender<AllMessages> for InterceptedSender<Sender, Fil>
|
||||
where
|
||||
Sender: overseer::SubsystemSender<AllMessages>,
|
||||
Fil: MsgFilter,
|
||||
Sender: overseer::SubsystemSender<AllMessages>
|
||||
+ overseer::SubsystemSender<<Fil as MessageInterceptor<Sender>>::Message>,
|
||||
Fil: MessageInterceptor<Sender>,
|
||||
{
|
||||
async fn send_message(&mut self, msg: AllMessages) {
|
||||
if let Some(msg) = self.message_filter.filter_out(msg) {
|
||||
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
|
||||
self.inner.send_message(msg).await;
|
||||
}
|
||||
}
|
||||
@@ -71,26 +85,39 @@ where
|
||||
}
|
||||
|
||||
fn send_unbounded_message(&mut self, msg: AllMessages) {
|
||||
if let Some(msg) = self.message_filter.filter_out(msg) {
|
||||
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
|
||||
self.inner.send_unbounded_message(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A subsystem context, that filters the outgoing messages.
|
||||
pub struct FilteredContext<Context: overseer::SubsystemContext + SubsystemContext, Fil: MsgFilter> {
|
||||
inner: Context,
|
||||
message_filter: Fil,
|
||||
sender: FilteredSender<<Context as overseer::SubsystemContext>::Sender, Fil>,
|
||||
}
|
||||
|
||||
impl<Context, Fil> FilteredContext<Context, Fil>
|
||||
pub struct InterceptedContext<Context, Fil>
|
||||
where
|
||||
Context: overseer::SubsystemContext + SubsystemContext,
|
||||
Fil: MsgFilter<Message = <Context as overseer::SubsystemContext>::Message>,
|
||||
Fil: MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>,
|
||||
<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
|
||||
<Fil as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>>::Message,
|
||||
>,
|
||||
{
|
||||
inner: Context,
|
||||
message_filter: Fil,
|
||||
sender: InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>,
|
||||
}
|
||||
|
||||
impl<Context, Fil> InterceptedContext<Context, Fil>
|
||||
where
|
||||
Context: overseer::SubsystemContext + SubsystemContext,
|
||||
Fil: MessageInterceptor<
|
||||
<Context as overseer::SubsystemContext>::Sender,
|
||||
Message = <Context as overseer::SubsystemContext>::Message,
|
||||
>,
|
||||
<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
|
||||
<Fil as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>>::Message,
|
||||
>,
|
||||
{
|
||||
pub fn new(mut inner: Context, message_filter: Fil) -> Self {
|
||||
let sender = FilteredSender::<<Context as overseer::SubsystemContext>::Sender, Fil> {
|
||||
let sender = InterceptedSender::<<Context as overseer::SubsystemContext>::Sender, Fil> {
|
||||
inner: inner.sender().clone(),
|
||||
message_filter: message_filter.clone(),
|
||||
};
|
||||
@@ -99,15 +126,21 @@ where
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<Context, Fil> overseer::SubsystemContext for FilteredContext<Context, Fil>
|
||||
impl<Context, Fil> overseer::SubsystemContext for InterceptedContext<Context, Fil>
|
||||
where
|
||||
Context: overseer::SubsystemContext + SubsystemContext,
|
||||
Fil: MsgFilter<Message = <Context as overseer::SubsystemContext>::Message>,
|
||||
Fil: MessageInterceptor<
|
||||
<Context as overseer::SubsystemContext>::Sender,
|
||||
Message = <Context as overseer::SubsystemContext>::Message,
|
||||
>,
|
||||
<Context as overseer::SubsystemContext>::AllMessages:
|
||||
From<<Context as overseer::SubsystemContext>::Message>,
|
||||
<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
|
||||
<Fil as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>>::Message,
|
||||
>,
|
||||
{
|
||||
type Message = <Context as overseer::SubsystemContext>::Message;
|
||||
type Sender = FilteredSender<<Context as overseer::SubsystemContext>::Sender, Fil>;
|
||||
type Sender = InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>;
|
||||
type Error = <Context as overseer::SubsystemContext>::Error;
|
||||
type AllMessages = <Context as overseer::SubsystemContext>::AllMessages;
|
||||
type Signal = <Context as overseer::SubsystemContext>::Signal;
|
||||
@@ -117,7 +150,9 @@ where
|
||||
match self.inner.try_recv().await? {
|
||||
None => return Ok(None),
|
||||
Some(msg) =>
|
||||
if let Some(msg) = self.message_filter.filter_in(msg) {
|
||||
if let Some(msg) =
|
||||
self.message_filter.intercept_incoming(self.inner.sender(), msg)
|
||||
{
|
||||
return Ok(Some(msg))
|
||||
},
|
||||
}
|
||||
@@ -127,7 +162,7 @@ where
|
||||
async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>> {
|
||||
loop {
|
||||
let msg = self.inner.recv().await?;
|
||||
if let Some(msg) = self.message_filter.filter_in(msg) {
|
||||
if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
|
||||
return Ok(msg)
|
||||
}
|
||||
}
|
||||
@@ -155,27 +190,33 @@ where
|
||||
}
|
||||
|
||||
/// A subsystem to which incoming and outgoing filters are applied.
|
||||
pub struct FilteredSubsystem<Sub, Fil> {
|
||||
subsystem: Sub,
|
||||
message_filter: Fil,
|
||||
pub struct InterceptedSubsystem<Sub, Interceptor> {
|
||||
pub subsystem: Sub,
|
||||
pub message_interceptor: Interceptor,
|
||||
}
|
||||
|
||||
impl<Sub, Fil> FilteredSubsystem<Sub, Fil> {
|
||||
pub fn new(subsystem: Sub, message_filter: Fil) -> Self {
|
||||
Self { subsystem, message_filter }
|
||||
impl<Sub, Interceptor> InterceptedSubsystem<Sub, Interceptor> {
|
||||
pub fn new(subsystem: Sub, message_interceptor: Interceptor) -> Self {
|
||||
Self { subsystem, message_interceptor }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Context, Sub, Fil> overseer::Subsystem<Context, SubsystemError> for FilteredSubsystem<Sub, Fil>
|
||||
impl<Context, Sub, Interceptor> overseer::Subsystem<Context, SubsystemError> for InterceptedSubsystem<Sub, Interceptor>
|
||||
where
|
||||
Context: overseer::SubsystemContext + SubsystemContext + Sync + Send,
|
||||
Sub: overseer::Subsystem<FilteredContext<Context, Fil>, SubsystemError>,
|
||||
FilteredContext<Context, Fil>: overseer::SubsystemContext + SubsystemContext,
|
||||
Fil: MsgFilter<Message = <Context as overseer::SubsystemContext>::Message>,
|
||||
Sub: overseer::Subsystem<InterceptedContext<Context, Interceptor>, SubsystemError>,
|
||||
InterceptedContext<Context, Interceptor>: overseer::SubsystemContext + SubsystemContext,
|
||||
Interceptor: MessageInterceptor<
|
||||
<Context as overseer::SubsystemContext>::Sender,
|
||||
Message = <Context as overseer::SubsystemContext>::Message,
|
||||
>,
|
||||
<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
|
||||
<Interceptor as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>>::Message,
|
||||
>,
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let ctx = FilteredContext::new(ctx, self.message_filter);
|
||||
overseer::Subsystem::<FilteredContext<Context, Fil>, SubsystemError>::start(
|
||||
let ctx = InterceptedContext::new(ctx, self.message_interceptor);
|
||||
overseer::Subsystem::<InterceptedContext<Context, Interceptor>, SubsystemError>::start(
|
||||
self.subsystem,
|
||||
ctx,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,90 @@
|
||||
// Copyright 2021 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use super::*;
|
||||
|
||||
use polkadot_node_subsystem_test_helpers::*;
|
||||
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{AllMessages, AvailabilityStoreMessage},
|
||||
overseer::{gen::TimeoutExt, Subsystem},
|
||||
DummySubsystem,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct BlackHoleInterceptor;
|
||||
|
||||
impl<Sender> MessageInterceptor<Sender> for BlackHoleInterceptor
|
||||
where
|
||||
Sender: overseer::SubsystemSender<AllMessages>
|
||||
+ overseer::SubsystemSender<AvailabilityStoreMessage>
|
||||
+ Clone
|
||||
+ 'static,
|
||||
{
|
||||
type Message = AvailabilityStoreMessage;
|
||||
fn intercept_incoming(
|
||||
&self,
|
||||
_sender: &mut Sender,
|
||||
msg: FromOverseer<Self::Message>,
|
||||
) -> Option<FromOverseer<Self::Message>> {
|
||||
match msg {
|
||||
FromOverseer::Communication { msg: _msg } => None,
|
||||
// to conclude the test cleanly
|
||||
sig => Some(sig),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn overseer_send<T: Into<AllMessages>>(overseer: &mut TestSubsystemContextHandle<T>, msg: T) {
|
||||
overseer.send(FromOverseer::Communication { msg }).await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn integrity_test() {
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, mut overseer) = make_subsystem_context(pool);
|
||||
|
||||
let sub = DummySubsystem;
|
||||
|
||||
let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor);
|
||||
|
||||
// Try to send a message we know is going to be filtered.
|
||||
let test_fut = async move {
|
||||
let (tx, rx) = futures::channel::oneshot::channel();
|
||||
overseer_send(
|
||||
&mut overseer,
|
||||
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
|
||||
)
|
||||
.await;
|
||||
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
|
||||
overseer
|
||||
};
|
||||
let subsystem = async move {
|
||||
sub_intercepted.start(context).future.await.unwrap();
|
||||
};
|
||||
|
||||
futures::pin_mut!(test_fut);
|
||||
futures::pin_mut!(subsystem);
|
||||
|
||||
futures::executor::block_on(futures::future::join(
|
||||
async move {
|
||||
let mut overseer = test_fut.await;
|
||||
overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
},
|
||||
subsystem,
|
||||
))
|
||||
.1;
|
||||
}
|
||||
@@ -27,7 +27,7 @@ use polkadot_cli::{
|
||||
create_default_subsystems,
|
||||
service::{
|
||||
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer,
|
||||
OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost, ProvideRuntimeApi, SpawnNamed,
|
||||
OverseerGen, OverseerGenArgs, ParachainHost, ProvideRuntimeApi, SpawnNamed,
|
||||
},
|
||||
Cli,
|
||||
};
|
||||
@@ -35,11 +35,15 @@ use polkadot_cli::{
|
||||
// Import extra types relevant to the particular
|
||||
// subsystem.
|
||||
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
|
||||
use polkadot_node_subsystem::messages::CandidateValidationMessage;
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{AllMessages, CandidateValidationMessage},
|
||||
overseer::{self, OverseerHandle},
|
||||
FromOverseer,
|
||||
};
|
||||
|
||||
// Filter wrapping related types.
|
||||
use malus::*;
|
||||
|
||||
// Filter wrapping related types.
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
@@ -51,17 +55,27 @@ use structopt::StructOpt;
|
||||
#[derive(Clone, Default, Debug)]
|
||||
struct Skippy(Arc<AtomicUsize>);
|
||||
|
||||
impl MsgFilter for Skippy {
|
||||
impl<Sender> MessageInterceptor<Sender> for Skippy
|
||||
where
|
||||
Sender: overseer::SubsystemSender<AllMessages>
|
||||
+ overseer::SubsystemSender<CandidateValidationMessage>
|
||||
+ Clone
|
||||
+ 'static,
|
||||
{
|
||||
type Message = CandidateValidationMessage;
|
||||
|
||||
fn filter_in(&self, msg: FromOverseer<Self::Message>) -> Option<FromOverseer<Self::Message>> {
|
||||
fn intercept_incoming(
|
||||
&self,
|
||||
_sender: &mut Sender,
|
||||
msg: FromOverseer<Self::Message>,
|
||||
) -> Option<FromOverseer<Self::Message>> {
|
||||
if self.0.fetch_add(1, Ordering::Relaxed) % 2 == 0 {
|
||||
Some(msg)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
fn filter_out(&self, msg: AllMessages) -> Option<AllMessages> {
|
||||
fn intercept_outgoing(&self, msg: AllMessages) -> Option<AllMessages> {
|
||||
Some(msg)
|
||||
}
|
||||
}
|
||||
@@ -88,7 +102,7 @@ impl OverseerGen for BehaveMaleficient {
|
||||
let all_subsystems = create_default_subsystems(args)?.replace_candidate_validation(
|
||||
// create the filtered subsystem
|
||||
|orig: CandidateValidationSubsystem| {
|
||||
FilteredSubsystem::new(
|
||||
InterceptedSubsystem::new(
|
||||
CandidateValidationSubsystem::with_config(
|
||||
candidate_validation_config,
|
||||
orig.metrics,
|
||||
|
||||
@@ -94,7 +94,7 @@ pub use polkadot_node_subsystem_types::{
|
||||
// TODO legacy, to be deleted, left for easier integration
|
||||
// TODO https://github.com/paritytech/polkadot/issues/3427
|
||||
mod subsystems;
|
||||
pub use self::subsystems::AllSubsystems;
|
||||
pub use self::subsystems::{AllSubsystems, DummySubsystem};
|
||||
|
||||
mod metrics;
|
||||
use self::metrics::Metrics;
|
||||
|
||||
@@ -241,6 +241,7 @@ where
|
||||
parachains_db.clone(),
|
||||
dispute_coordinator_config,
|
||||
keystore.clone(),
|
||||
Metrics::register(registry)?,
|
||||
),
|
||||
dispute_participation: DisputeParticipationSubsystem::new(),
|
||||
dispute_distribution: DisputeDistributionSubsystem::new(
|
||||
|
||||
@@ -150,22 +150,25 @@ pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver<AllMes
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl overseer::SubsystemSender<AllMessages> for TestSubsystemSender {
|
||||
async fn send_message(&mut self, msg: AllMessages) {
|
||||
self.tx.send(msg).await.expect("test overseer no longer live");
|
||||
impl<T> overseer::SubsystemSender<T> for TestSubsystemSender
|
||||
where
|
||||
T: Into<AllMessages> + Send + 'static,
|
||||
{
|
||||
async fn send_message(&mut self, msg: T) {
|
||||
self.tx.send(msg.into()).await.expect("test overseer no longer live");
|
||||
}
|
||||
|
||||
async fn send_messages<T>(&mut self, msgs: T)
|
||||
async fn send_messages<X>(&mut self, msgs: X)
|
||||
where
|
||||
T: IntoIterator<Item = AllMessages> + Send,
|
||||
T::IntoIter: Send,
|
||||
X: IntoIterator<Item = T> + Send,
|
||||
X::IntoIter: Send,
|
||||
{
|
||||
let mut iter = stream::iter(msgs.into_iter().map(Ok));
|
||||
let mut iter = stream::iter(msgs.into_iter().map(|msg| Ok(msg.into())));
|
||||
self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
|
||||
}
|
||||
|
||||
fn send_unbounded_message(&mut self, msg: AllMessages) {
|
||||
self.tx.unbounded_send(msg).expect("test overseer no longer live");
|
||||
fn send_unbounded_message(&mut self, msg: T) {
|
||||
self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@
|
||||
pub use jaeger::*;
|
||||
pub use polkadot_node_jaeger as jaeger;
|
||||
|
||||
pub use polkadot_overseer::{self as overseer, ActiveLeavesUpdate, OverseerSignal};
|
||||
pub use polkadot_overseer::{self as overseer, ActiveLeavesUpdate, DummySubsystem, OverseerSignal};
|
||||
|
||||
pub use polkadot_node_subsystem_types::{
|
||||
errors::{self, *},
|
||||
|
||||
Reference in New Issue
Block a user