dispute-coordinator: disabling in participation (#2637)

Closes #2225.

- [x] tests
- [x] fix todos
- [x] fix duplicates
- [x] make the check part of `potential_spam` 
- [x] fix a bug with votes insertion
- [x] guide changes
- [x] docs

---------

Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
This commit is contained in:
ordian
2024-01-09 07:44:19 +01:00
committed by GitHub
parent a02b53475b
commit 0ff3f4d3af
12 changed files with 1013 additions and 48 deletions
+4 -3
View File
@@ -1030,9 +1030,10 @@ async fn construct_per_relay_parent_state<Context>(
// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to
// `get_disabled_validators_with_fallback`, add `request_disabled_validators` call to the
// `try_join!` above and use `try_runtime_api!` to get `disabled_validators`
let disabled_validators = get_disabled_validators_with_fallback(ctx.sender(), parent)
.await
.map_err(Error::UtilError)?;
let disabled_validators =
get_disabled_validators_with_fallback(ctx.sender(), parent).await.map_err(|e| {
Error::UtilError(TryFrom::try_from(e).expect("the conversion is infallible; qed"))
})?;
let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = match Validator::construct(
@@ -52,6 +52,8 @@ pub struct CandidateEnvironment<'a> {
executor_params: &'a ExecutorParams,
/// Validator indices controlled by this node.
controlled_indices: HashSet<ValidatorIndex>,
/// Indices of disabled validators at the `relay_parent`.
disabled_indices: HashSet<ValidatorIndex>,
}
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
@@ -66,6 +68,16 @@ impl<'a> CandidateEnvironment<'a> {
session_index: SessionIndex,
relay_parent: Hash,
) -> Option<CandidateEnvironment<'a>> {
let disabled_indices = runtime_info
.get_disabled_validators(ctx.sender(), relay_parent)
.await
.unwrap_or_else(|err| {
gum::info!(target: LOG_TARGET, ?err, "Failed to get disabled validators");
Vec::new()
})
.into_iter()
.collect();
let (session, executor_params) = match runtime_info
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await
@@ -76,7 +88,7 @@ impl<'a> CandidateEnvironment<'a> {
};
let controlled_indices = find_controlled_validator_indices(keystore, &session.validators);
Some(Self { session_index, session, executor_params, controlled_indices })
Some(Self { session_index, session, executor_params, controlled_indices, disabled_indices })
}
/// Validators in the candidate's session.
@@ -103,6 +115,11 @@ impl<'a> CandidateEnvironment<'a> {
pub fn controlled_indices(&'a self) -> &'a HashSet<ValidatorIndex> {
&self.controlled_indices
}
/// Indices of disabled validators at the `relay_parent`.
pub fn disabled_indices(&'a self) -> &'a HashSet<ValidatorIndex> {
&self.disabled_indices
}
}
/// Whether or not we already issued some statement about a candidate.
@@ -344,6 +361,14 @@ impl CandidateVoteState<CandidateVotes> {
&self.votes.candidate_receipt
}
/// Returns true if all the invalid votes are from disabled validators.
pub fn invalid_votes_all_disabled(
&self,
mut is_disabled: impl FnMut(&ValidatorIndex) -> bool,
) -> bool {
self.votes.invalid.keys().all(|i| is_disabled(i))
}
/// Extract `CandidateVotes` for handling import of new statements.
fn into_old_state(self) -> (CandidateVotes, CandidateVoteState<()>) {
let CandidateVoteState { votes, own_vote, dispute_status, byzantine_threshold_against } =
@@ -17,7 +17,7 @@
//! Dispute coordinator subsystem in initialized state (after first active leaf is received).
use std::{
collections::{BTreeMap, VecDeque},
collections::{BTreeMap, HashSet, VecDeque},
sync::Arc,
};
@@ -47,6 +47,7 @@ use polkadot_primitives::{
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind,
ValidatorId, ValidatorIndex,
};
use schnellru::{LruMap, UnlimitedCompact};
use crate::{
db,
@@ -92,6 +93,9 @@ pub struct InitialData {
pub(crate) struct Initialized {
keystore: Arc<LocalKeystore>,
runtime_info: RuntimeInfo,
/// We have the onchain state of disabled validators as well as the offchain
/// state that is based on the lost disputes.
offchain_disabled_validators: OffchainDisabledValidators,
/// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doesn't matter if it
/// was cached successfully or not. It is used to detect ancient disputes.
highest_session_seen: SessionIndex,
@@ -130,10 +134,12 @@ impl Initialized {
let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender, metrics.clone());
let offchain_disabled_validators = OffchainDisabledValidators::default();
Self {
keystore,
runtime_info,
offchain_disabled_validators,
highest_session_seen,
gaps_in_cache,
spam_slots,
@@ -319,13 +325,16 @@ impl Initialized {
self.runtime_info.pin_block(session_idx, new_leaf.unpin_handle);
// Fetch the last `DISPUTE_WINDOW` number of sessions unless there are no gaps
// in cache and we are not missing too many `SessionInfo`s
let mut lower_bound = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
if !self.gaps_in_cache && self.highest_session_seen > lower_bound {
lower_bound = self.highest_session_seen + 1
}
let prune_up_to = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
let fetch_lower_bound =
if !self.gaps_in_cache && self.highest_session_seen > prune_up_to {
self.highest_session_seen + 1
} else {
prune_up_to
};
// There is a new session. Perform a dummy fetch to cache it.
for idx in lower_bound..=session_idx {
for idx in fetch_lower_bound..=session_idx {
if let Err(err) = self
.runtime_info
.get_session_info_by_index(ctx.sender(), new_leaf.hash, idx)
@@ -344,11 +353,9 @@ impl Initialized {
self.highest_session_seen = session_idx;
db::v1::note_earliest_session(
overlay_db,
session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1),
)?;
self.spam_slots.prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1));
db::v1::note_earliest_session(overlay_db, prune_up_to)?;
self.spam_slots.prune_old(prune_up_to);
self.offchain_disabled_validators.prune_old(prune_up_to);
},
Ok(_) => { /* no new session => nothing to cache */ },
Err(err) => {
@@ -978,11 +985,13 @@ impl Initialized {
Some(env) => env,
};
let n_validators = env.validators().len();
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
?session,
num_validators = ?env.session_info().validators.len(),
?n_validators,
"Number of validators"
);
@@ -1084,18 +1093,42 @@ impl Initialized {
target: LOG_TARGET,
?candidate_hash,
?session,
num_validators = ?env.session_info().validators.len(),
?n_validators,
"Import result ready"
);
let new_state = import_result.new_state();
let byzantine_threshold = polkadot_primitives::byzantine_threshold(n_validators);
// combine on-chain with off-chain disabled validators
// process disabled validators in the following order:
// - on-chain disabled validators
// - prioritized order of off-chain disabled validators
// deduplicate the list and take at most `byzantine_threshold` validators
let disabled_validators = {
let mut d: HashSet<ValidatorIndex> = HashSet::new();
for v in env
.disabled_indices()
.iter()
.cloned()
.chain(self.offchain_disabled_validators.iter(session))
{
if d.len() == byzantine_threshold {
break
}
d.insert(v);
}
d
};
let is_included = self.scraper.is_candidate_included(&candidate_hash);
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let own_vote_missing = new_state.own_vote_missing();
let is_disputed = new_state.is_disputed();
let is_confirmed = new_state.is_confirmed();
let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash);
// We participate only in disputes which are not potential spam.
let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash, |v| {
disabled_validators.contains(v)
});
let allow_participation = !potential_spam;
gum::trace!(
@@ -1106,6 +1139,7 @@ impl Initialized {
?candidate_hash,
confirmed = ?new_state.is_confirmed(),
has_invalid_voters = ?!import_result.new_invalid_voters().is_empty(),
n_disabled_validators = ?disabled_validators.len(),
"Is spam?"
);
@@ -1337,6 +1371,10 @@ impl Initialized {
);
}
}
for validator_index in new_state.votes().invalid.keys() {
self.offchain_disabled_validators
.insert_against_valid(session, *validator_index);
}
self.metrics.on_concluded_valid();
}
if import_result.is_freshly_concluded_against() {
@@ -1356,6 +1394,14 @@ impl Initialized {
);
}
}
for (validator_index, (kind, _sig)) in new_state.votes().valid.raw() {
let is_backer = kind.is_backing();
self.offchain_disabled_validators.insert_for_invalid(
session,
*validator_index,
is_backer,
);
}
self.metrics.on_concluded_invalid();
}
@@ -1591,3 +1637,82 @@ fn determine_undisputed_chain(
Ok(last)
}
#[derive(Default)]
struct OffchainDisabledValidators {
// Ideally, we want to use the top `byzantine_threshold` offenders here based on the amount of
// stake slashed. However, given that slashing might be applied with a delay, we want to have
// some list of offenders as soon as disputes conclude offchain. This list only approximates
// the top offenders and only accounts for lost disputes. But that should be good enough to
// prevent spam attacks.
per_session: BTreeMap<SessionIndex, LostSessionDisputes>,
}
struct LostSessionDisputes {
// We separate lost disputes to prioritize "for invalid" offenders. And among those, we
// prioritize backing votes the most. There's no need to limit the size of these sets, as they
// are already limited by the number of validators in the session. We use `LruMap` to ensure
// the iteration order prioritizes most recently disputes lost over older ones in case we reach
// the limit.
backers_for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
against_valid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
}
impl Default for LostSessionDisputes {
fn default() -> Self {
Self {
backers_for_invalid: LruMap::new(UnlimitedCompact),
for_invalid: LruMap::new(UnlimitedCompact),
against_valid: LruMap::new(UnlimitedCompact),
}
}
}
impl OffchainDisabledValidators {
fn prune_old(&mut self, up_to_excluding: SessionIndex) {
// split_off returns everything after the given key, including the key.
let mut relevant = self.per_session.split_off(&up_to_excluding);
std::mem::swap(&mut relevant, &mut self.per_session);
}
fn insert_for_invalid(
&mut self,
session_index: SessionIndex,
validator_index: ValidatorIndex,
is_backer: bool,
) {
let entry = self.per_session.entry(session_index).or_default();
if is_backer {
entry.backers_for_invalid.insert(validator_index, ());
} else {
entry.for_invalid.insert(validator_index, ());
}
}
fn insert_against_valid(
&mut self,
session_index: SessionIndex,
validator_index: ValidatorIndex,
) {
self.per_session
.entry(session_index)
.or_default()
.against_valid
.insert(validator_index, ());
}
/// Iterate over all validators that are offchain disabled.
/// The order of iteration prioritizes `for_invalid` offenders (and backers among those) over
/// `against_valid` offenders. And most recently lost disputes over older ones.
/// NOTE: the iterator might contain duplicates.
fn iter(&self, session_index: SessionIndex) -> impl Iterator<Item = ValidatorIndex> + '_ {
self.per_session.get(&session_index).into_iter().flat_map(|e| {
e.backers_for_invalid
.iter()
.chain(e.for_invalid.iter())
.chain(e.against_valid.iter())
.map(|(i, _)| *i)
})
}
}
@@ -370,8 +370,10 @@ impl DisputeCoordinatorSubsystem {
},
};
let vote_state = CandidateVoteState::new(votes, &env, now);
let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash);
let onchain_disabled = env.disabled_indices();
let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash, |v| {
onchain_disabled.contains(v)
});
let is_included =
scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash());
@@ -462,17 +464,20 @@ async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> Result<Option<Activa
/// Check wheter a dispute for the given candidate could be spam.
///
/// That is the candidate could be made up.
pub fn is_potential_spam<V>(
pub fn is_potential_spam(
scraper: &ChainScraper,
vote_state: &CandidateVoteState<V>,
vote_state: &CandidateVoteState<CandidateVotes>,
candidate_hash: &CandidateHash,
is_disabled: impl FnMut(&ValidatorIndex) -> bool,
) -> bool {
let is_disputed = vote_state.is_disputed();
let is_included = scraper.is_candidate_included(candidate_hash);
let is_backed = scraper.is_candidate_backed(candidate_hash);
let is_confirmed = vote_state.is_confirmed();
let all_invalid_votes_disabled = vote_state.invalid_votes_all_disabled(is_disabled);
let ignore_disabled = !is_confirmed && all_invalid_votes_disabled;
is_disputed && !is_included && !is_backed && !is_confirmed
(is_disputed && !is_included && !is_backed && !is_confirmed) || ignore_disabled
}
/// Tell dispute-distribution to send all our votes.
@@ -372,7 +372,6 @@ fn cannot_participate_if_cannot_recover_validation_code() {
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();
recover_available_data(&mut ctx_handle).await;
assert_matches!(
@@ -257,7 +257,7 @@ impl TestState {
session: SessionIndex,
block_number: BlockNumber,
candidate_events: Vec<CandidateEvent>,
) {
) -> Hash {
assert!(block_number > 0);
let block_header = Header {
@@ -282,6 +282,8 @@ impl TestState {
self.handle_sync_queries(virtual_overseer, block_hash, session, candidate_events)
.await;
block_hash
}
/// Returns any sent `DisputeMessage`s.
@@ -406,6 +408,19 @@ impl TestState {
)) => {
tx.send(Ok(Vec::new())).unwrap();
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::Version(tx),
)) => {
tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT))
.unwrap();
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::DisabledValidators(tx),
)) => {
tx.send(Ok(Vec::new())).unwrap();
},
AllMessages::ChainApi(ChainApiMessage::Ancestors { hash, k, response_channel }) => {
let target_header = self
.headers
@@ -628,15 +643,19 @@ async fn participation_with_distribution(
}
fn make_valid_candidate_receipt() -> CandidateReceipt {
let mut candidate_receipt = dummy_candidate_receipt_bad_sig(dummy_hash(), dummy_hash());
candidate_receipt.commitments_hash = CandidateCommitments::default().hash();
candidate_receipt
make_another_valid_candidate_receipt(dummy_hash())
}
fn make_invalid_candidate_receipt() -> CandidateReceipt {
dummy_candidate_receipt_bad_sig(Default::default(), Some(Default::default()))
}
fn make_another_valid_candidate_receipt(relay_parent: Hash) -> CandidateReceipt {
let mut candidate_receipt = dummy_candidate_receipt_bad_sig(relay_parent, dummy_hash());
candidate_receipt.commitments_hash = CandidateCommitments::default().hash();
candidate_receipt
}
// Generate a `CandidateBacked` event from a `CandidateReceipt`. The rest is dummy data.
fn make_candidate_backed_event(candidate_receipt: CandidateReceipt) -> CandidateEvent {
CandidateEvent::CandidateBacked(
@@ -740,6 +759,7 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
.await;
gum::trace!("After sending `ImportStatements`");
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, HashMap::new())
.await;
@@ -875,6 +895,7 @@ fn approval_vote_import_works() {
.into_iter()
.collect();
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, approval_votes)
.await;
@@ -982,6 +1003,7 @@ fn dispute_gets_confirmed_via_participation() {
})
.await;
gum::debug!("After First import!");
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, HashMap::new())
.await;
@@ -1131,6 +1153,7 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, HashMap::new())
.await;
@@ -1255,6 +1278,7 @@ fn backing_statements_import_works_and_no_spam() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
assert_matches!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
@@ -1387,6 +1411,7 @@ fn conflicting_votes_lead_to_dispute_participation() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -1506,6 +1531,7 @@ fn positive_votes_dont_trigger_participation() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
{
let (tx, rx) = oneshot::channel();
@@ -1616,6 +1642,7 @@ fn wrong_validator_index_is_ignored() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
{
let (tx, rx) = oneshot::channel();
@@ -1693,6 +1720,7 @@ fn finality_votes_ignore_disputed_candidates() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -1769,14 +1797,10 @@ fn supermajority_valid_dispute_may_be_finalized() {
let candidate_receipt = make_valid_candidate_receipt();
let candidate_hash = candidate_receipt.hash();
let candidate_events = vec![make_candidate_backed_event(candidate_receipt.clone())];
test_state
.activate_leaf_at_session(
&mut virtual_overseer,
session,
1,
vec![make_candidate_backed_event(candidate_receipt.clone())],
)
.activate_leaf_at_session(&mut virtual_overseer, session, 1, candidate_events)
.await;
let supermajority_threshold =
@@ -1805,6 +1829,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -1942,6 +1967,7 @@ fn concluded_supermajority_for_non_active_after_time() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -2058,6 +2084,7 @@ fn concluded_supermajority_against_non_active_after_time() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
assert_matches!(confirmation_rx.await.unwrap(),
@@ -2173,6 +2200,7 @@ fn resume_dispute_without_local_statement() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -2217,13 +2245,23 @@ fn resume_dispute_without_local_statement() {
let candidate_receipt = make_valid_candidate_receipt();
let candidate_hash = candidate_receipt.hash();
participation_with_distribution(
participation_full_happy_path(
&mut virtual_overseer,
&candidate_hash,
candidate_receipt.commitments_hash,
)
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::SendDispute(msg)
) => {
assert_eq!(msg.candidate_receipt().hash(), candidate_hash);
}
);
let mut statements = Vec::new();
// Getting votes for supermajority. Should already have two valid votes.
for i in vec![3, 4, 5, 6, 7] {
@@ -2328,6 +2366,7 @@ fn resume_dispute_with_local_statement() {
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -2425,6 +2464,7 @@ fn resume_dispute_without_local_statement_or_local_key() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(
&mut virtual_overseer,
&candidate_hash,
@@ -2516,6 +2556,7 @@ fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
// Initiate dispute locally:
@@ -2555,6 +2596,651 @@ fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation
});
}
#[test]
fn participation_with_onchain_disabling_unconfirmed() {
test_harness(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = make_valid_candidate_receipt();
let candidate_hash = candidate_receipt.hash();
let events = vec![make_candidate_included_event(candidate_receipt.clone())];
test_state
.activate_leaf_at_session(&mut virtual_overseer, session, 1, events)
.await;
let backer_index = ValidatorIndex(1);
let disabled_index = ValidatorIndex(2);
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
backer_index,
disabled_index,
candidate_hash,
session,
VoteType::Backing,
)
.await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let pending_confirmation = Some(pending_confirmation);
// Scenario 1: unconfirmed dispute with onchain disabled validator against.
// Expectation: we import the vote, but do not participate.
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, backer_index),
(invalid_vote, disabled_index),
],
pending_confirmation,
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, vec![disabled_index]).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
// we should not participate due to disabled indices on chain
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
// Scenario 2: unconfirmed dispute with non-disabled validator against.
// Expectation: even if the dispute is unconfirmed, we should participate
// once we receive an invalid vote from a non-disabled validator.
let non_disabled_index = ValidatorIndex(3);
let vote = test_state.issue_explicit_statement_with_index(
non_disabled_index,
candidate_hash,
session,
false,
);
let statements = vec![(vote, non_disabled_index)];
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let pending_confirmation = Some(pending_confirmation);
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation,
},
})
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
participation_with_distribution(
&mut virtual_overseer,
&candidate_hash,
candidate_receipt.commitments_hash,
)
.await;
{
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
assert_eq!(rx.await.unwrap().len(), 1);
// check if we have participated (cast a vote)
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
vec![(session, candidate_hash)],
tx,
),
})
.await;
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.raw().len(), 2); // 3+1 => we have participated
assert_eq!(votes.invalid.len(), 2);
}
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
});
}
#[test]
fn participation_with_onchain_disabling_confirmed() {
test_harness(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = make_valid_candidate_receipt();
let candidate_hash = candidate_receipt.hash();
let events = vec![make_candidate_included_event(candidate_receipt.clone())];
test_state
.activate_leaf_at_session(&mut virtual_overseer, session, 1, events)
.await;
let backer_index = ValidatorIndex(1);
let disabled_index = ValidatorIndex(2);
// Scenario 1: confirmed dispute with disabled validator
// Expectation: we import the vote and participate.
let mut statements = Vec::new();
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
backer_index,
disabled_index,
candidate_hash,
session,
VoteType::Backing,
)
.await;
statements.push((valid_vote, backer_index));
statements.push((invalid_vote, disabled_index));
// now import enough votes for dispute confirmation
for i in vec![3, 4] {
let vote = test_state.issue_explicit_statement_with_index(
ValidatorIndex(i),
candidate_hash,
session,
true,
);
statements.push((vote, ValidatorIndex(i as _)));
}
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let pending_confirmation = Some(pending_confirmation);
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation,
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, vec![]).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
participation_with_distribution(
&mut virtual_overseer,
&candidate_hash,
candidate_receipt.commitments_hash,
)
.await;
{
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
assert_eq!(rx.await.unwrap().len(), 1);
// check if we have participated (cast a vote)
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
vec![(session, candidate_hash)],
tx,
),
})
.await;
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.raw().len(), 4); // 3+1 => we have participated
assert_eq!(votes.invalid.len(), 1);
}
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
});
}
#[test]
fn participation_with_offchain_disabling() {
test_harness(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = make_valid_candidate_receipt();
let candidate_hash = candidate_receipt.hash();
let events = vec![make_candidate_included_event(candidate_receipt.clone())];
let block_hash = test_state
.activate_leaf_at_session(&mut virtual_overseer, session, 3, events)
.await;
let another_candidate_receipt = make_another_valid_candidate_receipt(block_hash);
let another_candidate_hash = another_candidate_receipt.hash();
let another_events =
vec![make_candidate_included_event(another_candidate_receipt.clone())];
test_state
.activate_leaf_at_session(&mut virtual_overseer, session, 4, another_events)
.await;
// import enough votes for supermajority to conclude the dispute
let mut statements = Vec::new();
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
ValidatorIndex(1),
ValidatorIndex(2),
candidate_hash,
session,
VoteType::Backing,
)
.await;
statements.push((valid_vote, ValidatorIndex(1)));
statements.push((invalid_vote, ValidatorIndex(2)));
for i in vec![3, 4, 5, 6, 7, 8] {
let vote = test_state.issue_explicit_statement_with_index(
ValidatorIndex(i),
candidate_hash,
session,
true,
);
statements.push((vote, ValidatorIndex(i as _)));
}
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let pending_confirmation = Some(pending_confirmation);
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation,
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, vec![]).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
participation_with_distribution(
&mut virtual_overseer,
&candidate_hash,
candidate_receipt.commitments_hash,
)
.await;
{
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
assert_eq!(rx.await.unwrap().len(), 1);
// check if we have participated (cast a vote)
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
vec![(session, candidate_hash)],
tx,
),
})
.await;
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.raw().len(), 8); // 8 => we have participated
assert_eq!(votes.invalid.len(), 1);
}
// now create another dispute
// Validator 2 should be disabled offchain now
let mut statements = Vec::new();
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
ValidatorIndex(1),
ValidatorIndex(2),
another_candidate_hash,
session,
VoteType::Backing,
)
.await;
statements.push((valid_vote, ValidatorIndex(1)));
statements.push((invalid_vote, ValidatorIndex(2)));
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let pending_confirmation = Some(pending_confirmation);
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: another_candidate_receipt.clone(),
session,
statements,
pending_confirmation,
},
})
.await;
// let's disable validators 3, 4 on chain, but this should not affect this import
let disabled_validators = vec![ValidatorIndex(3), ValidatorIndex(4)];
handle_disabled_validators_queries(&mut virtual_overseer, disabled_validators).await;
handle_approval_vote_request(
&mut virtual_overseer,
&another_candidate_hash,
HashMap::new(),
)
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
// we should not participate since due to offchain disabling
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
// now import enough votes for dispute confirmation
// even though all of these votes are from (on chain) disabled validators
let mut statements = Vec::new();
for i in vec![3, 4] {
let vote = test_state.issue_explicit_statement_with_index(
ValidatorIndex(i),
another_candidate_hash,
session,
true,
);
statements.push((vote, ValidatorIndex(i as _)));
}
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let pending_confirmation = Some(pending_confirmation);
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: another_candidate_receipt.clone(),
session,
statements,
pending_confirmation,
},
})
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
participation_with_distribution(
&mut virtual_overseer,
&another_candidate_hash,
another_candidate_receipt.commitments_hash,
)
.await;
{
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
assert_eq!(rx.await.unwrap().len(), 2);
// check if we have participated (cast a vote)
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
vec![(session, another_candidate_hash)],
tx,
),
})
.await;
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.raw().len(), 4); // 3+1 => we have participated
assert_eq!(votes.invalid.len(), 1);
}
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
});
}
// Once the onchain disabling reaches the byzantine threshold,
// offchain disabling will no longer take any effect.
#[test]
fn participation_with_disabling_limits() {
test_harness(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = make_valid_candidate_receipt();
let candidate_hash = candidate_receipt.hash();
let events = vec![make_candidate_included_event(candidate_receipt.clone())];
let block_hash = test_state
.activate_leaf_at_session(&mut virtual_overseer, session, 3, events)
.await;
let another_candidate_receipt = make_another_valid_candidate_receipt(block_hash);
let another_candidate_hash = another_candidate_receipt.hash();
let another_events =
vec![make_candidate_included_event(another_candidate_receipt.clone())];
test_state
.activate_leaf_at_session(&mut virtual_overseer, session, 4, another_events)
.await;
// import enough votes for supermajority to conclude the dispute
let mut statements = Vec::new();
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
ValidatorIndex(1),
ValidatorIndex(2),
candidate_hash,
session,
VoteType::Backing,
)
.await;
statements.push((valid_vote, ValidatorIndex(1)));
statements.push((invalid_vote, ValidatorIndex(2)));
for i in vec![3, 4, 5, 6, 7, 8] {
let vote = test_state.issue_explicit_statement_with_index(
ValidatorIndex(i),
candidate_hash,
session,
true,
);
statements.push((vote, ValidatorIndex(i as _)));
}
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let pending_confirmation = Some(pending_confirmation);
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation,
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, vec![]).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
participation_with_distribution(
&mut virtual_overseer,
&candidate_hash,
candidate_receipt.commitments_hash,
)
.await;
{
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
assert_eq!(rx.await.unwrap().len(), 1);
// check if we have participated (cast a vote)
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
vec![(session, candidate_hash)],
tx,
),
})
.await;
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.raw().len(), 8); // 8 => we have participated
assert_eq!(votes.invalid.len(), 1);
}
// now create another dispute
// validator 2 should be disabled offchain now
// but due to the byzantine threshold of onchain disabling
// this validator will be considered enabled
let mut statements = Vec::new();
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
ValidatorIndex(1),
ValidatorIndex(2),
another_candidate_hash,
session,
VoteType::Backing,
)
.await;
statements.push((valid_vote, ValidatorIndex(1)));
statements.push((invalid_vote, ValidatorIndex(2)));
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let pending_confirmation = Some(pending_confirmation);
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_receipt: another_candidate_receipt.clone(),
session,
statements,
pending_confirmation,
},
})
.await;
// let's disable validators 3, 4, 5 on chain, reaching the byzantine threshold
let disabled_validators = vec![ValidatorIndex(3), ValidatorIndex(4), ValidatorIndex(5)];
handle_disabled_validators_queries(&mut virtual_overseer, disabled_validators).await;
handle_approval_vote_request(
&mut virtual_overseer,
&another_candidate_hash,
HashMap::new(),
)
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
participation_with_distribution(
&mut virtual_overseer,
&another_candidate_hash,
another_candidate_receipt.commitments_hash,
)
.await;
{
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
assert_eq!(rx.await.unwrap().len(), 2);
// check if we have participated (cast a vote)
let (tx, rx) = oneshot::channel();
virtual_overseer
.send(FromOrchestra::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
vec![(session, another_candidate_hash)],
tx,
),
})
.await;
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.raw().len(), 2); // 2 => we have participated
assert_eq!(votes.invalid.len(), 1);
}
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
});
}
#[test]
fn own_approval_vote_gets_distributed_on_dispute() {
test_harness(|mut test_state, mut virtual_overseer| {
@@ -2613,6 +3299,7 @@ fn own_approval_vote_gets_distributed_on_dispute() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -2667,6 +3354,7 @@ fn negative_issue_local_statement_only_triggers_import() {
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
// Assert that subsystem is not participating.
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
@@ -2732,6 +3420,7 @@ fn redundant_votes_ignored() {
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
rx.await.unwrap();
let (tx, rx) = oneshot::channel();
@@ -2806,6 +3495,7 @@ fn no_onesided_disputes() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
assert_matches!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
// We should not have any active disputes now.
@@ -2869,6 +3559,7 @@ fn refrain_from_participation() {
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -2961,6 +3652,7 @@ fn participation_for_included_candidates() {
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -3049,6 +3741,7 @@ fn local_participation_in_dispute_for_backed_candidate() {
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
@@ -3190,6 +3883,7 @@ fn participation_requests_reprioritized_for_newly_included() {
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
// Handle corresponding messages to unblock import
// we need to handle `ApprovalVotingMessage::GetApprovalSignaturesForCandidate` for
// import
@@ -3343,6 +4037,7 @@ fn informs_chain_selection_when_dispute_concluded_against() {
},
})
.await;
handle_disabled_validators_queries(&mut virtual_overseer, Vec::new()).await;
handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new())
.await;
assert_matches!(confirmation_rx.await.unwrap(),
@@ -3655,3 +4350,27 @@ fn session_info_small_jump_works() {
})
});
}
async fn handle_disabled_validators_queries(
virtual_overseer: &mut VirtualOverseer,
disabled_validators: Vec<ValidatorIndex>,
) {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::Version(tx),
)) => {
tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap();
}
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::DisabledValidators(tx),
)) => {
tx.send(Ok(disabled_validators)).unwrap();
}
);
}