mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Dispute distribution improvements (#3853)
* Better warning policy on dispute-distribution. Network errors happen, only warn on a minimum number of failed attempts. * Add metric to see how long it takes to get a dispute vote out. * Let's be alert.
This commit is contained in:
@@ -46,6 +46,9 @@ struct MetricsInner {
|
||||
///
|
||||
/// We both have successful imports and failed imports here.
|
||||
imported_requests: CounterVec<U64>,
|
||||
|
||||
/// The duration of issued dispute request to response.
|
||||
time_dispute_request: prometheus::Histogram,
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
@@ -61,7 +64,7 @@ impl Metrics {
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment counter on served chunks.
|
||||
/// Increment counter on served disputes.
|
||||
pub fn on_received_request(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.received_requests.inc()
|
||||
@@ -74,6 +77,11 @@ impl Metrics {
|
||||
metrics.imported_requests.with_label_values(&[label]).inc()
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a timer to time request/response duration.
|
||||
pub fn time_dispute_request(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.time_dispute_request.start_timer())
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
@@ -106,6 +114,13 @@ impl metrics::Metrics for Metrics {
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
time_dispute_request: prometheus::register(
|
||||
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
|
||||
"parachain_dispute_distribution_time_dispute_request",
|
||||
"Time needed for dispute votes to get confirmed/fail getting transmitted.",
|
||||
))?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
|
||||
@@ -97,9 +97,15 @@ impl DisputeSender {
|
||||
return Ok(())
|
||||
},
|
||||
Entry::Vacant(vacant) => {
|
||||
let send_task =
|
||||
SendTask::new(ctx, runtime, &self.active_sessions, self.tx.clone(), req)
|
||||
.await?;
|
||||
let send_task = SendTask::new(
|
||||
ctx,
|
||||
runtime,
|
||||
&self.active_sessions,
|
||||
self.tx.clone(),
|
||||
req,
|
||||
&self.metrics,
|
||||
)
|
||||
.await?;
|
||||
vacant.insert(send_task);
|
||||
},
|
||||
}
|
||||
@@ -140,7 +146,9 @@ impl DisputeSender {
|
||||
|
||||
for dispute in self.disputes.values_mut() {
|
||||
if have_new_sessions || dispute.has_failed_sends() {
|
||||
dispute.refresh_sends(ctx, runtime, &self.active_sessions).await?;
|
||||
dispute
|
||||
.refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,12 +20,13 @@ use futures::{channel::mpsc, future::RemoteHandle, Future, FutureExt, SinkExt};
|
||||
|
||||
use polkadot_node_network_protocol::{
|
||||
request_response::{
|
||||
outgoing::RequestError,
|
||||
v1::{DisputeRequest, DisputeResponse},
|
||||
OutgoingRequest, OutgoingResult, Recipient, Requests,
|
||||
},
|
||||
IfDisconnected,
|
||||
};
|
||||
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
|
||||
use polkadot_node_subsystem_util::{metrics, runtime::RuntimeInfo};
|
||||
use polkadot_primitives::v1::{
|
||||
AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex,
|
||||
};
|
||||
@@ -38,7 +39,7 @@ use super::error::{Fatal, Result};
|
||||
|
||||
use crate::{
|
||||
metrics::{FAILED, SUCCEEDED},
|
||||
LOG_TARGET,
|
||||
Metrics, LOG_TARGET,
|
||||
};
|
||||
|
||||
/// Delivery status for a particular dispute.
|
||||
@@ -57,6 +58,16 @@ pub struct SendTask {
|
||||
/// Whether we have any tasks failed since the last refresh.
|
||||
has_failed_sends: bool,
|
||||
|
||||
/// Total count of failed transmissions.
|
||||
///
|
||||
/// Used for issuing a warning, if that number gets above a certain threshold.
|
||||
failed_count: usize,
|
||||
|
||||
/// Total number of initiated requests.
|
||||
///
|
||||
/// Used together with `failed_count` for issuing a warning on too many failed attempts.
|
||||
send_count: usize,
|
||||
|
||||
/// Sender to be cloned for tasks.
|
||||
tx: mpsc::Sender<TaskFinish>,
|
||||
}
|
||||
@@ -87,14 +98,14 @@ pub enum TaskResult {
|
||||
/// Task was not able to get the request out to its peer.
|
||||
///
|
||||
/// It should be retried in that case.
|
||||
Failed,
|
||||
Failed(RequestError),
|
||||
}
|
||||
|
||||
impl TaskResult {
|
||||
pub fn as_metrics_label(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Succeeded => SUCCEEDED,
|
||||
Self::Failed => FAILED,
|
||||
Self::Failed(_) => FAILED,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -107,10 +118,17 @@ impl SendTask {
|
||||
active_sessions: &HashMap<SessionIndex, Hash>,
|
||||
tx: mpsc::Sender<TaskFinish>,
|
||||
request: DisputeRequest,
|
||||
metrics: &Metrics,
|
||||
) -> Result<Self> {
|
||||
let mut send_task =
|
||||
Self { request, deliveries: HashMap::new(), has_failed_sends: false, tx };
|
||||
send_task.refresh_sends(ctx, runtime, active_sessions).await?;
|
||||
let mut send_task = Self {
|
||||
request,
|
||||
deliveries: HashMap::new(),
|
||||
has_failed_sends: false,
|
||||
tx,
|
||||
failed_count: 0,
|
||||
send_count: 0,
|
||||
};
|
||||
send_task.refresh_sends(ctx, runtime, active_sessions, metrics).await?;
|
||||
Ok(send_task)
|
||||
}
|
||||
|
||||
@@ -123,6 +141,7 @@ impl SendTask {
|
||||
ctx: &mut Context,
|
||||
runtime: &mut RuntimeInfo,
|
||||
active_sessions: &HashMap<SessionIndex, Hash>,
|
||||
metrics: &Metrics,
|
||||
) -> Result<()> {
|
||||
let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;
|
||||
|
||||
@@ -137,10 +156,12 @@ impl SendTask {
|
||||
|
||||
// Start any new tasks that are needed:
|
||||
let new_statuses =
|
||||
send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone()).await?;
|
||||
send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics)
|
||||
.await?;
|
||||
|
||||
self.deliveries.extend(new_statuses.into_iter());
|
||||
self.has_failed_sends = false;
|
||||
self.send_count += new_statuses.len();
|
||||
self.deliveries.extend(new_statuses.into_iter());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -150,15 +171,38 @@ impl SendTask {
|
||||
}
|
||||
|
||||
/// Handle a finished response waiting task.
|
||||
///
|
||||
/// Called by `DisputeSender` upon reception of the corresponding message from our spawned `wait_response_task`.
|
||||
pub fn on_finished_send(&mut self, authority: &AuthorityDiscoveryId, result: TaskResult) {
|
||||
match result {
|
||||
TaskResult::Failed => {
|
||||
tracing::warn!(
|
||||
TaskResult::Failed(err) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
candidate = ?self.request.0.candidate_receipt.hash(),
|
||||
?authority,
|
||||
"Could not get our message out! If this keeps happening, then check chain whether the dispute made it there."
|
||||
candidate_hash = %self.request.0.candidate_receipt.hash(),
|
||||
%err,
|
||||
"Error sending dispute statements to node."
|
||||
);
|
||||
|
||||
self.failed_count += 1;
|
||||
let error_rate = (100 * self.failed_count).checked_div(self.send_count).expect(
|
||||
"We cannot receive a failed request, without having sent one first. qed.",
|
||||
);
|
||||
// 10% seems to be a sensible threshold to become alert - note that
|
||||
// self.send_count gets increased in batches of the full validator set, so we don't
|
||||
// need to account for a low send_count.
|
||||
if error_rate > 10 {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = %self.request.0.candidate_receipt.hash(),
|
||||
last_authority = ?authority,
|
||||
last_error = %err,
|
||||
failed_count = ?self.failed_count,
|
||||
total_attempts = ?self.send_count,
|
||||
"Sending our dispute vote failed for more than 10% of total attempts!"
|
||||
);
|
||||
}
|
||||
|
||||
self.has_failed_sends = true;
|
||||
// Remove state, so we know what to try again:
|
||||
self.deliveries.remove(authority);
|
||||
@@ -236,6 +280,7 @@ async fn send_requests<Context: SubsystemContext>(
|
||||
tx: mpsc::Sender<TaskFinish>,
|
||||
receivers: Vec<AuthorityDiscoveryId>,
|
||||
req: DisputeRequest,
|
||||
metrics: &Metrics,
|
||||
) -> Result<HashMap<AuthorityDiscoveryId, DeliveryStatus>> {
|
||||
let mut statuses = HashMap::with_capacity(receivers.len());
|
||||
let mut reqs = Vec::with_capacity(receivers.len());
|
||||
@@ -251,6 +296,7 @@ async fn send_requests<Context: SubsystemContext>(
|
||||
req.0.candidate_receipt.hash(),
|
||||
receiver.clone(),
|
||||
tx.clone(),
|
||||
metrics.time_dispute_request(),
|
||||
);
|
||||
|
||||
let (remote, remote_handle) = fut.remote_handle();
|
||||
@@ -273,28 +319,13 @@ async fn wait_response_task(
|
||||
candidate_hash: CandidateHash,
|
||||
receiver: AuthorityDiscoveryId,
|
||||
mut tx: mpsc::Sender<TaskFinish>,
|
||||
_timer: Option<metrics::prometheus::prometheus::HistogramTimer>,
|
||||
) {
|
||||
let result = pending_response.await;
|
||||
let msg = match result {
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
%candidate_hash,
|
||||
%receiver,
|
||||
%err,
|
||||
"Error sending dispute statements to node."
|
||||
);
|
||||
TaskFinish { candidate_hash, receiver, result: TaskResult::Failed }
|
||||
},
|
||||
Ok(DisputeResponse::Confirmed) => {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
%candidate_hash,
|
||||
%receiver,
|
||||
"Sending dispute message succeeded"
|
||||
);
|
||||
TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded }
|
||||
},
|
||||
Err(err) => TaskFinish { candidate_hash, receiver, result: TaskResult::Failed(err) },
|
||||
Ok(DisputeResponse::Confirmed) =>
|
||||
TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded },
|
||||
};
|
||||
if let Err(err) = tx.feed(msg).await {
|
||||
tracing::debug!(
|
||||
|
||||
Reference in New Issue
Block a user