From 1f48725c541282a1b09b55efd4bfbcf63db75ffb Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 2 Nov 2020 12:37:50 -0600 Subject: [PATCH] Backing fixes (#1897) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Commit my changes * some backing fixes * indentation * fix backing tests * tweak includability rules * comment * Update node/core/backing/src/lib.rs Co-authored-by: Bastian Köcher * Update node/core/backing/src/lib.rs Co-authored-by: Bastian Köcher * Update node/core/backing/src/lib.rs Co-authored-by: Bastian Köcher * Update node/core/backing/src/lib.rs Co-authored-by: Bastian Köcher Co-authored-by: Bastian Köcher Co-authored-by: Bastian Köcher --- polkadot/node/core/backing/src/lib.rs | 233 ++++++++++++++-------- polkadot/node/core/provisioner/src/lib.rs | 5 +- polkadot/statement-table/src/generic.rs | 36 +++- 3 files changed, 184 insertions(+), 90 deletions(-) diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 9b3102d97b..c141fa8a9a 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -104,6 +104,9 @@ struct CandidateBackingJob { issued_statements: HashSet, /// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash. seconded: Option, + /// The candidates that are includable, by hash. Each entry here indicates + /// that we've sent the provisioner the backed candidate. + backed: HashSet, /// We have already reported misbehaviors for these validators. reported_misbehavior_for: HashSet, keystore: SyncCryptoStorePtr, @@ -242,6 +245,41 @@ fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement } } +fn table_attested_to_backed( + attested: TableAttestedCandidate< + ParaId, + CommittedCandidateReceipt, + ValidatorIndex, + ValidatorSignature, + >, + table_context: &TableContext, +) -> Option { + let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested; + + let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes + .into_iter() + .map(|(id, vote)| (id, vote.into())) + .unzip(); + + let group = table_context.groups.get(¶_id)?; + + let mut validator_indices = BitVec::with_capacity(group.len()); + + validator_indices.resize(group.len(), false); + + for id in ids.iter() { + if let Some(position) = group.iter().position(|x| x == id) { + validator_indices.set(position, true); + } + } + + Some(BackedCandidate { + candidate, + validity_votes, + validator_indices, + }) +} + impl CandidateBackingJob { /// Run asynchronously. async fn run_loop(mut self) -> Result<(), Error> { @@ -337,49 +375,30 @@ impl CandidateBackingJob { let issued_statement = statement.is_some(); if let Some(statement) = statement { - if let Some(signed_statement) = self.sign_statement(statement).await { - self.import_statement(&signed_statement).await?; - self.distribute_signed_statement(signed_statement).await?; - } + self.sign_import_and_distribute_statement(statement).await? } Ok(issued_statement) } + async fn sign_import_and_distribute_statement(&mut self, statement: Statement) -> Result<(), Error> { + if let Some(signed_statement) = self.sign_statement(statement).await { + self.import_statement(&signed_statement).await?; + self.distribute_signed_statement(signed_statement).await?; + } + + Ok(()) + } + fn get_backed(&self) -> Vec { let proposed = self.table.proposed_candidates(&self.table_context); let mut res = Vec::with_capacity(proposed.len()); for p in proposed.into_iter() { - let TableAttestedCandidate { candidate, validity_votes, .. } = p; - - let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes - .into_iter() - .map(|(id, vote)| (id, vote.into())) - .unzip(); - - let group = match self.table_context.groups.get(&self.assignment) { - Some(group) => group, + match table_attested_to_backed(p, &self.table_context) { None => continue, - }; - - let mut validator_indices = BitVec::with_capacity(group.len()); - - validator_indices.resize(group.len(), false); - - for id in ids.iter() { - if let Some(position) = group.iter().position(|x| x == id) { - validator_indices.set(position, true); - } + Some(backed) => res.push(NewBackedCandidate(backed)), } - - let backed = BackedCandidate { - candidate, - validity_votes, - validator_indices, - }; - - res.push(NewBackedCandidate(backed.clone())); } res @@ -428,9 +447,29 @@ impl CandidateBackingJob { let summary = self.table.import_statement(&self.table_context, stmt); + if let Some(ref summary) = summary { + if let Some(attested) = self.table.attested_candidate( + &summary.candidate, + &self.table_context, + ) { + // `HashSet::insert` returns true if the thing wasn't in there already. + // one of the few places the Rust-std folks did a bad job with API + if self.backed.insert(summary.candidate) { + if let Some(backed) = + table_attested_to_backed(attested, &self.table_context) + { + let message = ProvisionerMessage::ProvisionableData( + ProvisionableData::BackedCandidate(backed), + ); + self.send_to_provisioner(message).await?; + } + } + } + } + self.issue_new_misbehaviors().await?; - return Ok(summary); + Ok(summary) } async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> { @@ -444,23 +483,19 @@ impl CandidateBackingJob { // If the message is a `CandidateBackingMessage::Second`, sign and dispatch a // Seconded statement only if we have not seconded any other candidate and // have not signed a Valid statement for the requested candidate. - match self.seconded { + if self.seconded.is_none() { // This job has not seconded a candidate yet. - None => { - let candidate_hash = candidate.hash(); + let candidate_hash = candidate.hash(); - if !self.issued_statements.contains(&candidate_hash) { - if let Ok(true) = self.validate_and_second( - &candidate, - pov, - ).await { - self.metrics.on_candidate_seconded(); - self.seconded = Some(candidate_hash); - } + if !self.issued_statements.contains(&candidate_hash) { + if let Ok(true) = self.validate_and_second( + &candidate, + pov, + ).await { + self.metrics.on_candidate_seconded(); + self.seconded = Some(candidate_hash); } } - // This job has already seconded a candidate. - Some(_) => {} } } CandidateBackingMessage::Statement(_, statement) => { @@ -541,11 +576,7 @@ impl CandidateBackingJob { self.issued_statements.insert(candidate_hash); - if let Some(signed_statement) = self.sign_statement(statement).await { - self.distribute_signed_statement(signed_statement).await?; - } - - Ok(()) + self.sign_import_and_distribute_statement(statement).await } /// Import the statement and kick off validation work if it is a part of our assignment. @@ -818,6 +849,7 @@ impl util::JobTrait for CandidateBackingJob { required_collator, issued_statements: HashSet::new(), seconded: None, + backed: HashSet::new(), reported_misbehavior_for: HashSet::new(), keystore, table: Table::default(), @@ -930,6 +962,7 @@ mod tests { Sr25519Keyring::Charlie, Sr25519Keyring::Dave, Sr25519Keyring::Ferdie, + Sr25519Keyring::One, ]; let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory()); @@ -939,7 +972,7 @@ mod tests { let validator_public = validator_pubkeys(&validators); - let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]]; + let validator_groups = vec![vec![2, 0, 3, 5], vec![1], vec![4]]; let group_rotation_info = GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, @@ -1225,12 +1258,20 @@ mod tests { let candidate_a_hash = candidate_a.hash(); let public0 = CryptoStore::sr25519_generate_new( &*test_state.keystore, - ValidatorId::ID, Some(&test_state.validators[0].to_seed()) + ValidatorId::ID, + Some(&test_state.validators[0].to_seed()), + ).await.expect("Insert key into keystore"); + let public1 = CryptoStore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[5].to_seed()), ).await.expect("Insert key into keystore"); let public2 = CryptoStore::sr25519_generate_new( &*test_state.keystore, - ValidatorId::ID, Some(&test_state.validators[2].to_seed()) + ValidatorId::ID, + Some(&test_state.validators[2].to_seed()), ).await.expect("Insert key into keystore"); + let signed_a = SignedFullStatement::sign( &test_state.keystore, Statement::Seconded(candidate_a.clone()), @@ -1243,8 +1284,8 @@ mod tests { &test_state.keystore, Statement::Valid(candidate_a_hash), &test_state.signing_context, - 0, - &public0.into(), + 5, + &public1.into(), ).await.expect("should be signed"); let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone()); @@ -1301,28 +1342,38 @@ mod tests { virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; - let (tx, rx) = oneshot::channel(); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::Share(hash, stmt) + ) => { + assert_eq!(test_state.relay_parent, hash); + stmt.check_signature(&test_state.signing_context, &public0.into()).expect("Is signed correctly"); + } + ); - // The backed candidats set should be not empty at this point. - virtual_overseer.send(FromOverseer::Communication{ - msg: CandidateBackingMessage::GetBackedCandidates( - test_state.relay_parent, - tx, - ) - }).await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::Provisioner( + ProvisionerMessage::ProvisionableData( + ProvisionableData::BackedCandidate(BackedCandidate { + candidate, + validity_votes, + validator_indices, + }) + ) + ) if candidate == candidate_a => { + assert_eq!(validity_votes.len(), 3); - let backed = rx.await.unwrap(); - - // `validity_votes` may be in any order so we can't do this in a single assert. - assert_eq!(backed[0].0.candidate, candidate_a); - assert_eq!(backed[0].0.validity_votes.len(), 2); - assert!(backed[0].0.validity_votes.contains( - &ValidityAttestation::Explicit(signed_b.signature().clone()) - )); - assert!(backed[0].0.validity_votes.contains( - &ValidityAttestation::Implicit(signed_a.signature().clone()) - )); - assert_eq!(backed[0].0.validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0]); + assert!(validity_votes.contains( + &ValidityAttestation::Explicit(signed_b.signature().clone()) + )); + assert!(validity_votes.contains( + &ValidityAttestation::Implicit(signed_a.signature().clone()) + )); + assert_eq!(validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0, 1]); + } + ); virtual_overseer.send(FromOverseer::Signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) @@ -1376,10 +1427,10 @@ mod tests { let signed_b = SignedFullStatement::sign( &test_state.keystore, - Statement::Valid(candidate_a_hash), + Statement::Invalid(candidate_a_hash), &test_state.signing_context, - 0, - &public0.into(), + 2, + &public2.into(), ).await.expect("should be signed"); let signed_c = SignedFullStatement::sign( @@ -1449,10 +1500,36 @@ mod tests { } ); + // This `Invalid` statement contradicts the `Candidate` statement + // sent at first. let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_b.clone()); virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::Provisioner( + ProvisionerMessage::ProvisionableData( + ProvisionableData::MisbehaviorReport( + relay_parent, + MisbehaviorReport::SelfContradiction(_, s1, s2), + ) + ) + ) if relay_parent == test_state.relay_parent => { + s1.check_signature( + &test_state.signing_context, + &test_state.validator_public[s1.validator_index() as usize], + ).unwrap(); + + s2.check_signature( + &test_state.signing_context, + &test_state.validator_public[s2.validator_index() as usize], + ).unwrap(); + } + ); + + // This `Invalid` statement contradicts the `Valid` statement the subsystem + // should have issued behind the scenes. let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_c.clone()); virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 3fe479768a..1d29c064b6 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -368,8 +368,7 @@ async fn select_candidates( let (scheduled_core, assumption) = match core { CoreState::Scheduled(scheduled_core) => (scheduled_core, OccupiedCoreAssumption::Free), CoreState::Occupied(occupied_core) => { - if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability) - { + if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability) { if let Some(ref scheduled_core) = occupied_core.next_up_on_available { (scheduled_core, OccupiedCoreAssumption::Included) } else { @@ -511,4 +510,4 @@ impl metrics::Metrics for Metrics { delegated_subsystem!(ProvisioningJob((), Metrics) <- ToJob as ProvisioningSubsystem); #[cfg(test)] -mod tests; \ No newline at end of file +mod tests; diff --git a/polkadot/statement-table/src/generic.rs b/polkadot/statement-table/src/generic.rs index cb95d74d62..ba6f827853 100644 --- a/polkadot/statement-table/src/generic.rs +++ b/polkadot/statement-table/src/generic.rs @@ -256,15 +256,14 @@ impl CandidateData { // if it has enough validity votes // and no authorities have called it bad. fn can_be_included(&self, validity_threshold: usize) -> bool { - self.indicated_bad_by.is_empty() - && self.validity_votes.len() >= validity_threshold + self.validity_votes.len() >= validity_threshold } fn summary(&self, digest: C::Digest) -> Summary { Summary { candidate: digest, group_id: self.group_id.clone(), - validity_votes: self.validity_votes.len() - self.indicated_bad_by.len(), + validity_votes: self.validity_votes.len(), signalled_bad: self.indicated_bad(), } } @@ -362,6 +361,20 @@ impl Table { }) } + /// Get the attested candidate for `digest`. + /// + /// Returns `Some(_)` if the candidate exists and is includable. + pub fn attested_candidate(&self, digest: &C::Digest, context: &C) + -> Option> + { + self.candidate_votes.get(digest).and_then(|data| { + let v_threshold = context.requisite_votes(&data.group_id); + data.attested(v_threshold) + }) + } + /// Import a signed statement. Signatures should be checked for validity, and the /// sender should be checked to actually be an authority. /// @@ -489,7 +502,7 @@ impl Table { if new_proposal { self.candidate_votes.entry(digest.clone()).or_insert_with(move || CandidateData { group_id: group, - candidate: candidate, + candidate, validity_votes: HashMap::new(), indicated_bad_by: Vec::new(), }); @@ -581,7 +594,7 @@ impl Table { } Entry::Vacant(vacant) => { if let ValidityVote::Invalid(_) = vote { - votes.indicated_bad_by.push(from); + votes.indicated_bad_by.push(from.clone()); } vacant.insert(vote); @@ -595,7 +608,12 @@ impl Table { } } -fn update_includable_count(map: &mut HashMap, group_id: &G, was_includable: bool, is_includable: bool) { +fn update_includable_count( + map: &mut HashMap, + group_id: &G, + was_includable: bool, + is_includable: bool, +) { if was_includable && !is_includable { if let Entry::Occupied(mut entry) = map.entry(group_id.clone()) { *entry.get_mut() -= 1; @@ -989,7 +1007,7 @@ mod tests { candidate.indicated_bad_by.push(AuthorityId(1024)); - assert!(!candidate.can_be_included(validity_threshold)); + assert!(candidate.can_be_included(validity_threshold)); } #[test] @@ -1039,8 +1057,8 @@ mod tests { table.import_statement(&context, vote); assert!(!table.detected_misbehavior.contains_key(&AuthorityId(3))); - assert!(!table.candidate_includable(&candidate_digest, &context)); - assert!(table.includable_count.is_empty()); + assert!(table.candidate_includable(&candidate_digest, &context)); + assert!(table.includable_count.get(&GroupId(2)).is_some()); } #[test]