dispute statements node side limiting (#4541)

This commit is contained in:
Bernhard Schuster
2021-12-17 15:50:26 +01:00
committed by GitHub
parent b0a4f78edc
commit fad55b95fa
3 changed files with 124 additions and 43 deletions
+2
View File
@@ -6315,11 +6315,13 @@ dependencies = [
"bitvec",
"futures 0.3.18",
"futures-timer 3.0.2",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"polkadot-primitives-test-helpers",
"rand 0.8.4",
"sp-application-crypto",
"sp-keystore",
"thiserror",
@@ -10,9 +10,11 @@ futures = "0.3.17"
tracing = "0.1.29"
thiserror = "1.0.30"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
futures-timer = "3.0.2"
rand = "0.8.4"
[dev-dependencies]
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
+120 -43
View File
@@ -25,6 +25,7 @@ use futures::{
prelude::*,
};
use futures_timer::Delay;
use polkadot_node_primitives::CandidateVotes;
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
jaeger,
@@ -39,11 +40,14 @@ use polkadot_node_subsystem_util::{
JobSubsystem, JobTrait,
};
use polkadot_primitives::v1::{
BackedCandidate, BlockNumber, CandidateReceipt, CoreState, DisputeStatement,
DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeStatement,
DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption, SessionIndex,
SignedAvailabilityBitfield, ValidatorIndex,
};
use std::{collections::BTreeMap, pin::Pin};
use std::{
collections::{BTreeMap, HashSet},
pin::Pin,
};
use thiserror::Error;
mod metrics;
@@ -490,7 +494,7 @@ async fn select_candidates(
target: LOG_TARGET,
"Selected {} candidates for {} cores",
candidates.len(),
availability_cores.len(),
availability_cores.len()
);
Ok(candidates)
@@ -551,55 +555,128 @@ fn bitfields_indicate_availability(
3 * availability.count_ones() >= 2 * availability.len()
}
async fn select_disputes(
sender: &mut impl SubsystemSender,
metrics: &metrics::Metrics,
) -> Result<MultiDisputeStatementSet, Error> {
let (tx, rx) = oneshot::channel();
#[derive(Debug)]
enum RequestType {
/// Query recent disputes, could be an excessive amount.
Recent,
/// Query the currently active and very recently concluded disputes.
Active,
}
// We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine.
// It's heavier than `ActiveDisputes` but ensures that everything from the dispute
// window gets on-chain, unlike `ActiveDisputes`.
//
// This should have no meaningful impact on performance on production networks for
// two reasons:
// 1. In large validator sets, a node might be a block author 1% or less of the time.
// this code-path only triggers in the case of being a block author.
// 2. Disputes are expected to be rare because they come with heavy slashing.
sender.send_message(DisputeCoordinatorMessage::RecentDisputes(tx).into()).await;
/// Request open disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
sender: &mut impl SubsystemSender,
active_or_recent: RequestType,
) -> Vec<(SessionIndex, CandidateHash)> {
let (tx, rx) = oneshot::channel();
let msg = match active_or_recent {
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
};
sender.send_message(msg.into()).await;
let recent_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
tracing::debug!(
target: LOG_TARGET,
"Unable to gather recent disputes - subsystem disconnected?",
);
tracing::warn!(target: LOG_TARGET, "Unable to gather {:?} disputes", active_or_recent);
Vec::new()
},
};
recent_disputes
}
/// Request the relevant dispute statements for a set of disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_votes(
sender: &mut impl SubsystemSender,
disputes_to_query: Vec<(SessionIndex, CandidateHash)>,
) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> {
let (tx, rx) = oneshot::channel();
sender
.send_message(DisputeCoordinatorMessage::QueryCandidateVotes(disputes_to_query, tx).into())
.await;
match rx.await {
Ok(v) => v,
Err(oneshot::Canceled) => {
tracing::warn!(target: LOG_TARGET, "Unable to query candidate votes");
Vec::new()
},
}
}
/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent.
fn extend_by_random_subset_without_repetition(
acc: &mut Vec<(SessionIndex, CandidateHash)>,
extension: Vec<(SessionIndex, CandidateHash)>,
n: usize,
) {
use rand::Rng;
let lut = acc.iter().cloned().collect::<HashSet<(SessionIndex, CandidateHash)>>();
let mut unique_new =
extension.into_iter().filter(|recent| !lut.contains(recent)).collect::<Vec<_>>();
// we can simply add all
if unique_new.len() <= n {
acc.extend(unique_new)
} else {
acc.reserve(n);
let mut rng = rand::thread_rng();
for _ in 0..n {
let idx = rng.gen_range(0..unique_new.len());
acc.push(unique_new.swap_remove(idx));
}
}
// assure sorting stays candid according to session index
acc.sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
async fn select_disputes(
sender: &mut impl SubsystemSender,
metrics: &metrics::Metrics,
) -> Result<MultiDisputeStatementSet, Error> {
const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 10_000;
// We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine.
// It's heavier than `ActiveDisputes` but ensures that everything from the dispute
// window gets on-chain, unlike `ActiveDisputes`.
// In case of an overload condition, we limit ourselves to active disputes, and fill up to the
// upper bound of disputes to pass to wasm `fn create_inherent_data`.
// If the active ones are already exceeding the bounds, randomly select a subset.
let recent = request_disputes(sender, RequestType::Recent).await;
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
tracing::warn!(
target: LOG_TARGET,
"Recent disputes are excessive ({} > {}), reduce to active ones, and selected",
recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME
);
let mut active = request_disputes(sender, RequestType::Active).await;
let n_active = active.len();
let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
extend_by_random_subset_without_repetition(
&mut picked,
active,
MAX_DISPUTES_FORWARDED_TO_RUNTIME,
);
picked
} else {
extend_by_random_subset_without_repetition(
&mut active,
recent,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active),
);
active
};
active
} else {
recent
};
// Load all votes for all disputes from the coordinator.
let dispute_candidate_votes = {
let (tx, rx) = oneshot::channel();
sender
.send_message(
DisputeCoordinatorMessage::QueryCandidateVotes(recent_disputes, tx).into(),
)
.await;
match rx.await {
Ok(v) => v,
Err(oneshot::Canceled) => {
tracing::debug!(
target: LOG_TARGET,
"Unable to query candidate votes - subsystem disconnected?",
);
Vec::new()
},
}
};
let dispute_candidate_votes = request_votes(sender, disputes).await;
// Transform all `CandidateVotes` into `MultiDisputeStatementSet`.
Ok(dispute_candidate_votes