Approval Voting improvements (#2781)

* extract database from av-store itself

* generalize approval-voting over database type

* modes (without handling) and pruning old wakeups

* rework approval importing

* add our_approval_sig to ApprovalEntry

* import assignment

* guide updates for check-full-approval changes

* some aux functions

* send messages when becoming active.

* guide: network bridge sends view updates only when done syncing

* network bridge: send view updates only when done syncing

* tests for new network-bridge behavior

* add a test for updating approval entry with sig

* fix some warnings

* test load-all-blocks

* instantiate new parachains DB

* fix network-bridge empty view updates

* tweak

* fix wasm build, i think

* Update node/core/approval-voting/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* add some versioning to parachains_db

* warnings

* fix merge changes

* remove versioning again

Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Robert Habermeier
2021-04-01 19:33:52 +02:00
committed by GitHub
parent 01badafba6
commit 57b56770e0
20 changed files with 1593 additions and 701 deletions
+430 -166
View File
@@ -43,10 +43,12 @@ use polkadot_primitives::v1::{
use polkadot_node_primitives::{ValidationResult, PoV};
use polkadot_node_primitives::approval::{
IndirectAssignmentCert, IndirectSignedApprovalVote, ApprovalVote, DelayTranche,
BlockApprovalMeta,
};
use polkadot_node_jaeger as jaeger;
use parity_scale_codec::Encode;
use sc_keystore::LocalKeystore;
use sp_consensus::SyncOracle;
use sp_consensus_slots::Slot;
use sp_runtime::traits::AppVerify;
use sp_application_crypto::Pair;
@@ -56,7 +58,7 @@ use futures::prelude::*;
use futures::future::RemoteHandle;
use futures::channel::{mpsc, oneshot};
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::btree_map::Entry;
use std::sync::Arc;
@@ -72,6 +74,8 @@ mod import;
mod time;
mod persisted_entries;
use crate::approval_db::v1::Config as DatabaseConfig;
#[cfg(test)]
mod tests;
@@ -79,25 +83,39 @@ const APPROVAL_SESSIONS: SessionIndex = 6;
const LOG_TARGET: &str = "parachain::approval-voting";
/// Configuration for the approval voting subsystem
#[derive(Debug, Clone)]
pub struct Config {
/// The path where the approval-voting DB should be kept. This directory is completely removed when starting
/// the service.
pub path: std::path::PathBuf,
/// The cache size, in bytes, to spend on approval checking metadata.
pub cache_size: Option<usize>,
/// The column family in the DB where approval-voting data is stored.
pub col_data: u32,
/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
/// divisible by 500.
pub slot_duration_millis: u64,
}
// The mode of the approval voting subsystem. It should start in a `Syncing` mode when it first
// starts, and then once it's reached the head of the chain it should move into the `Active` mode.
//
// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
// the node follows the new incoming blocks and finalized number, but does not yet participate.
//
// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
// subsystem of all unfinalized blocks and the candidates included within them, as well as all
// votes that the local node itself has cast on candidates within those blocks.
enum Mode {
Active,
Syncing(Box<dyn SyncOracle + Send>),
}
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
/// LocalKeystore is needed for assignment keys, but not necessarily approval keys.
///
/// We do a lot of VRF signing and need the keys to have low latency.
keystore: Arc<LocalKeystore>,
db_config: DatabaseConfig,
slot_duration_millis: u64,
db: Arc<dyn KeyValueDB>,
mode: Mode,
metrics: Metrics,
}
@@ -239,27 +257,24 @@ impl metrics::Metrics for Metrics {
}
impl ApprovalVotingSubsystem {
/// Create a new approval voting subsystem with the given keystore and config,
/// which creates a DB at the given path. This function will delete the directory
/// at the given path if it already exists.
/// Create a new approval voting subsystem with the given keystore, config, and database.
pub fn with_config(
config: Config,
db: Arc<dyn KeyValueDB>,
keystore: Arc<LocalKeystore>,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
) -> std::io::Result<Self> {
const DEFAULT_CACHE_SIZE: usize = 100 * 1024 * 1024; // 100MiB default should be fine unless finality stalls.
let db = approval_db::v1::clear_and_recreate(
&config.path,
config.cache_size.unwrap_or(DEFAULT_CACHE_SIZE),
)?;
Ok(ApprovalVotingSubsystem {
) -> Self {
ApprovalVotingSubsystem {
keystore,
slot_duration_millis: config.slot_duration_millis,
db,
db_config: DatabaseConfig {
col_data: config.col_data,
},
mode: Mode::Syncing(sync_oracle),
metrics,
})
}
}
}
@@ -305,6 +320,7 @@ struct Wakeups {
// Tick -> [(Relay Block, Candidate Hash)]
wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
}
impl Wakeups {
@@ -313,9 +329,19 @@ impl Wakeups {
self.wakeups.keys().next().map(|t| *t)
}
fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
self.block_numbers.entry(block_number).or_default().insert(block_hash);
}
// Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up
// for these values. replaces any later wakeup.
fn schedule(&mut self, block_hash: Hash, candidate_hash: CandidateHash, tick: Tick) {
fn schedule(
&mut self,
block_hash: Hash,
block_number: BlockNumber,
candidate_hash: CandidateHash,
tick: Tick,
) {
if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
if prev <= &tick { return }
@@ -331,12 +357,42 @@ impl Wakeups {
let _ = entry.remove_entry();
}
}
} else {
self.note_block(block_hash, block_number);
}
self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
}
fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
let after = self.block_numbers.split_off(&(finalized_number + 1));
let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
.into_iter()
.flat_map(|(_number, hashes)| hashes)
.collect();
let mut pruned_wakeups = BTreeMap::new();
self.reverse_wakeups.retain(|&(ref h, ref c_h), tick| {
let live = !pruned_blocks.contains(h);
if !live {
pruned_wakeups.entry(*tick)
.or_insert_with(HashSet::new)
.insert((*h, *c_h));
}
live
});
for (tick, pruned) in pruned_wakeups {
if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
if entry.get().is_empty() {
let _ = entry.remove();
}
}
}
}
// Get the wakeup for a particular block/candidate combo, if any.
fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
@@ -379,30 +435,40 @@ trait DBReader {
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>>;
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>>;
}
// This is a submodule to enforce opacity of the inner DB type.
mod approval_db_v1_reader {
use super::{
DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry,
Arc, SubsystemResult, SubsystemError, approval_db,
SubsystemResult, SubsystemError, DatabaseConfig, approval_db,
};
/// A DB reader that uses the approval-db V1 under the hood.
pub(super) struct ApprovalDBV1Reader<T: ?Sized>(Arc<T>);
pub(super) struct ApprovalDBV1Reader<T> {
inner: T,
config: DatabaseConfig,
}
impl<T: ?Sized> From<Arc<T>> for ApprovalDBV1Reader<T> {
fn from(a: Arc<T>) -> Self {
ApprovalDBV1Reader(a)
impl<T> ApprovalDBV1Reader<T> {
pub(super) fn new(inner: T, config: DatabaseConfig) -> Self {
ApprovalDBV1Reader {
inner,
config,
}
}
}
impl DBReader for ApprovalDBV1Reader<dyn KeyValueDB> {
impl<'a, T: 'a> DBReader for ApprovalDBV1Reader<T>
where T: std::ops::Deref<Target=(dyn KeyValueDB + 'a)>
{
fn load_block_entry(
&self,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>> {
approval_db::v1::load_block_entry(&*self.0, block_hash)
approval_db::v1::load_block_entry(&*self.inner, &self.config, block_hash)
.map(|e| e.map(Into::into))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
@@ -411,14 +477,25 @@ mod approval_db_v1_reader {
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>> {
approval_db::v1::load_candidate_entry(&*self.0, candidate_hash)
approval_db::v1::load_candidate_entry(&*self.inner, &self.config, candidate_hash)
.map(|e| e.map(Into::into))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
approval_db::v1::load_all_blocks(&*self.inner, &self.config)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
}
}
use approval_db_v1_reader::ApprovalDBV1Reader;
struct ApprovalStatus {
required_tranches: RequiredTranches,
tranche_now: DelayTranche,
block_tick: Tick,
}
struct State<T> {
session_window: import::RollingSessionWindow,
keystore: Arc<LocalKeystore>,
@@ -432,12 +509,59 @@ impl<T> State<T> {
fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
self.session_window.session_info(i)
}
// Compute the required tranches for approval for this block and candidate combo.
// Fails if there is no approval entry for the block under the candidate or no candidate entry
// under the block, or if the session is out of bounds.
fn approval_status<'a, 'b>(
&'a self,
block_entry: &'a BlockEntry,
candidate_entry: &'b CandidateEntry,
) -> Option<(&'b ApprovalEntry, ApprovalStatus)> {
let session_info = match self.session_info(block_entry.session()) {
Some(s) => s,
None => {
tracing::warn!(target: LOG_TARGET, "Unknown session info for {}", block_entry.session());
return None;
}
};
let block_hash = block_entry.block_hash();
let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
let no_show_duration = slot_number_to_tick(
self.slot_duration_millis,
Slot::from(u64::from(session_info.no_show_slots)),
);
if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
let required_tranches = approval_checking::tranches_to_approve(
approval_entry,
candidate_entry.approvals(),
tranche_now,
block_tick,
no_show_duration,
session_info.needed_approvals as _
);
let status = ApprovalStatus {
required_tranches,
block_tick,
tranche_now,
};
Some((approval_entry, status))
} else {
None
}
}
}
#[derive(Debug)]
enum Action {
ScheduleWakeup {
block_hash: Hash,
block_number: BlockNumber,
candidate_hash: CandidateHash,
tick: Tick,
},
@@ -451,6 +575,7 @@ enum Action {
candidate: CandidateReceipt,
backing_group: GroupIndex,
},
BecomeActive,
Conclude,
}
@@ -458,7 +583,7 @@ type BackgroundTaskMap = BTreeMap<BlockNumber, Vec<RemoteHandle<()>>>;
async fn run<C>(
mut ctx: C,
subsystem: ApprovalVotingSubsystem,
mut subsystem: ApprovalVotingSubsystem,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
) -> SubsystemResult<()>
@@ -469,7 +594,7 @@ async fn run<C>(
session_window: Default::default(),
keystore: subsystem.keystore,
slot_duration_millis: subsystem.slot_duration_millis,
db: ApprovalDBV1Reader::from(subsystem.db.clone()),
db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()),
clock,
assignment_criteria,
};
@@ -496,20 +621,28 @@ async fn run<C>(
)?
}
next_msg = ctx.recv().fuse() => {
let actions = handle_from_overseer(
let mut actions = handle_from_overseer(
&mut ctx,
&mut state,
&subsystem.metrics,
db_writer,
subsystem.db_config,
next_msg?,
&mut last_finalized_height,
&wakeups,
&mut wakeups,
).await?;
if let Some(finalized_height) = last_finalized_height {
cleanup_background_tasks(finalized_height, &mut background_tasks);
}
if let Mode::Syncing(ref mut oracle) = subsystem.mode {
if !oracle.is_major_syncing() {
// note that we're active before processing other actions.
actions.insert(0, Action::BecomeActive)
}
}
actions
}
background_request = background_rx.next().fuse() => {
@@ -531,8 +664,10 @@ async fn run<C>(
&subsystem.metrics,
&mut wakeups,
db_writer,
subsystem.db_config,
&background_tx,
&mut background_tasks,
&mut subsystem.mode,
actions,
).await? {
break;
@@ -548,20 +683,25 @@ async fn handle_actions(
metrics: &Metrics,
wakeups: &mut Wakeups,
db: &dyn KeyValueDB,
db_config: DatabaseConfig,
background_tx: &mpsc::Sender<BackgroundRequest>,
background_tasks: &mut BackgroundTaskMap,
mode: &mut Mode,
actions: impl IntoIterator<Item = Action>,
) -> SubsystemResult<bool> {
let mut transaction = approval_db::v1::Transaction::default();
let mut transaction = approval_db::v1::Transaction::new(db_config);
let mut conclude = false;
for action in actions {
match action {
Action::ScheduleWakeup {
block_hash,
block_number,
candidate_hash,
tick,
} => wakeups.schedule(block_hash, candidate_hash, tick),
} => {
wakeups.schedule(block_hash, block_number, candidate_hash, tick)
}
Action::WriteBlockEntry(block_entry) => {
transaction.put_block_entry(block_entry.into());
}
@@ -576,6 +716,9 @@ async fn handle_actions(
candidate,
backing_group,
} => {
// Don't launch approval work if the node is syncing.
if let Mode::Syncing(_) = *mode { continue }
metrics.on_assignment_produced();
let block_hash = indirect_cert.block_hash;
let validator_index = indirect_cert.validator;
@@ -600,6 +743,15 @@ async fn handle_actions(
background_tasks.entry(relay_block_number).or_default().push(handle);
}
}
Action::BecomeActive => {
*mode = Mode::Active;
let messages = distribution_messages_for_activation(
ApprovalDBV1Reader::new(db, db_config)
)?;
ctx.send_messages(messages.into_iter().map(Into::into)).await;
}
Action::Conclude => { conclude = true; }
}
}
@@ -627,15 +779,113 @@ fn cleanup_background_tasks(
// the task on drop.
}
fn distribution_messages_for_activation<'a>(
db: impl DBReader + 'a,
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
let all_blocks = db.load_all_blocks()?;
let mut approval_meta = Vec::with_capacity(all_blocks.len());
let mut messages = Vec::new();
messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.
for block_hash in all_blocks {
let block_entry = match db.load_block_entry(&block_hash)? {
Some(b) => b,
None => {
tracing::warn!(
target: LOG_TARGET,
?block_hash,
"Missing block entry",
);
continue
}
};
approval_meta.push(BlockApprovalMeta {
hash: block_hash,
number: block_entry.block_number(),
parent_hash: block_entry.parent_hash(),
candidates: block_entry.candidates().iter().map(|(_, c_hash)| *c_hash).collect(),
slot: block_entry.slot(),
});
for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() {
let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
Some(c) => c,
None => {
tracing::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Missing candidate entry",
);
continue
}
};
match candidate_entry.approval_entry(&block_hash) {
Some(approval_entry) => {
match approval_entry.local_statements() {
(None, None) | (None, Some(_)) => {}, // second is impossible case.
(Some(assignment), None) => {
messages.push(ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCert {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
},
i as _,
));
}
(Some(assignment), Some(approval_sig)) => {
messages.push(ApprovalDistributionMessage::DistributeAssignment(
IndirectAssignmentCert {
block_hash,
validator: assignment.validator_index(),
cert: assignment.cert().clone(),
},
i as _,
));
messages.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVote {
block_hash,
candidate_index: i as _,
validator: assignment.validator_index(),
signature: approval_sig,
}
))
}
}
}
None => {
tracing::warn!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Missing approval entry",
);
}
}
}
}
messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
Ok(messages)
}
// Handle an incoming signal from the overseer. Returns true if execution should conclude.
async fn handle_from_overseer(
ctx: &mut impl SubsystemContext,
state: &mut State<impl DBReader>,
metrics: &Metrics,
db_writer: &dyn KeyValueDB,
db_config: DatabaseConfig,
x: FromOverseer<ApprovalVotingMessage>,
last_finalized_height: &mut Option<BlockNumber>,
wakeups: &Wakeups,
wakeups: &mut Wakeups,
) -> SubsystemResult<Vec<Action>> {
let actions = match x {
@@ -648,6 +898,7 @@ async fn handle_from_overseer(
ctx,
state,
db_writer,
db_config,
head,
&*last_finalized_height,
).await {
@@ -685,6 +936,7 @@ async fn handle_from_overseer(
// and approvals which trigger rescheduling.
actions.push(Action::ScheduleWakeup {
block_hash: block_batch.block_hash,
block_number: block_batch.block_number,
candidate_hash: c_hash,
tick,
});
@@ -700,9 +952,11 @@ async fn handle_from_overseer(
FromOverseer::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
*last_finalized_height = Some(block_number);
approval_db::v1::canonicalize(db_writer, block_number, block_hash)
approval_db::v1::canonicalize(db_writer, &db_config, block_number, block_hash)
.map_err(|e| SubsystemError::with_origin("db", e))?;
wakeups.prune_finalized_wakeups(block_number);
Vec::new()
}
FromOverseer::Signal(OverseerSignal::Conclude) => {
@@ -711,7 +965,7 @@ async fn handle_from_overseer(
FromOverseer::Communication { msg } => match msg {
ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => {
let (check_outcome, actions)
= check_and_import_assignment(state, metrics, a, claimed_core)?;
= check_and_import_assignment(state, a, claimed_core)?;
let _ = res.send(check_outcome);
actions
}
@@ -996,6 +1250,7 @@ fn min_prefer_some<T: std::cmp::Ord>(
fn schedule_wakeup_action(
approval_entry: &ApprovalEntry,
block_hash: Hash,
block_number: BlockNumber,
candidate_hash: CandidateHash,
block_tick: Tick,
required_tranches: RequiredTranches,
@@ -1005,6 +1260,7 @@ fn schedule_wakeup_action(
RequiredTranches::All => None,
RequiredTranches::Exact { next_no_show, .. } => next_no_show.map(|tick| Action::ScheduleWakeup {
block_hash,
block_number,
candidate_hash,
tick,
}),
@@ -1032,7 +1288,12 @@ fn schedule_wakeup_action(
};
min_prefer_some(next_non_empty_tranche, next_no_show)
.map(|tick| Action::ScheduleWakeup { block_hash, candidate_hash, tick })
.map(|tick| Action::ScheduleWakeup {
block_hash,
block_number,
candidate_hash,
tick,
})
}
};
@@ -1060,7 +1321,6 @@ fn schedule_wakeup_action(
fn check_and_import_assignment(
state: &State<impl DBReader>,
metrics: &Metrics,
assignment: IndirectAssignmentCert,
candidate_index: CandidateIndex,
) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)> {
@@ -1155,24 +1415,22 @@ fn check_and_import_assignment(
}
};
// We check for approvals here because we may be late in seeing a block containing a
// candidate for which we have already seen approvals by the same validator.
//
// For these candidates, we will receive the assignments potentially after a corresponding
// approval, and so we must check for approval here.
//
// Note that this already produces actions for writing
// the candidate entry and any modified block entries to disk.
//
// It also produces actions to schedule wakeups for the candidate.
let actions = check_and_apply_full_approval(
state,
&metrics,
Some((assignment.block_hash, block_entry)),
assigned_candidate_hash,
candidate_entry,
|h, _| h == &assignment.block_hash,
)?;
let mut actions = Vec::new();
// We've imported a new approval, so we need to schedule a wake-up for when that might no-show.
if let Some((approval_entry, status)) = state.approval_status(&block_entry, &candidate_entry) {
actions.extend(schedule_wakeup_action(
approval_entry,
block_entry.block_hash(),
block_entry.block_number(),
assigned_candidate_hash,
status.block_tick,
status.required_tranches,
));
}
// We also write the candidate entry as it now contains the new candidate.
actions.push(Action::WriteCandidateEntry(assigned_candidate_hash, candidate_entry));
Ok((res, actions))
}
@@ -1259,116 +1517,89 @@ fn check_and_import_approval<T>(
let actions = import_checked_approval(
state,
&metrics,
Some((approval.block_hash, block_entry)),
block_entry,
approved_candidate_hash,
candidate_entry,
approval.validator,
)?;
ApprovalSource::Remote(approval.validator),
);
Ok((actions, t))
}
enum ApprovalSource {
Remote(ValidatorIndex),
Local(ValidatorIndex, ValidatorSignature),
}
impl ApprovalSource {
fn validator_index(&self) -> ValidatorIndex {
match *self {
ApprovalSource::Remote(v) | ApprovalSource::Local(v, _) => v,
}
}
fn is_remote(&self) -> bool {
match *self {
ApprovalSource::Remote(_) => true,
ApprovalSource::Local(_, _) => false,
}
}
}
// Import an approval vote which is already checked to be valid and corresponding to an assigned
// validator on the candidate and block. This updates the block entry and candidate entry as
// necessary and schedules any further wakeups.
fn import_checked_approval(
state: &State<impl DBReader>,
metrics: &Metrics,
already_loaded: Option<(Hash, BlockEntry)>,
mut block_entry: BlockEntry,
candidate_hash: CandidateHash,
mut candidate_entry: CandidateEntry,
validator: ValidatorIndex,
) -> SubsystemResult<Vec<Action>> {
if candidate_entry.mark_approval(validator) {
// already approved - nothing to do here.
return Ok(Vec::new());
source: ApprovalSource,
) -> Vec<Action> {
let validator_index = source.validator_index();
let already_approved_by = candidate_entry.mark_approval(validator_index);
let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);
// Check for early exits.
//
// If the candidate was approved
// but not the block, it means that we still need more approvals for the candidate under the
// block.
//
// If the block was approved, but the validator hadn't approved it yet, we should still hold
// onto the approval vote on-disk in case we restart and rebroadcast votes. Otherwise, our
// assignment might manifest as a no-show.
match source {
ApprovalSource::Remote(_) => {
// We don't store remote votes, so we can early exit as long at the candidate is
// already concluded under the block i.e. we don't need more approvals.
if candidate_approved_in_block {
return Vec::new();
}
}
ApprovalSource::Local(_, _) => {
// We never early return on the local validator.
}
}
// Check if this approval vote alters the approval state of any blocks.
//
// This may include blocks beyond the already loaded block.
let actions = check_and_apply_full_approval(
state,
metrics,
already_loaded,
candidate_hash,
candidate_entry,
|_, a| a.is_assigned(validator),
)?;
Ok(actions)
}
// Checks the candidate for full approval under all blocks matching the given filter.
//
// If returning without error, is guaranteed to have produced actions
// to write all modified block entries. It also schedules wakeups for
// the candidate under any blocks filtered.
fn check_and_apply_full_approval(
state: &State<impl DBReader>,
metrics: &Metrics,
mut already_loaded: Option<(Hash, BlockEntry)>,
candidate_hash: CandidateHash,
mut candidate_entry: CandidateEntry,
filter: impl Fn(&Hash, &ApprovalEntry) -> bool,
) -> SubsystemResult<Vec<Action>> {
// We only query this max once per hash.
let db = &state.db;
let mut load_block_entry = move |block_hash| -> SubsystemResult<Option<BlockEntry>> {
if already_loaded.as_ref().map_or(false, |(h, _)| h == block_hash) {
Ok(already_loaded.take().map(|(_, c)| c))
} else {
db.load_block_entry(block_hash)
}
};
let mut newly_approved = Vec::new();
let mut actions = Vec::new();
for (block_hash, approval_entry) in candidate_entry.iter_approval_entries()
.into_iter()
.filter(|(h, a)| !a.is_approved() && filter(h, a))
let block_hash = block_entry.block_hash();
let block_number = block_entry.block_number();
let (is_approved, status) = if let Some((approval_entry, status))
= state.approval_status(&block_entry, &candidate_entry)
{
let mut block_entry = match load_block_entry(block_hash)? {
None => {
tracing::warn!(
target: LOG_TARGET,
"Missing block entry {} referenced by candidate {}",
block_hash,
candidate_hash,
);
continue
}
Some(b) => b,
};
let session_info = match state.session_info(block_entry.session()) {
Some(s) => s,
None => {
tracing::warn!(target: LOG_TARGET, "Unknown session info for {}", block_entry.session());
continue
}
};
let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());
let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot());
let no_show_duration = slot_number_to_tick(
state.slot_duration_millis,
Slot::from(u64::from(session_info.no_show_slots)),
);
let required_tranches = approval_checking::tranches_to_approve(
approval_entry,
candidate_entry.approvals(),
tranche_now,
block_tick,
no_show_duration,
session_info.needed_approvals as _
);
let check = approval_checking::check_approval(
&candidate_entry,
approval_entry,
required_tranches.clone(),
status.required_tranches.clone(),
);
if check.is_approved() {
let is_approved = check.is_approved();
if is_approved {
tracing::trace!(
target: LOG_TARGET,
?candidate_hash,
@@ -1378,42 +1609,74 @@ fn check_and_apply_full_approval(
let no_shows = check.known_no_shows();
let was_approved = block_entry.is_fully_approved();
newly_approved.push(*block_hash);
let was_block_approved = block_entry.is_fully_approved();
block_entry.mark_approved_by_hash(&candidate_hash);
let is_approved = block_entry.is_fully_approved();
let is_block_approved = block_entry.is_fully_approved();
if no_shows != 0 {
metrics.on_no_shows(no_shows);
}
metrics.on_candidate_approved(tranche_now as _);
metrics.on_candidate_approved(status.tranche_now as _);
if is_approved && !was_approved {
metrics.on_block_approved(tranche_now as _)
if is_block_approved && !was_block_approved {
metrics.on_block_approved(status.tranche_now as _);
}
actions.push(Action::WriteBlockEntry(block_entry));
}
(is_approved, status)
} else {
tracing::warn!(
target: LOG_TARGET,
?candidate_hash,
?block_hash,
?validator_index,
"No approval entry for approval under block",
);
return Vec::new();
};
{
let approval_entry = candidate_entry.approval_entry_mut(&block_hash)
.expect("Approval entry just fetched; qed");
let was_approved = approval_entry.is_approved();
let newly_approved = is_approved && !was_approved;
if is_approved {
approval_entry.mark_approved();
}
if let ApprovalSource::Local(_, ref sig) = source {
approval_entry.import_approval_sig(sig.clone());
}
actions.extend(schedule_wakeup_action(
&approval_entry,
*block_hash,
block_hash,
block_number,
candidate_hash,
block_tick,
required_tranches,
status.block_tick,
status.required_tranches,
));
}
for b in &newly_approved {
if let Some(a) = candidate_entry.approval_entry_mut(b) {
a.mark_approved();
// We have no need to write the candidate entry if
//
// 1. The source is remote, as we don't store anything new in the approval entry.
// 2. The candidate is not newly approved, as we haven't altered the approval entry's
// approved flag with `mark_approved` above.
// 3. The source had already approved the candidate, as we haven't altered the bitfield.
if !source.is_remote() || newly_approved || !already_approved_by {
// In all other cases, we need to write the candidate entry.
actions.push(Action::WriteCandidateEntry(candidate_hash, candidate_entry));
}
}
actions.push(Action::WriteCandidateEntry(candidate_hash, candidate_entry));
Ok(actions)
actions
}
fn should_trigger_assignment(
@@ -1592,6 +1855,7 @@ fn process_wakeup(
actions.extend(schedule_wakeup_action(
&approval_entry,
relay_block,
block_entry.block_number(),
candidate_hash,
block_tick,
tranches_to_approve,
@@ -1863,11 +2127,11 @@ fn issue_approval(
let actions = import_checked_approval(
state,
metrics,
Some((block_hash, block_entry)),
block_entry,
candidate_hash,
candidate_entry,
validator_index as _,
)?;
ApprovalSource::Local(validator_index as _, sig.clone()),
);
metrics.on_approval_produced();