mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 16:31:07 +00:00
approval-voting: processed wakeups can also update approval state (#3848)
* approval-voting: processed wakeups can also update approval state * fmt changes * reverse broken if condition * further correct condition * add test * fmt * Update node/core/approval-voting/src/lib.rs Co-authored-by: Andronik Ordian <write@reusable.software> Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
committed by
GitHub
parent
00b69f446c
commit
44d2e9adbd
@@ -694,6 +694,7 @@ where
|
||||
woken_block,
|
||||
woken_candidate,
|
||||
tick,
|
||||
&subsystem.metrics,
|
||||
)?
|
||||
}
|
||||
next_msg = ctx.recv().fuse() => {
|
||||
@@ -1710,14 +1711,14 @@ fn check_and_import_approval<T>(
|
||||
None
|
||||
};
|
||||
|
||||
let mut actions = import_checked_approval(
|
||||
let mut actions = advance_approval_state(
|
||||
state,
|
||||
db,
|
||||
&metrics,
|
||||
block_entry,
|
||||
approved_candidate_hash,
|
||||
candidate_entry,
|
||||
ApprovalSource::Remote(approval.validator),
|
||||
ApprovalStateTransition::RemoteApproval(approval.validator),
|
||||
);
|
||||
|
||||
actions.extend(inform_disputes_action);
|
||||
@@ -1725,41 +1726,46 @@ fn check_and_import_approval<T>(
|
||||
Ok((actions, t))
|
||||
}
|
||||
|
||||
enum ApprovalSource {
|
||||
Remote(ValidatorIndex),
|
||||
Local(ValidatorIndex, ValidatorSignature),
|
||||
#[derive(Debug)]
|
||||
enum ApprovalStateTransition {
|
||||
RemoteApproval(ValidatorIndex),
|
||||
LocalApproval(ValidatorIndex, ValidatorSignature),
|
||||
WakeupProcessed,
|
||||
}
|
||||
|
||||
impl ApprovalSource {
|
||||
fn validator_index(&self) -> ValidatorIndex {
|
||||
impl ApprovalStateTransition {
|
||||
fn validator_index(&self) -> Option<ValidatorIndex> {
|
||||
match *self {
|
||||
ApprovalSource::Remote(v) | ApprovalSource::Local(v, _) => v,
|
||||
ApprovalStateTransition::RemoteApproval(v) |
|
||||
ApprovalStateTransition::LocalApproval(v, _) => Some(v),
|
||||
ApprovalStateTransition::WakeupProcessed => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_remote(&self) -> bool {
|
||||
fn is_local_approval(&self) -> bool {
|
||||
match *self {
|
||||
ApprovalSource::Remote(_) => true,
|
||||
ApprovalSource::Local(_, _) => false,
|
||||
ApprovalStateTransition::RemoteApproval(_) => false,
|
||||
ApprovalStateTransition::LocalApproval(_, _) => true,
|
||||
ApprovalStateTransition::WakeupProcessed => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Import an approval vote which is already checked to be valid and corresponding to an assigned
|
||||
// validator on the candidate and block. This updates the block entry and candidate entry as
|
||||
// Advance the approval state, either by importing an approval vote which is already checked to be valid and corresponding to an assigned
|
||||
// validator on the candidate and block, or by noting that there are no further wakeups or tranches needed. This updates the block entry and candidate entry as
|
||||
// necessary and schedules any further wakeups.
|
||||
fn import_checked_approval(
|
||||
fn advance_approval_state(
|
||||
state: &State,
|
||||
db: &mut OverlayedBackend<'_, impl Backend>,
|
||||
metrics: &Metrics,
|
||||
mut block_entry: BlockEntry,
|
||||
candidate_hash: CandidateHash,
|
||||
mut candidate_entry: CandidateEntry,
|
||||
source: ApprovalSource,
|
||||
transition: ApprovalStateTransition,
|
||||
) -> Vec<Action> {
|
||||
let validator_index = source.validator_index();
|
||||
let validator_index = transition.validator_index();
|
||||
|
||||
let already_approved_by = candidate_entry.mark_approval(validator_index);
|
||||
let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v));
|
||||
let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);
|
||||
|
||||
// Check for early exits.
|
||||
@@ -1771,17 +1777,13 @@ fn import_checked_approval(
|
||||
// If the block was approved, but the validator hadn't approved it yet, we should still hold
|
||||
// onto the approval vote on-disk in case we restart and rebroadcast votes. Otherwise, our
|
||||
// assignment might manifest as a no-show.
|
||||
match source {
|
||||
ApprovalSource::Remote(_) => {
|
||||
// We don't store remote votes, so we can early exit as long at the candidate is
|
||||
// already concluded under the block i.e. we don't need more approvals.
|
||||
if candidate_approved_in_block {
|
||||
return Vec::new()
|
||||
}
|
||||
},
|
||||
ApprovalSource::Local(_, _) => {
|
||||
// We never early return on the local validator.
|
||||
},
|
||||
if !transition.is_local_approval() {
|
||||
// We don't store remote votes and there's nothing to store for processed wakeups,
|
||||
// so we can early exit as long at the candidate is already concluded under the
|
||||
// block i.e. we don't need more approvals.
|
||||
if candidate_approved_in_block {
|
||||
return Vec::new()
|
||||
}
|
||||
}
|
||||
|
||||
let mut actions = Vec::new();
|
||||
@@ -1852,7 +1854,7 @@ fn import_checked_approval(
|
||||
approval_entry.mark_approved();
|
||||
}
|
||||
|
||||
if let ApprovalSource::Local(_, ref sig) = source {
|
||||
if let ApprovalStateTransition::LocalApproval(_, ref sig) = transition {
|
||||
approval_entry.import_approval_sig(sig.clone());
|
||||
}
|
||||
|
||||
@@ -1865,13 +1867,15 @@ fn import_checked_approval(
|
||||
status.required_tranches,
|
||||
));
|
||||
|
||||
// We have no need to write the candidate entry if
|
||||
// We have no need to write the candidate entry if all of the following
|
||||
// is true:
|
||||
//
|
||||
// 1. The source is remote, as we don't store anything new in the approval entry.
|
||||
// 1. This is not a local approval, as we don't store anything new in the approval entry.
|
||||
// 2. The candidate is not newly approved, as we haven't altered the approval entry's
|
||||
// approved flag with `mark_approved` above.
|
||||
// 3. The source had already approved the candidate, as we haven't altered the bitfield.
|
||||
if !source.is_remote() || newly_approved || !already_approved_by {
|
||||
// 3. The approver, if any, had already approved the candidate, as we haven't altered the bitfield.
|
||||
if transition.is_local_approval() || newly_approved || !already_approved_by.unwrap_or(true)
|
||||
{
|
||||
// In all other cases, we need to write the candidate entry.
|
||||
db.write_candidate_entry(candidate_entry);
|
||||
}
|
||||
@@ -1918,6 +1922,7 @@ fn process_wakeup(
|
||||
relay_block: Hash,
|
||||
candidate_hash: CandidateHash,
|
||||
expected_tick: Tick,
|
||||
metrics: &Metrics,
|
||||
) -> SubsystemResult<Vec<Action>> {
|
||||
let _span = jaeger::Span::from_encodable(
|
||||
(relay_block, candidate_hash, expected_tick),
|
||||
@@ -2040,28 +2045,20 @@ fn process_wakeup(
|
||||
}
|
||||
}
|
||||
|
||||
let approval_entry = candidate_entry
|
||||
.approval_entry(&relay_block)
|
||||
.expect("this function returned earlier if not available; qed");
|
||||
|
||||
// Although we ran this earlier in the function, we need to run again because we might have
|
||||
// imported our own assignment, which could change things.
|
||||
let tranches_to_approve = approval_checking::tranches_to_approve(
|
||||
&approval_entry,
|
||||
candidate_entry.approvals(),
|
||||
tranche_now,
|
||||
block_tick,
|
||||
no_show_duration,
|
||||
session_info.needed_approvals as _,
|
||||
);
|
||||
|
||||
actions.extend(schedule_wakeup_action(
|
||||
&approval_entry,
|
||||
relay_block,
|
||||
block_entry.block_number(),
|
||||
// Although we checked approval earlier in this function,
|
||||
// this wakeup might have advanced the state to approved via
|
||||
// a no-show that was immediately covered and therefore
|
||||
// we need to check for that and advance the state on-disk.
|
||||
//
|
||||
// Note that this function also schedules a wakeup as necessary.
|
||||
actions.extend(advance_approval_state(
|
||||
state,
|
||||
db,
|
||||
metrics,
|
||||
block_entry,
|
||||
candidate_hash,
|
||||
block_tick,
|
||||
tranches_to_approve,
|
||||
candidate_entry,
|
||||
ApprovalStateTransition::WakeupProcessed,
|
||||
));
|
||||
|
||||
Ok(actions)
|
||||
@@ -2436,14 +2433,14 @@ async fn issue_approval(
|
||||
None
|
||||
};
|
||||
|
||||
let mut actions = import_checked_approval(
|
||||
let mut actions = advance_approval_state(
|
||||
state,
|
||||
db,
|
||||
metrics,
|
||||
block_entry,
|
||||
candidate_hash,
|
||||
candidate_entry,
|
||||
ApprovalSource::Local(validator_index as _, sig.clone()),
|
||||
ApprovalStateTransition::LocalApproval(validator_index as _, sig.clone()),
|
||||
);
|
||||
|
||||
metrics.on_approval_produced();
|
||||
|
||||
@@ -2555,3 +2555,197 @@ fn subsystem_assignment_not_triggered_if_at_maximum_but_clock_is_before_with_dri
|
||||
should_be_triggered: |_| false,
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pre_covers_dont_stall_approval() {
|
||||
// A, B are tranche 0.
|
||||
// C is tranche 1.
|
||||
//
|
||||
// All assignments imported at once, and B, C approvals imported immediately.
|
||||
// A no-shows, leading to being covered by C.
|
||||
// Technically, this is an approved block, but it will be approved
|
||||
// when the no-show timer hits, not as a response to an approval vote.
|
||||
//
|
||||
// Note that we have 6 validators, otherwise the 2nd approval triggers
|
||||
// the >1/3 insta-approval condition.
|
||||
|
||||
let assignment_criteria = Box::new(MockAssignmentCriteria::check_only(
|
||||
move |validator_index| match validator_index {
|
||||
ValidatorIndex(0 | 1) => Ok(0),
|
||||
ValidatorIndex(2) => Ok(1),
|
||||
ValidatorIndex(_) => Err(criteria::InvalidAssignment),
|
||||
},
|
||||
));
|
||||
|
||||
let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
|
||||
let store = config.backend();
|
||||
test_harness(config, |test_harness| async move {
|
||||
let TestHarness {
|
||||
mut virtual_overseer,
|
||||
clock,
|
||||
sync_oracle_handle: _sync_oracle_handle,
|
||||
..
|
||||
} = test_harness;
|
||||
|
||||
let block_hash = Hash::repeat_byte(0x01);
|
||||
let validator_index_a = ValidatorIndex(0);
|
||||
let validator_index_b = ValidatorIndex(1);
|
||||
let validator_index_c = ValidatorIndex(2);
|
||||
|
||||
let validators = vec![
|
||||
Sr25519Keyring::Alice,
|
||||
Sr25519Keyring::Bob,
|
||||
Sr25519Keyring::Charlie,
|
||||
Sr25519Keyring::Dave,
|
||||
Sr25519Keyring::Eve,
|
||||
Sr25519Keyring::One,
|
||||
];
|
||||
let session_info = SessionInfo {
|
||||
validators: validators.iter().map(|v| v.public().into()).collect(),
|
||||
validator_groups: vec![
|
||||
vec![ValidatorIndex(0), ValidatorIndex(1)],
|
||||
vec![ValidatorIndex(2), ValidatorIndex(5)],
|
||||
vec![ValidatorIndex(3), ValidatorIndex(4)],
|
||||
],
|
||||
needed_approvals: 2,
|
||||
discovery_keys: validators.iter().map(|v| v.public().into()).collect(),
|
||||
assignment_keys: validators.iter().map(|v| v.public().into()).collect(),
|
||||
n_cores: validators.len() as _,
|
||||
zeroth_delay_tranche_width: 5,
|
||||
relay_vrf_modulo_samples: 3,
|
||||
n_delay_tranches: 50,
|
||||
no_show_slots: 2,
|
||||
};
|
||||
|
||||
let candidate_descriptor = make_candidate(1.into(), &block_hash);
|
||||
let candidate_hash = candidate_descriptor.hash();
|
||||
|
||||
let head: Hash = ChainBuilder::GENESIS_HASH;
|
||||
let mut builder = ChainBuilder::new();
|
||||
let slot = Slot::from(1 as u64);
|
||||
builder.add_block(
|
||||
block_hash,
|
||||
head,
|
||||
1,
|
||||
BlockConfig {
|
||||
slot,
|
||||
candidates: Some(vec![(candidate_descriptor, CoreIndex(0), GroupIndex(0))]),
|
||||
session_info: Some(session_info),
|
||||
},
|
||||
);
|
||||
builder.build(&mut virtual_overseer).await;
|
||||
|
||||
let candidate_index = 0;
|
||||
|
||||
let rx = check_and_import_assignment(
|
||||
&mut virtual_overseer,
|
||||
block_hash,
|
||||
candidate_index,
|
||||
validator_index_a,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(rx.await, Ok(AssignmentCheckResult::Accepted),);
|
||||
|
||||
let rx = check_and_import_assignment(
|
||||
&mut virtual_overseer,
|
||||
block_hash,
|
||||
candidate_index,
|
||||
validator_index_b,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(rx.await, Ok(AssignmentCheckResult::Accepted),);
|
||||
|
||||
let rx = check_and_import_assignment(
|
||||
&mut virtual_overseer,
|
||||
block_hash,
|
||||
candidate_index,
|
||||
validator_index_c,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(rx.await, Ok(AssignmentCheckResult::Accepted),);
|
||||
|
||||
let session_index = 1;
|
||||
let sig_b = sign_approval(Sr25519Keyring::Bob, candidate_hash, session_index);
|
||||
|
||||
let rx = check_and_import_approval(
|
||||
&mut virtual_overseer,
|
||||
block_hash,
|
||||
candidate_index,
|
||||
validator_index_b,
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
true,
|
||||
Some(sig_b),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(rx.await, Ok(ApprovalCheckResult::Accepted),);
|
||||
|
||||
let sig_c = sign_approval(Sr25519Keyring::Charlie, candidate_hash, session_index);
|
||||
let rx = check_and_import_approval(
|
||||
&mut virtual_overseer,
|
||||
block_hash,
|
||||
candidate_index,
|
||||
validator_index_c,
|
||||
candidate_hash,
|
||||
session_index,
|
||||
false,
|
||||
true,
|
||||
Some(sig_c),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(rx.await, Ok(ApprovalCheckResult::Accepted),);
|
||||
|
||||
// Sleep to ensure we get a consistent read on the database.
|
||||
//
|
||||
// NOTE: Since the response above occurs before writing to the database, we are somewhat
|
||||
// breaking the external consistency of the API by reaching into the database directly.
|
||||
// Under normal operation, this wouldn't be necessary, since all requests are serialized by
|
||||
// the event loop and we write at the end of each pass. However, if the database write were
|
||||
// to fail, a downstream subsystem may expect for this candidate to be approved, and
|
||||
// possibly take further actions on the assumption that the candidate is approved, when
|
||||
// that may not be the reality from the database's perspective. This could be avoided
|
||||
// entirely by having replies processed after database writes, but that would constitute a
|
||||
// larger refactor and incur a performance penalty.
|
||||
futures_timer::Delay::new(Duration::from_millis(100)).await;
|
||||
|
||||
// The candidate should not be approved.
|
||||
let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
|
||||
assert!(!candidate_entry.approval_entry(&block_hash).unwrap().is_approved());
|
||||
assert!(clock.inner.lock().has_wakeup(20));
|
||||
|
||||
// Wait for the no-show timer to observe the approval from
|
||||
// tranche 0 and set a wakeup for tranche 1.
|
||||
clock.inner.lock().set_tick(20);
|
||||
|
||||
// Sleep to ensure we get a consistent read on the database.
|
||||
futures_timer::Delay::new(Duration::from_millis(100)).await;
|
||||
|
||||
// The next wakeup should observe the assignment & approval from
|
||||
// tranche 1, and the no-show from tranche 0 should be immediately covered.
|
||||
assert_eq!(clock.inner.lock().next_wakeup(), Some(31));
|
||||
clock.inner.lock().set_tick(31);
|
||||
|
||||
assert_matches!(
|
||||
overseer_recv(&mut virtual_overseer).await,
|
||||
AllMessages::ChainSelection(ChainSelectionMessage::Approved(b_hash)) => {
|
||||
assert_eq!(b_hash, block_hash);
|
||||
}
|
||||
);
|
||||
|
||||
// The candidate and block should now be approved.
|
||||
let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
|
||||
assert!(candidate_entry.approval_entry(&block_hash).unwrap().is_approved());
|
||||
assert!(clock.inner.lock().next_wakeup().is_none());
|
||||
|
||||
let block_entry = store.load_block_entry(&block_hash).unwrap().unwrap();
|
||||
assert!(block_entry.is_fully_approved());
|
||||
|
||||
virtual_overseer
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user