From fad55b95fa5559f7a0b2ac641c197cba24e67ab7 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Fri, 17 Dec 2021 15:50:26 +0100 Subject: [PATCH] dispute statements node side limiting (#4541) --- polkadot/Cargo.lock | 2 + polkadot/node/core/provisioner/Cargo.toml | 2 + polkadot/node/core/provisioner/src/lib.rs | 163 ++++++++++++++++------ 3 files changed, 124 insertions(+), 43 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 134c4312e4..1f2f301875 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6315,11 +6315,13 @@ dependencies = [ "bitvec", "futures 0.3.18", "futures-timer 3.0.2", + "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", "polkadot-primitives-test-helpers", + "rand 0.8.4", "sp-application-crypto", "sp-keystore", "thiserror", diff --git a/polkadot/node/core/provisioner/Cargo.toml b/polkadot/node/core/provisioner/Cargo.toml index 96cba550ed..74c24b0d34 100644 --- a/polkadot/node/core/provisioner/Cargo.toml +++ b/polkadot/node/core/provisioner/Cargo.toml @@ -10,9 +10,11 @@ futures = "0.3.17" tracing = "0.1.29" thiserror = "1.0.30" polkadot-primitives = { path = "../../../primitives" } +polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } futures-timer = "3.0.2" +rand = "0.8.4" [dev-dependencies] sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 11c2f0aa9c..2b6dfb564d 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -25,6 +25,7 @@ use futures::{ prelude::*, }; use futures_timer::Delay; +use polkadot_node_primitives::CandidateVotes; use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, jaeger, @@ -39,11 +40,14 @@ use polkadot_node_subsystem_util::{ JobSubsystem, JobTrait, }; use polkadot_primitives::v1::{ - BackedCandidate, BlockNumber, CandidateReceipt, CoreState, DisputeStatement, - DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption, + BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeStatement, + DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption, SessionIndex, SignedAvailabilityBitfield, ValidatorIndex, }; -use std::{collections::BTreeMap, pin::Pin}; +use std::{ + collections::{BTreeMap, HashSet}, + pin::Pin, +}; use thiserror::Error; mod metrics; @@ -490,7 +494,7 @@ async fn select_candidates( target: LOG_TARGET, "Selected {} candidates for {} cores", candidates.len(), - availability_cores.len(), + availability_cores.len() ); Ok(candidates) @@ -551,55 +555,128 @@ fn bitfields_indicate_availability( 3 * availability.count_ones() >= 2 * availability.len() } -async fn select_disputes( - sender: &mut impl SubsystemSender, - metrics: &metrics::Metrics, -) -> Result { - let (tx, rx) = oneshot::channel(); +#[derive(Debug)] +enum RequestType { + /// Query recent disputes, could be an excessive amount. + Recent, + /// Query the currently active and very recently concluded disputes. + Active, +} - // We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine. - // It's heavier than `ActiveDisputes` but ensures that everything from the dispute - // window gets on-chain, unlike `ActiveDisputes`. - // - // This should have no meaningful impact on performance on production networks for - // two reasons: - // 1. In large validator sets, a node might be a block author 1% or less of the time. - // this code-path only triggers in the case of being a block author. - // 2. Disputes are expected to be rare because they come with heavy slashing. - sender.send_message(DisputeCoordinatorMessage::RecentDisputes(tx).into()).await; +/// Request open disputes identified by `CandidateHash` and the `SessionIndex`. +async fn request_disputes( + sender: &mut impl SubsystemSender, + active_or_recent: RequestType, +) -> Vec<(SessionIndex, CandidateHash)> { + let (tx, rx) = oneshot::channel(); + let msg = match active_or_recent { + RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx), + RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx), + }; + sender.send_message(msg.into()).await; let recent_disputes = match rx.await { Ok(r) => r, Err(oneshot::Canceled) => { - tracing::debug!( - target: LOG_TARGET, - "Unable to gather recent disputes - subsystem disconnected?", - ); - + tracing::warn!(target: LOG_TARGET, "Unable to gather {:?} disputes", active_or_recent); Vec::new() }, }; + recent_disputes +} + +/// Request the relevant dispute statements for a set of disputes identified by `CandidateHash` and the `SessionIndex`. +async fn request_votes( + sender: &mut impl SubsystemSender, + disputes_to_query: Vec<(SessionIndex, CandidateHash)>, +) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> { + let (tx, rx) = oneshot::channel(); + sender + .send_message(DisputeCoordinatorMessage::QueryCandidateVotes(disputes_to_query, tx).into()) + .await; + + match rx.await { + Ok(v) => v, + Err(oneshot::Canceled) => { + tracing::warn!(target: LOG_TARGET, "Unable to query candidate votes"); + Vec::new() + }, + } +} + +/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent. +fn extend_by_random_subset_without_repetition( + acc: &mut Vec<(SessionIndex, CandidateHash)>, + extension: Vec<(SessionIndex, CandidateHash)>, + n: usize, +) { + use rand::Rng; + + let lut = acc.iter().cloned().collect::>(); + + let mut unique_new = + extension.into_iter().filter(|recent| !lut.contains(recent)).collect::>(); + + // we can simply add all + if unique_new.len() <= n { + acc.extend(unique_new) + } else { + acc.reserve(n); + let mut rng = rand::thread_rng(); + for _ in 0..n { + let idx = rng.gen_range(0..unique_new.len()); + acc.push(unique_new.swap_remove(idx)); + } + } + // assure sorting stays candid according to session index + acc.sort_unstable_by(|a, b| a.0.cmp(&b.0)); +} + +async fn select_disputes( + sender: &mut impl SubsystemSender, + metrics: &metrics::Metrics, +) -> Result { + const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 10_000; + + // We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine. + // It's heavier than `ActiveDisputes` but ensures that everything from the dispute + // window gets on-chain, unlike `ActiveDisputes`. + // In case of an overload condition, we limit ourselves to active disputes, and fill up to the + // upper bound of disputes to pass to wasm `fn create_inherent_data`. + // If the active ones are already exceeding the bounds, randomly select a subset. + let recent = request_disputes(sender, RequestType::Recent).await; + let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { + tracing::warn!( + target: LOG_TARGET, + "Recent disputes are excessive ({} > {}), reduce to active ones, and selected", + recent.len(), + MAX_DISPUTES_FORWARDED_TO_RUNTIME + ); + let mut active = request_disputes(sender, RequestType::Active).await; + let n_active = active.len(); + let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { + let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME); + extend_by_random_subset_without_repetition( + &mut picked, + active, + MAX_DISPUTES_FORWARDED_TO_RUNTIME, + ); + picked + } else { + extend_by_random_subset_without_repetition( + &mut active, + recent, + MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active), + ); + active + }; + active + } else { + recent + }; // Load all votes for all disputes from the coordinator. - let dispute_candidate_votes = { - let (tx, rx) = oneshot::channel(); - sender - .send_message( - DisputeCoordinatorMessage::QueryCandidateVotes(recent_disputes, tx).into(), - ) - .await; - - match rx.await { - Ok(v) => v, - Err(oneshot::Canceled) => { - tracing::debug!( - target: LOG_TARGET, - "Unable to query candidate votes - subsystem disconnected?", - ); - Vec::new() - }, - } - }; + let dispute_candidate_votes = request_votes(sender, disputes).await; // Transform all `CandidateVotes` into `MultiDisputeStatementSet`. Ok(dispute_candidate_votes