From 647b204b5d6b588ba35bb867b218d559df2faecc Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 8 Sep 2021 06:59:44 -0500 Subject: [PATCH] participate in disputes only if haven't voted already (#3796) * participate in disputes only if haven't voted already * Add tests for dispute participation on local import. * Fixes. * cargo fmt * Get rid of redundant dependency. Co-authored-by: Robert Klotzner --- .../core/dispute-coordinator/src/real/mod.rs | 39 ++- .../dispute-coordinator/src/real/tests.rs | 315 +++++++++++------- 2 files changed, 230 insertions(+), 124 deletions(-) diff --git a/polkadot/node/core/dispute-coordinator/src/real/mod.rs b/polkadot/node/core/dispute-coordinator/src/real/mod.rs index 53f7169779..791a2fdcbf 100644 --- a/polkadot/node/core/dispute-coordinator/src/real/mod.rs +++ b/polkadot/node/core/dispute-coordinator/src/real/mod.rs @@ -48,7 +48,7 @@ use polkadot_node_subsystem_util::rolling_session_window::{ }; use polkadot_primitives::v1::{ BlockNumber, CandidateHash, CandidateReceipt, DisputeStatement, Hash, SessionIndex, - SessionInfo, ValidatorIndex, ValidatorPair, ValidatorSignature, + SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, }; use futures::{channel::oneshot, prelude::*}; @@ -745,8 +745,15 @@ async fn handle_import_statements( if status != prev_status { // This branch is only hit when the candidate is freshly disputed - // status was previously `None`, and now is not. - if prev_status.is_none() { - // No matter what, if the dispute is new, we participate. + if prev_status.is_none() && { + let controlled_indices = + find_controlled_validator_indices(&state.keystore, &validators); + let voted_indices = votes.voted_indices(); + + !controlled_indices.iter().all(|val_index| voted_indices.contains(&val_index)) + } { + // If the dispute is new, we participate UNLESS all our controlled + // keys have already participated. // // We also block the coordinator while awaiting our determination // of whether the vote is available. @@ -792,6 +799,22 @@ async fn handle_import_statements( Ok(()) } +fn find_controlled_validator_indices( + keystore: &LocalKeystore, + validators: &[ValidatorId], +) -> HashSet { + let mut controlled = HashSet::new(); + for (index, validator) in validators.iter().enumerate() { + if keystore.key_pair::(validator).ok().flatten().is_none() { + continue + } + + controlled.insert(ValidatorIndex(index as _)); + } + + controlled +} + async fn issue_local_statement( ctx: &mut impl SubsystemContext, overlay_db: &mut OverlayedBackend<'_, impl Backend>, @@ -833,14 +856,12 @@ async fn issue_local_statement( let mut statements = Vec::new(); let voted_indices: HashSet<_> = voted_indices.into_iter().collect(); - for (index, validator) in validators.iter().enumerate() { - let index = ValidatorIndex(index as _); + let controlled_indices = find_controlled_validator_indices(&state.keystore, &validators[..]); + + for index in controlled_indices { if voted_indices.contains(&index) { continue } - if state.keystore.key_pair::(validator).ok().flatten().is_none() { - continue - } let keystore = state.keystore.clone() as Arc<_>; let res = SignedDisputeStatement::sign_explicit( @@ -848,7 +869,7 @@ async fn issue_local_statement( valid, candidate_hash, session, - validator.clone(), + validators[index.0 as usize].clone(), ) .await; diff --git a/polkadot/node/core/dispute-coordinator/src/real/tests.rs b/polkadot/node/core/dispute-coordinator/src/real/tests.rs index 60d7ade2fd..4445f62f11 100644 --- a/polkadot/node/core/dispute-coordinator/src/real/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/real/tests.rs @@ -107,6 +107,7 @@ impl Default for TestState { Sr25519Keyring::Dave, Sr25519Keyring::Eve, Sr25519Keyring::One, + Sr25519Keyring::Ferdie, ]; let validator_public = validators.iter().map(|k| ValidatorId::from(k.public())).collect(); @@ -114,7 +115,7 @@ impl Default for TestState { let validator_groups = vec![ vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2), ValidatorIndex(3)], - vec![ValidatorIndex(4), ValidatorIndex(5)], + vec![ValidatorIndex(4), ValidatorIndex(5), ValidatorIndex(6)], ]; let master_keystore = make_keystore(&validators).into(); @@ -316,7 +317,7 @@ fn conflicting_votes_lead_to_dispute_participation() { test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; let valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + test_state.issue_statement_with_index(3, candidate_hash, session, true).await; let invalid_vote = test_state.issue_statement_with_index(1, candidate_hash, session, false).await; @@ -332,7 +333,7 @@ fn conflicting_votes_lead_to_dispute_participation() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![ - (valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(3)), (invalid_vote, ValidatorIndex(1)), ], pending_confirmation, @@ -434,7 +435,7 @@ fn positive_votes_dont_trigger_participation() { test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; let valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + test_state.issue_statement_with_index(2, candidate_hash, session, true).await; let valid_vote_2 = test_state.issue_statement_with_index(1, candidate_hash, session, true).await; @@ -446,7 +447,7 @@ fn positive_votes_dont_trigger_participation() { candidate_hash, candidate_receipt: candidate_receipt.clone(), session, - statements: vec![(valid_vote, ValidatorIndex(0))], + statements: vec![(valid_vote, ValidatorIndex(2))], pending_confirmation, }, }) @@ -539,7 +540,7 @@ fn wrong_validator_index_is_ignored() { test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; let valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + test_state.issue_statement_with_index(2, candidate_hash, session, true).await; let invalid_vote = test_state.issue_statement_with_index(1, candidate_hash, session, false).await; @@ -553,7 +554,7 @@ fn wrong_validator_index_is_ignored() { session, statements: vec![ (valid_vote, ValidatorIndex(1)), - (invalid_vote, ValidatorIndex(0)), + (invalid_vote, ValidatorIndex(2)), ], pending_confirmation, }, @@ -609,7 +610,7 @@ fn finality_votes_ignore_disputed_candidates() { test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; let valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + test_state.issue_statement_with_index(2, candidate_hash, session, true).await; let invalid_vote = test_state.issue_statement_with_index(1, candidate_hash, session, false).await; @@ -622,7 +623,7 @@ fn finality_votes_ignore_disputed_candidates() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![ - (valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], pending_confirmation, @@ -715,7 +716,7 @@ fn supermajority_valid_dispute_may_be_finalized() { polkadot_primitives::v1::supermajority_threshold(test_state.validators.len()); let valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + test_state.issue_statement_with_index(2, candidate_hash, session, true).await; let invalid_vote = test_state.issue_statement_with_index(1, candidate_hash, session, false).await; @@ -728,7 +729,7 @@ fn supermajority_valid_dispute_may_be_finalized() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![ - (valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], pending_confirmation, @@ -755,7 +756,7 @@ fn supermajority_valid_dispute_may_be_finalized() { ); let mut statements = Vec::new(); - for i in (0..supermajority_threshold - 1).map(|i| i + 2) { + for i in (0..supermajority_threshold - 1).map(|i| i + 3) { let vote = test_state.issue_statement_with_index(i, candidate_hash, session, true).await; @@ -848,7 +849,7 @@ fn concluded_supermajority_for_non_active_after_time() { polkadot_primitives::v1::supermajority_threshold(test_state.validators.len()); let valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + test_state.issue_statement_with_index(2, candidate_hash, session, true).await; let invalid_vote = test_state.issue_statement_with_index(1, candidate_hash, session, false).await; @@ -861,7 +862,7 @@ fn concluded_supermajority_for_non_active_after_time() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![ - (valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], pending_confirmation, @@ -882,7 +883,7 @@ fn concluded_supermajority_for_non_active_after_time() { ); let mut statements = Vec::new(); - for i in (0..supermajority_threshold - 1).map(|i| i + 2) { + for i in (0..supermajority_threshold - 1).map(|i| i + 3) { let vote = test_state.issue_statement_with_index(i, candidate_hash, session, true).await; @@ -951,7 +952,7 @@ fn concluded_supermajority_against_non_active_after_time() { polkadot_primitives::v1::supermajority_threshold(test_state.validators.len()); let valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + test_state.issue_statement_with_index(2, candidate_hash, session, true).await; let invalid_vote = test_state.issue_statement_with_index(1, candidate_hash, session, false).await; @@ -964,7 +965,7 @@ fn concluded_supermajority_against_non_active_after_time() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![ - (valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], pending_confirmation, @@ -985,7 +986,7 @@ fn concluded_supermajority_against_non_active_after_time() { ); let mut statements = Vec::new(); - for i in (0..supermajority_threshold - 1).map(|i| i + 2) { + for i in (0..supermajority_threshold - 1).map(|i| i + 3) { let vote = test_state.issue_statement_with_index(i, candidate_hash, session, false).await; @@ -1051,7 +1052,7 @@ fn fresh_dispute_ignored_if_unavailable() { test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; let valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + test_state.issue_statement_with_index(2, candidate_hash, session, true).await; let invalid_vote = test_state.issue_statement_with_index(1, candidate_hash, session, false).await; @@ -1064,7 +1065,7 @@ fn fresh_dispute_ignored_if_unavailable() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![ - (valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], pending_confirmation, @@ -1298,18 +1299,6 @@ fn resume_dispute_with_local_statement() { }) .await; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::DisputeParticipation( - DisputeParticipationMessage::Participate { - report_availability, - .. - } - ) => { - report_availability.send(true).unwrap(); - } - ); - assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); { @@ -1330,7 +1319,7 @@ fn resume_dispute_with_local_statement() { test_state }) }) - // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // Alice should not send a DisputeParticiationMessage::Participate on restart since she has a // local statement for the active dispute. .resume(|test_state, mut virtual_overseer| { Box::pin(async move { @@ -1384,18 +1373,6 @@ fn resume_dispute_without_local_statement_or_local_key() { }) .await; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::DisputeParticipation( - DisputeParticipationMessage::Participate { - report_availability, - .. - } - ) => { - report_availability.send(true).unwrap(); - } - ); - assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); { @@ -1416,8 +1393,8 @@ fn resume_dispute_without_local_statement_or_local_key() { test_state }) }) - // Alice should send a DisputeParticiationMessage::Participate on restart since she has no - // local statement for the active dispute. + // Two should not send a DisputeParticiationMessage::Participate on restart since she is no + // validator in that dispute. .resume(|test_state, mut virtual_overseer| { Box::pin(async move { test_state.handle_resume_sync(&mut virtual_overseer, session).await; @@ -1437,89 +1414,197 @@ fn resume_dispute_without_local_statement_or_local_key() { fn resume_dispute_with_local_statement_without_local_key() { let session = 1; - let mut test_state = TestState::default(); - test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into(); - test_state - .resume(|mut test_state, mut virtual_overseer| { - Box::pin(async move { - test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let test_state = TestState::default(); + let mut test_state = test_state.resume(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; - let candidate_receipt = CandidateReceipt::default(); - let candidate_hash = candidate_receipt.hash(); + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); - test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; + test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; - let local_valid_vote = - test_state.issue_statement_with_index(0, candidate_hash, session, true).await; + let local_valid_vote = + test_state.issue_statement_with_index(0, candidate_hash, session, true).await; - let valid_vote = - test_state.issue_statement_with_index(1, candidate_hash, session, true).await; + let valid_vote = + test_state.issue_statement_with_index(1, candidate_hash, session, true).await; - let invalid_vote = - test_state.issue_statement_with_index(2, candidate_hash, session, false).await; + let invalid_vote = + test_state.issue_statement_with_index(2, candidate_hash, session, false).await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer + .send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (local_valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }) + .await; + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + { + let (tx, rx) = oneshot::channel(); - let (pending_confirmation, confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { - msg: DisputeCoordinatorMessage::ImportStatements { - candidate_hash, - candidate_receipt: candidate_receipt.clone(), - session, - statements: vec![ - (local_valid_vote, ValidatorIndex(0)), - (valid_vote, ValidatorIndex(1)), - (invalid_vote, ValidatorIndex(2)), - ], - pending_confirmation, - }, + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), }) .await; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::DisputeParticipation( - DisputeParticipationMessage::Participate { - report_availability, - .. - } - ) => { - report_availability.send(true).unwrap(); - } - ); + assert_eq!(rx.await.unwrap().len(), 1); + } - assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); - { - let (tx, rx) = oneshot::channel(); - - virtual_overseer - .send(FromOverseer::Communication { - msg: DisputeCoordinatorMessage::ActiveDisputes(tx), - }) - .await; - - assert_eq!(rx.await.unwrap().len(), 1); - } - - virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - assert!(virtual_overseer.try_recv().await.is_none()); - - test_state - }) + test_state }) - // Alice should send a DisputeParticiationMessage::Participate on restart since she has no - // local statement for the active dispute. - .resume(|test_state, mut virtual_overseer| { - Box::pin(async move { - test_state.handle_resume_sync(&mut virtual_overseer, session).await; + }); + // No keys: + test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into(); + // Two should not send a DisputeParticiationMessage::Participate on restart since we gave + // her a non existing key. + test_state.resume(|test_state, mut virtual_overseer| { + Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; - // Assert that subsystem is not sending Participation messages because we issued a local statement - assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + // Assert that subsystem is not sending Participation messages because we don't + // have a key. + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); - virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - assert!(virtual_overseer.try_recv().await.is_none()); + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); - test_state - }) - }); + test_state + }) + }); +} + +#[test] +fn issue_valid_local_statement_does_cause_distribution_but_not_duplicate_participation() { + issue_local_statement_does_cause_distribution_but_not_duplicate_participation(true); +} + +#[test] +fn issue_invalid_local_statement_does_cause_distribution_but_not_duplicate_participation() { + issue_local_statement_does_cause_distribution_but_not_duplicate_participation(false); +} + +fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation(validity: bool) { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; + + let other_vote = test_state + .issue_statement_with_index(1, candidate_hash, session, !validity) + .await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer + .send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![(other_vote, ValidatorIndex(1))], + pending_confirmation, + }, + }) + .await; + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + // Initiate dispute locally: + virtual_overseer + .send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::IssueLocalStatement( + session, + candidate_hash, + candidate_receipt.clone(), + validity, + ), + }) + .await; + + // Dispute distribution should get notified now: + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeDistribution( + DisputeDistributionMessage::SendDispute(msg) + ) => { + assert_eq!(msg.session_index(), session); + assert_eq!(msg.candidate_receipt(), &candidate_receipt); + } + ); + + // Make sure we don't get a `DisputeParticiationMessage`. + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + }) + }); +} + +#[test] +fn negative_issue_local_statement_only_triggers_import() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await; + + virtual_overseer + .send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::IssueLocalStatement( + session, + candidate_hash, + candidate_receipt.clone(), + false, + ), + }) + .await; + + let backend = DbBackend::new(test_state.db.clone(), test_state.config.column_config()); + + let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap(); + assert_eq!(votes.invalid.len(), 1); + assert_eq!(votes.valid.len(), 0); + + let disputes = backend.load_recent_disputes().unwrap(); + assert_eq!(disputes, None); + + // Assert that subsystem is not sending Participation messages: + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + }) + }); }