statement-distribution: validator disabling (#1841)

Closes #1591.

The purpose of this PR is filter out backing statements from the network
signed by disabled validators. This is just an optimization, since we
will do filtering in the runtime in #1863 to avoid nodes to filter
garbage out at block production time.

- [x] Ensure it's ok to fiddle with the mask of manifests
- [x] Write more unit tests
- [x] Test locally
- [x] simple zombienet test
- [x] PRDoc

---------

Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
This commit is contained in:
ordian
2024-01-10 10:32:52 +01:00
committed by GitHub
parent 01ea45c3a1
commit a4195326b9
14 changed files with 1577 additions and 833 deletions
@@ -17,11 +17,11 @@
//! Implementation of the v2 statement distribution protocol,
//! designed for asynchronous backing.
use net_protocol::{filter_by_peer_version, peer_set::ProtocolVersion};
use bitvec::prelude::{BitVec, Lsb0};
use polkadot_node_network_protocol::{
self as net_protocol,
self as net_protocol, filter_by_peer_version,
grid_topology::SessionGridTopology,
peer_set::ValidationVersion,
peer_set::{ProtocolVersion, ValidationVersion},
request_response::{
incoming::OutgoingResponse,
v2::{AttestedCandidateRequest, AttestedCandidateResponse},
@@ -64,7 +64,7 @@ use futures::{
use std::{
collections::{
hash_map::{Entry, HashMap},
HashSet,
BTreeSet, HashSet,
},
time::{Duration, Instant},
};
@@ -96,6 +96,7 @@ const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent");
const COST_EXCESSIVE_SECONDED: Rep = Rep::CostMinor("Sent Excessive `Seconded` Statements");
const COST_DISABLED_VALIDATOR: Rep = Rep::CostMinor("Sent a statement from a disabled validator");
const COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE: Rep =
Rep::CostMinor("Unexpected Manifest, missing knowlege for relay parent");
@@ -189,6 +190,8 @@ struct PerSessionState {
// getting the topology from the gossip-support subsystem
grid_view: Option<grid::SessionTopologyView>,
local_validator: Option<LocalValidatorIndex>,
// We store the latest state here based on union of leaves.
disabled_validators: BTreeSet<ValidatorIndex>,
}
impl PerSessionState {
@@ -205,7 +208,16 @@ impl PerSessionState {
)
.map(|(_, index)| LocalValidatorIndex::Active(index));
PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator }
let disabled_validators = BTreeSet::new();
PerSessionState {
session_info,
groups,
authority_lookup,
grid_view: None,
local_validator,
disabled_validators,
}
}
fn supply_topology(
@@ -234,6 +246,33 @@ impl PerSessionState {
fn is_not_validator(&self) -> bool {
self.grid_view.is_some() && self.local_validator.is_none()
}
/// A convenience function to generate a disabled bitmask for the given backing group.
/// The output bits are set to `true` for validators that are disabled.
/// Returns `None` if the group index is out of bounds.
pub fn disabled_bitmask(&self, group: GroupIndex) -> Option<BitVec<u8, Lsb0>> {
let group = self.groups.get(group)?;
let mask = BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)));
Some(mask)
}
/// Returns `true` if the given validator is disabled in the current session.
pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
self.disabled_validators.contains(validator_index)
}
/// Extend the list of disabled validators.
pub fn extend_disabled_validators(
&mut self,
disabled: impl IntoIterator<Item = ValidatorIndex>,
) {
self.disabled_validators.extend(disabled);
}
/// Clear the list of disabled validators.
pub fn clear_disabled_validators(&mut self) {
self.disabled_validators.clear();
}
}
pub(crate) struct State {
@@ -510,13 +549,20 @@ pub(crate) async fn handle_active_leaves_update<Context>(
let new_relay_parents =
state.implicit_view.all_allowed_relay_parents().cloned().collect::<Vec<_>>();
for new_relay_parent in new_relay_parents.iter().cloned() {
if state.per_relay_parent.contains_key(&new_relay_parent) {
continue
}
// New leaf: fetch info from runtime API and initialize
// `per_relay_parent`.
// We clear the list of disabled validators to reset it properly based on union of leaves.
let mut cleared_disabled_validators: BTreeSet<SessionIndex> = BTreeSet::new();
for new_relay_parent in new_relay_parents.iter().cloned() {
// Even if we processed this relay parent before, we need to fetch the list of disabled
// validators based on union of active leaves.
let disabled_validators =
polkadot_node_subsystem_util::vstaging::get_disabled_validators_with_fallback(
ctx.sender(),
new_relay_parent,
)
.await
.map_err(JfyiError::FetchDisabledValidators)?;
let session_index = polkadot_node_subsystem_util::request_session_index_for_child(
new_relay_parent,
@@ -527,23 +573,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchSessionIndex)?;
let availability_cores = polkadot_node_subsystem_util::request_availability_cores(
new_relay_parent,
ctx.sender(),
)
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchAvailabilityCores)?;
let group_rotation_info =
polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender())
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchValidatorGroups)?
.1;
if !state.per_session.contains_key(&session_index) {
let session_info = polkadot_node_subsystem_util::request_session_info(
new_relay_parent,
@@ -579,9 +608,49 @@ pub(crate) async fn handle_active_leaves_update<Context>(
let per_session = state
.per_session
.get(&session_index)
.get_mut(&session_index)
.expect("either existed or just inserted; qed");
if cleared_disabled_validators.insert(session_index) {
per_session.clear_disabled_validators();
}
if !disabled_validators.is_empty() {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?new_relay_parent,
?session_index,
?disabled_validators,
"Disabled validators detected"
);
per_session.extend_disabled_validators(disabled_validators);
}
if state.per_relay_parent.contains_key(&new_relay_parent) {
continue
}
// New leaf: fetch info from runtime API and initialize
// `per_relay_parent`.
let availability_cores = polkadot_node_subsystem_util::request_availability_cores(
new_relay_parent,
ctx.sender(),
)
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchAvailabilityCores)?;
let group_rotation_info =
polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender())
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchValidatorGroups)?
.1;
let local_validator = per_session.local_validator.and_then(|v| {
if let LocalValidatorIndex::Active(idx) = v {
find_active_validator_state(
@@ -1452,6 +1521,17 @@ async fn handle_incoming_statement<Context>(
},
};
if per_session.is_disabled(&statement.unchecked_validator_index()) {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?statement.unchecked_validator_index(),
"Ignoring a statement from disabled validator."
);
modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
return
}
let (active, cluster_sender_index) = {
// This block of code only returns `Some` when both the originator and
// the sending peer are in the cluster.
@@ -1572,7 +1652,7 @@ async fn handle_incoming_statement<Context>(
checked_statement.clone(),
StatementOrigin::Remote,
) {
Err(statement_store::ValidatorUnknown) => {
Err(statement_store::Error::ValidatorUnknown) => {
// sanity: should never happen.
gum::warn!(
target: LOG_TARGET,
@@ -2110,7 +2190,7 @@ async fn handle_incoming_manifest_common<'a, Context>(
candidate_hash: CandidateHash,
relay_parent: Hash,
para_id: ParaId,
manifest_summary: grid::ManifestSummary,
mut manifest_summary: grid::ManifestSummary,
manifest_kind: grid::ManifestKind,
reputation: &mut ReputationAggregator,
) -> Option<ManifestImportSuccess<'a>> {
@@ -2195,6 +2275,12 @@ async fn handle_incoming_manifest_common<'a, Context>(
// 2. sanity checks: peer is validator, bitvec size, import into grid tracker
let group_index = manifest_summary.claimed_group_index;
let claimed_parent_hash = manifest_summary.claimed_parent_hash;
// Ignore votes from disabled validators when counting towards the threshold.
let disabled_mask = per_session.disabled_bitmask(group_index).unwrap_or_default();
manifest_summary.statement_knowledge.mask_seconded(&disabled_mask);
manifest_summary.statement_knowledge.mask_valid(&disabled_mask);
let acknowledge = match local_validator.grid_tracker.import_manifest(
grid_topology,
&per_session.groups,
@@ -2770,6 +2856,13 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
}
}
// Add disabled validators to the unwanted mask.
let disabled_mask = per_session
.disabled_bitmask(group_index)
.expect("group existence checked above; qed");
unwanted_mask.seconded_in_group |= &disabled_mask;
unwanted_mask.validated_in_group |= &disabled_mask;
// don't require a backing threshold for cluster candidates.
let local_validator = relay_parent_state.local_validator.as_ref()?;
let require_backing = local_validator
@@ -2777,14 +2870,14 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
.as_ref()
.map_or(true, |active| active.group != group_index);
Some(RequestProperties {
unwanted_mask,
backing_threshold: if require_backing {
Some(per_session.groups.get_size_and_backing_threshold(group_index)?.1)
} else {
None
},
})
let backing_threshold = if require_backing {
let threshold = per_session.groups.get_size_and_backing_threshold(group_index)?.1;
Some(threshold)
} else {
None
};
Some(RequestProperties { unwanted_mask, backing_threshold })
};
while let Some(request) = state.request_manager.next_request(
@@ -2857,6 +2950,10 @@ pub(crate) async fn handle_response<Context>(
Some(g) => g,
};
let disabled_mask = per_session
.disabled_bitmask(group_index)
.expect("group_index checked above; qed");
let res = response.validate_response(
&mut state.request_manager,
group,
@@ -2871,6 +2968,7 @@ pub(crate) async fn handle_response<Context>(
Some(g_index) == expected_group
},
disabled_mask,
);
for (peer, rep) in res.reputation_changes {
@@ -2968,6 +3066,14 @@ pub(crate) async fn handle_response<Context>(
// includable.
}
/// Returns true if the statement filter meets the backing threshold for grid requests.
pub(crate) fn seconded_and_sufficient(
filter: &StatementFilter,
backing_threshold: Option<usize>,
) -> bool {
backing_threshold.map_or(true, |t| filter.has_seconded() && filter.backing_validators() >= t)
}
/// Answer an incoming request for a candidate.
pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
let ResponderMessage { request, sent_feedback } = message;
@@ -3008,11 +3114,13 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
Some(d) => d,
};
let group_size = per_session
let group_index = confirmed.group_index();
let group = per_session
.groups
.get(confirmed.group_index())
.expect("group from session's candidate always known; qed")
.len();
.get(group_index)
.expect("group from session's candidate always known; qed");
let group_size = group.len();
// check request bitfields are right size.
if mask.seconded_in_group.len() != group_size || mask.validated_in_group.len() != group_size {
@@ -3065,17 +3173,59 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
// Transform mask with 'OR' semantics into one with 'AND' semantics for the API used
// below.
let and_mask = StatementFilter {
let mut and_mask = StatementFilter {
seconded_in_group: !mask.seconded_in_group.clone(),
validated_in_group: !mask.validated_in_group.clone(),
};
// Ignore disabled validators from the latest state when sending the response.
let disabled_mask =
per_session.disabled_bitmask(group_index).expect("group existence checked; qed");
and_mask.mask_seconded(&disabled_mask);
and_mask.mask_valid(&disabled_mask);
let mut sent_filter = StatementFilter::blank(group_size);
let statements: Vec<_> = relay_parent_state
.statement_store
.group_statements(&per_session.groups, confirmed.group_index(), *candidate_hash, &and_mask)
.map(|s| s.as_unchecked().clone())
.group_statements(&per_session.groups, group_index, *candidate_hash, &and_mask)
.map(|s| {
let s = s.as_unchecked().clone();
let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x);
let Some(i) = index_in_group(s.unchecked_validator_index()) else { return s };
match s.unchecked_payload() {
CompactStatement::Seconded(_) => {
sent_filter.seconded_in_group.set(i, true);
},
CompactStatement::Valid(_) => {
sent_filter.validated_in_group.set(i, true);
},
}
s
})
.collect();
// There should be no response at all for grid requests when the
// backing threshold is no longer met as a result of disabled validators.
if !is_cluster {
let threshold = per_session
.groups
.get_size_and_backing_threshold(group_index)
.expect("group existence checked above; qed")
.1;
if !seconded_and_sufficient(&sent_filter, Some(threshold)) {
gum::info!(
target: LOG_TARGET,
?candidate_hash,
relay_parent = ?confirmed.relay_parent(),
?group_index,
"Dropping a request from a grid peer because the backing threshold is no longer met."
);
return
}
}
// Update bookkeeping about which statements peers have received.
for statement in &statements {
if is_cluster {