approval-voting improvement: include all tranche0 assignments in one certificate (#1178)

**_PR migrated from https://github.com/paritytech/polkadot/pull/6782_** 

This PR will upgrade the network protocol to version 3 -> VStaging which
will later be renamed to V3. This version introduces a new kind of
assignment certificate that will be used for tranche0 assignments.
Instead of issuing/importing one tranche0 assignment per candidate,
there will be just one certificate per relay chain block per validator.
However, we will not be sending out the new assignment certificates,
yet. So everything should work exactly as before. Once the majority of
the validators have been upgraded to the new protocol version we will
enable the new certificates (starting at a specific relay chain block)
with a new client update.

There are still a few things that need to be done:

- [x] Use bitfield instead of Vec<CandidateIndex>:
https://github.com/paritytech/polkadot/pull/6802
  - [x] Fix existing approval-distribution and approval-voting tests
  - [x] Fix bitfield-distribution and statement-distribution tests
  - [x] Fix network bridge tests
  - [x] Implement todos in the code
  - [x] Add tests to cover new code
  - [x] Update metrics
  - [x] Remove the approval distribution aggression levels: TBD PR
  - [x] Parachains DB migration 
  - [x] Test network protocol upgrade on Versi
  - [x] Versi Load test
  - [x] Add Zombienet test
  - [x] Documentation updates
- [x] Fix for sending DistributeAssignment for each candidate claimed by
a v2 assignment (warning: Importing locally an already known assignment)
 - [x]  Fix AcceptedDuplicate
 - [x] Fix DB migration so that we can still keep old data.
 - [x] Final Versi burn in

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
This commit is contained in:
Andrei Sandu
2023-11-06 15:21:32 +02:00
committed by GitHub
parent 4ac9c4a364
commit 0570b6fa9e
55 changed files with 5643 additions and 1799 deletions
+377 -143
View File
@@ -25,7 +25,11 @@ use jaeger::{hash_to_trace_identifier, PerLeafSpan};
use polkadot_node_jaeger as jaeger;
use polkadot_node_primitives::{
approval::{
BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote,
v1::{BlockApprovalMeta, DelayTranche, IndirectSignedApprovalVote},
v2::{
AssignmentCertKindV2, BitfieldError, CandidateBitfield, CoreBitfield,
IndirectAssignmentCertV2,
},
},
ValidationResult, DISPUTE_WINDOW,
};
@@ -76,12 +80,13 @@ use std::{
use schnellru::{ByLength, LruMap};
use approval_checking::RequiredTranches;
use bitvec::{order::Lsb0, vec::BitVec};
use criteria::{AssignmentCriteria, RealAssignmentCriteria};
use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
use time::{slot_number_to_tick, Clock, ClockExt, SystemClock, Tick};
mod approval_checking;
mod approval_db;
pub mod approval_db;
mod backend;
mod criteria;
mod import;
@@ -90,8 +95,9 @@ mod persisted_entries;
mod time;
use crate::{
approval_db::v1::{Config as DatabaseConfig, DbBackend},
approval_db::v2::{Config as DatabaseConfig, DbBackend},
backend::{Backend, OverlayedBackend},
criteria::InvalidAssignmentReason,
};
#[cfg(test)]
@@ -107,7 +113,7 @@ const APPROVAL_CACHE_SIZE: u32 = 1024;
const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
const APPROVAL_DELAY: Tick = 2;
const LOG_TARGET: &str = "parachain::approval-voting";
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
/// Configuration for the approval voting subsystem
#[derive(Debug, Clone)]
@@ -377,8 +383,8 @@ impl ApprovalVotingSubsystem {
/// The operation is not allowed for blocks older than the last finalized one.
pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
let config =
approval_db::v1::Config { col_approval_data: self.db_config.col_approval_data };
let mut backend = approval_db::v1::DbBackend::new(self.db.clone(), config);
approval_db::v2::Config { col_approval_data: self.db_config.col_approval_data };
let mut backend = approval_db::v2::DbBackend::new(self.db.clone(), config);
let mut overlay = OverlayedBackend::new(&backend);
ops::revert_to(&mut overlay, hash)?;
@@ -754,15 +760,16 @@ enum Action {
tick: Tick,
},
LaunchApproval {
claimed_candidate_indices: CandidateBitfield,
candidate_hash: CandidateHash,
indirect_cert: IndirectAssignmentCert,
indirect_cert: IndirectAssignmentCertV2,
assignment_tranche: DelayTranche,
relay_block_hash: Hash,
candidate_index: CandidateIndex,
session: SessionIndex,
executor_params: ExecutorParams,
candidate: CandidateReceipt,
backing_group: GroupIndex,
distribute_assignment: bool,
},
NoteApprovedInChainSelection(Hash),
IssueApproval(CandidateHash, ApprovalVoteRequest),
@@ -977,15 +984,16 @@ async fn handle_actions<Context>(
actions_iter = next_actions.into_iter();
},
Action::LaunchApproval {
claimed_candidate_indices,
candidate_hash,
indirect_cert,
assignment_tranche,
relay_block_hash,
candidate_index,
session,
executor_params,
candidate,
backing_group,
distribute_assignment,
} => {
// Don't launch approval work if the node is syncing.
if let Mode::Syncing(_) = *mode {
@@ -1006,10 +1014,12 @@ async fn handle_actions<Context>(
launch_approval_span.add_string_tag("block-hash", format!("{:?}", block_hash));
let validator_index = indirect_cert.validator;
ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
indirect_cert,
candidate_index,
));
if distribute_assignment {
ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
indirect_cert,
claimed_candidate_indices,
));
}
match approvals_cache.get(&candidate_hash) {
Some(ApprovalOutcome::Approved) => {
@@ -1078,6 +1088,49 @@ async fn handle_actions<Context>(
Ok(conclude)
}
fn cores_to_candidate_indices(
core_indices: &CoreBitfield,
block_entry: &BlockEntry,
) -> Result<CandidateBitfield, BitfieldError> {
let mut candidate_indices = Vec::new();
// Map from core index to candidate index.
for claimed_core_index in core_indices.iter_ones() {
if let Some(candidate_index) = block_entry
.candidates()
.iter()
.position(|(core_index, _)| core_index.0 == claimed_core_index as u32)
{
candidate_indices.push(candidate_index as CandidateIndex);
}
}
CandidateBitfield::try_from(candidate_indices)
}
// Returns the claimed core bitfield from the assignment cert, the candidate hash and a
// `BlockEntry`. Can fail only for VRF Delay assignments for which we cannot find the candidate hash
// in the block entry which indicates a bug or corrupted storage.
fn get_assignment_core_indices(
assignment: &AssignmentCertKindV2,
candidate_hash: &CandidateHash,
block_entry: &BlockEntry,
) -> Option<CoreBitfield> {
match &assignment {
AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
Some(core_bitfield.clone()),
AssignmentCertKindV2::RelayVRFModulo { sample: _ } => block_entry
.candidates()
.iter()
.find(|(_core_index, h)| candidate_hash == h)
.map(|(core_index, _candidate_hash)| {
CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")
}),
AssignmentCertKindV2::RelayVRFDelay { core_index } =>
Some(CoreBitfield::try_from(vec![*core_index]).expect("Not an empty vec; qed")),
}
}
fn distribution_messages_for_activation(
db: &OverlayedBackend<'_, impl Backend>,
state: &State,
@@ -1142,33 +1195,95 @@ fn distribution_messages_for_activation(
match approval_entry.local_statements() {
(None, None) | (None, Some(_)) => {}, // second is impossible case.
(Some(assignment), None) => {
messages.push(ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCert {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
},
i as _,
));
if let Some(claimed_core_indices) = get_assignment_core_indices(
&assignment.cert().kind,
&candidate_hash,
&block_entry,
) {
match cores_to_candidate_indices(
&claimed_core_indices,
&block_entry,
) {
Ok(bitfield) => messages.push(
ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCertV2 {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
},
bitfield,
),
),
Err(err) => {
// Should never happen. If we fail here it means the
// assignment is null (no cores claimed).
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
},
}
} else {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Cannot get assignment claimed core indices",
);
}
},
(Some(assignment), Some(approval_sig)) => {
messages.push(ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCert {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
},
i as _,
));
if let Some(claimed_core_indices) = get_assignment_core_indices(
&assignment.cert().kind,
&candidate_hash,
&block_entry,
) {
match cores_to_candidate_indices(
&claimed_core_indices,
&block_entry,
) {
Ok(bitfield) => messages.push(
ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCertV2 {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
},
bitfield,
),
),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
?err,
"Failed to create assignment bitfield",
);
// If we didn't send assignment, we don't send approval.
continue
},
}
messages.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVote {
block_hash,
candidate_index: i as _,
validator: assignment.validator_index(),
signature: approval_sig,
},
))
messages.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVote {
block_hash,
candidate_index: i as _,
validator: assignment.validator_index(),
signature: approval_sig,
},
));
} else {
gum::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Cannot get assignment claimed core indices",
);
}
},
}
},
@@ -1288,14 +1403,14 @@ async fn handle_from_overseer<Context>(
vec![Action::Conclude]
},
FromOrchestra::Communication { msg } => match msg {
ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => {
ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_cores, res) => {
let (check_outcome, actions) = check_and_import_assignment(
ctx.sender(),
state,
db,
session_info_provider,
a,
claimed_core,
claimed_cores,
)
.await?;
let _ = res.send(check_outcome);
@@ -1465,7 +1580,6 @@ async fn handle_approved_ancestor<Context>(
let mut span = span
.child("handle-approved-ancestor")
.with_stage(jaeger::Stage::ApprovalChecking);
use bitvec::{order::Lsb0, vec::BitVec};
let mut all_approved_max = None;
@@ -1804,8 +1918,8 @@ async fn check_and_import_assignment<Sender>(
state: &State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
assignment: IndirectAssignmentCert,
candidate_index: CandidateIndex,
assignment: IndirectAssignmentCertV2,
candidate_indices: CandidateBitfield,
) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
where
Sender: SubsystemSender<RuntimeApiMessage>,
@@ -1818,9 +1932,12 @@ where
.map(|span| span.child("check-and-import-assignment"))
.unwrap_or_else(|| jaeger::Span::new(assignment.block_hash, "check-and-import-assignment"))
.with_relay_parent(assignment.block_hash)
.with_uint_tag("candidate-index", candidate_index as u64)
.with_stage(jaeger::Stage::ApprovalChecking);
for candidate_index in candidate_indices.iter_ones() {
check_and_import_assignment_span.add_uint_tag("candidate-index", candidate_index as u64);
}
let block_entry = match db.load_block_entry(&assignment.block_hash)? {
Some(b) => b,
None =>
@@ -1850,39 +1967,64 @@ where
)),
};
let (claimed_core_index, assigned_candidate_hash) =
match block_entry.candidate(candidate_index as usize) {
Some((c, h)) => (*c, *h),
let n_cores = session_info.n_cores as usize;
// Early check the candidate bitfield and core bitfields lengths < `n_cores`.
// Core bitfield length is checked later in `check_assignment_cert`.
if candidate_indices.len() > n_cores {
gum::debug!(
target: LOG_TARGET,
validator = assignment.validator.0,
n_cores,
candidate_bitfield_len = ?candidate_indices.len(),
"Oversized bitfield",
);
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidBitfield(
candidate_indices.len(),
)),
Vec::new(),
))
}
// The Compact VRF modulo assignment cert has multiple core assignments.
let mut backing_groups = Vec::new();
let mut claimed_core_indices = Vec::new();
let mut assigned_candidate_hashes = Vec::new();
for candidate_index in candidate_indices.iter_ones() {
let (claimed_core_index, assigned_candidate_hash) =
match block_entry.candidate(candidate_index) {
Some((c, h)) => (*c, *h),
None =>
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(
candidate_index as _,
)),
Vec::new(),
)), // no candidate at core.
};
let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
Some(c) => c,
None =>
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(
candidate_index,
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
candidate_index as _,
assigned_candidate_hash,
)),
Vec::new(),
)), // no candidate at core.
};
check_and_import_assignment_span
.add_string_tag("candidate-hash", format!("{:?}", assigned_candidate_hash));
check_and_import_assignment_span.add_string_tag(
"traceID",
format!("{:?}", jaeger::hash_to_trace_identifier(assigned_candidate_hash.0)),
);
check_and_import_assignment_span
.add_string_tag("candidate-hash", format!("{:?}", assigned_candidate_hash));
check_and_import_assignment_span.add_string_tag(
"traceID",
format!("{:?}", jaeger::hash_to_trace_identifier(assigned_candidate_hash.0)),
);
let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
Some(c) => c,
None =>
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
candidate_index,
assigned_candidate_hash,
)),
Vec::new(),
)),
};
let res = {
// import the assignment.
let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
Some(a) => a,
None =>
@@ -1895,79 +2037,144 @@ where
)),
};
let res = state.assignment_criteria.check_assignment_cert(
claimed_core_index,
assignment.validator,
&criteria::Config::from(session_info),
block_entry.relay_vrf_story(),
&assignment.cert,
approval_entry.backing_group(),
);
backing_groups.push(approval_entry.backing_group());
claimed_core_indices.push(claimed_core_index);
assigned_candidate_hashes.push(assigned_candidate_hash);
}
let tranche = match res {
Err(crate::criteria::InvalidAssignment(reason)) =>
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
assignment.validator,
format!("{:?}", reason),
)),
Vec::new(),
// Error on null assignments.
if claimed_core_indices.is_empty() {
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
assignment.validator,
format!("{:?}", InvalidAssignmentReason::NullAssignment),
)),
Vec::new(),
))
}
// Check the assignment certificate.
let res = state.assignment_criteria.check_assignment_cert(
claimed_core_indices
.clone()
.try_into()
.expect("Checked for null assignment above; qed"),
assignment.validator,
&criteria::Config::from(session_info),
block_entry.relay_vrf_story(),
&assignment.cert,
backing_groups,
);
let tranche = match res {
Err(crate::criteria::InvalidAssignment(reason)) =>
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(
assignment.validator,
format!("{:?}", reason),
)),
Ok(tranche) => {
let current_tranche =
state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());
Vec::new(),
)),
Ok(tranche) => {
let current_tranche =
state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());
let too_far_in_future = current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;
let too_far_in_future = current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;
if tranche >= too_far_in_future {
return Ok((AssignmentCheckResult::TooFarInFuture, Vec::new()))
}
if tranche >= too_far_in_future {
return Ok((AssignmentCheckResult::TooFarInFuture, Vec::new()))
}
tranche
},
};
tranche
},
};
check_and_import_assignment_span.add_uint_tag("tranche", tranche as u64);
let mut actions = Vec::new();
let res = {
let mut is_duplicate = true;
// Import the assignments for all cores in the cert.
for (assigned_candidate_hash, candidate_index) in
assigned_candidate_hashes.iter().zip(candidate_indices.iter_ones())
{
let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
Some(c) => c,
None =>
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidate(
candidate_index as _,
*assigned_candidate_hash,
)),
Vec::new(),
)),
};
let is_duplicate = approval_entry.is_assigned(assignment.validator);
approval_entry.import_assignment(tranche, assignment.validator, tick_now);
let approval_entry = match candidate_entry.approval_entry_mut(&assignment.block_hash) {
Some(a) => a,
None =>
return Ok((
AssignmentCheckResult::Bad(AssignmentCheckError::Internal(
assignment.block_hash,
*assigned_candidate_hash,
)),
Vec::new(),
)),
};
is_duplicate &= approval_entry.is_assigned(assignment.validator);
approval_entry.import_assignment(tranche, assignment.validator, tick_now);
check_and_import_assignment_span.add_uint_tag("tranche", tranche as u64);
// We've imported a new assignment, so we need to schedule a wake-up for when that might
// no-show.
if let Some((approval_entry, status)) = state
.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
.await
{
actions.extend(schedule_wakeup_action(
approval_entry,
block_entry.block_hash(),
block_entry.block_number(),
*assigned_candidate_hash,
status.block_tick,
tick_now,
status.required_tranches,
));
}
// We also write the candidate entry as it now contains the new candidate.
db.write_candidate_entry(candidate_entry.into());
}
// Since we don't account for tranche in distribution message fingerprinting, some
// validators can be assigned to the same core (VRF modulo vs VRF delay). These can be
// safely ignored. However, if an assignment is for multiple cores (these are only
// tranche0), we cannot ignore it, because it would mean ignoring other non duplicate
// assignments.
if is_duplicate {
AssignmentCheckResult::AcceptedDuplicate
} else if candidate_indices.count_ones() > 1 {
gum::trace!(
target: LOG_TARGET,
validator = assignment.validator.0,
candidate_hashes = ?assigned_candidate_hashes,
assigned_cores = ?claimed_core_indices,
?tranche,
"Imported assignments for multiple cores.",
);
AssignmentCheckResult::Accepted
} else {
gum::trace!(
target: LOG_TARGET,
validator = assignment.validator.0,
candidate_hash = ?assigned_candidate_hash,
para_id = ?candidate_entry.candidate_receipt().descriptor.para_id,
"Imported assignment.",
candidate_hashes = ?assigned_candidate_hashes,
assigned_cores = ?claimed_core_indices,
"Imported assignment for a single core.",
);
AssignmentCheckResult::Accepted
}
};
let mut actions = Vec::new();
// We've imported a new approval, so we need to schedule a wake-up for when that might no-show.
if let Some((approval_entry, status)) = state
.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
.await
{
actions.extend(schedule_wakeup_action(
approval_entry,
block_entry.block_hash(),
block_entry.block_number(),
assigned_candidate_hash,
status.block_tick,
tick_now,
status.required_tranches,
));
}
// We also write the candidate entry as it now contains the new candidate.
db.write_candidate_entry(candidate_entry.into());
Ok((res, actions))
}
@@ -2341,7 +2548,7 @@ async fn process_wakeup<Context>(
let candidate_entry = db.load_candidate_entry(&candidate_hash)?;
// If either is not present, we have nothing to wakeup. Might have lost a race with finality
let (block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
let (mut block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
(Some(b), Some(c)) => (b, c),
_ => return Ok(Vec::new()),
};
@@ -2422,32 +2629,59 @@ async fn process_wakeup<Context>(
if let Some((cert, val_index, tranche)) = maybe_cert {
let indirect_cert =
IndirectAssignmentCert { block_hash: relay_block, validator: val_index, cert };
IndirectAssignmentCertV2 { block_hash: relay_block, validator: val_index, cert };
let index_in_candidate =
block_entry.candidates().iter().position(|(_, h)| &candidate_hash == h);
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
para_id = ?candidate_receipt.descriptor.para_id,
block_hash = ?relay_block,
"Launching approval work.",
);
if let Some(i) = index_in_candidate {
gum::trace!(
if let Some(claimed_core_indices) =
get_assignment_core_indices(&indirect_cert.cert.kind, &candidate_hash, &block_entry)
{
match cores_to_candidate_indices(&claimed_core_indices, &block_entry) {
Ok(claimed_candidate_indices) => {
// Ensure we distribute multiple core assignments just once.
let distribute_assignment = if claimed_candidate_indices.count_ones() > 1 {
!block_entry.mark_assignment_distributed(claimed_candidate_indices.clone())
} else {
true
};
db.write_block_entry(block_entry.clone());
actions.push(Action::LaunchApproval {
claimed_candidate_indices,
candidate_hash,
indirect_cert,
assignment_tranche: tranche,
relay_block_hash: relay_block,
session: block_entry.session(),
executor_params: executor_params.clone(),
candidate: candidate_receipt,
backing_group,
distribute_assignment,
});
},
Err(err) => {
// Never happens, it should only happen if no cores are claimed, which is a bug.
gum::warn!(
target: LOG_TARGET,
block_hash = ?relay_block,
?err,
"Failed to create assignment bitfield"
);
},
};
} else {
gum::warn!(
target: LOG_TARGET,
?candidate_hash,
para_id = ?candidate_receipt.descriptor.para_id,
block_hash = ?relay_block,
"Launching approval work.",
?candidate_hash,
"Cannot get assignment claimed core indices",
);
// sanity: should always be present.
actions.push(Action::LaunchApproval {
candidate_hash,
indirect_cert,
assignment_tranche: tranche,
relay_block_hash: relay_block,
candidate_index: i as _,
session: block_entry.session(),
executor_params: executor_params.clone(),
candidate: candidate_receipt,
backing_group,
});
}
}
// Although we checked approval earlier in this function,