mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 18:41:03 +00:00
Backing fixes (#1897)
* Commit my changes * some backing fixes * indentation * fix backing tests * tweak includability rules * comment * Update node/core/backing/src/lib.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update node/core/backing/src/lib.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update node/core/backing/src/lib.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update node/core/backing/src/lib.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Bastian Köcher <git@kchr.de> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
1f4121c444
commit
1f48725c54
@@ -104,6 +104,9 @@ struct CandidateBackingJob {
|
||||
issued_statements: HashSet<Hash>,
|
||||
/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
|
||||
seconded: Option<Hash>,
|
||||
/// The candidates that are includable, by hash. Each entry here indicates
|
||||
/// that we've sent the provisioner the backed candidate.
|
||||
backed: HashSet<Hash>,
|
||||
/// We have already reported misbehaviors for these validators.
|
||||
reported_misbehavior_for: HashSet<ValidatorIndex>,
|
||||
keystore: SyncCryptoStorePtr,
|
||||
@@ -242,6 +245,41 @@ fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement
|
||||
}
|
||||
}
|
||||
|
||||
fn table_attested_to_backed(
|
||||
attested: TableAttestedCandidate<
|
||||
ParaId,
|
||||
CommittedCandidateReceipt,
|
||||
ValidatorIndex,
|
||||
ValidatorSignature,
|
||||
>,
|
||||
table_context: &TableContext,
|
||||
) -> Option<BackedCandidate> {
|
||||
let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;
|
||||
|
||||
let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
|
||||
.into_iter()
|
||||
.map(|(id, vote)| (id, vote.into()))
|
||||
.unzip();
|
||||
|
||||
let group = table_context.groups.get(¶_id)?;
|
||||
|
||||
let mut validator_indices = BitVec::with_capacity(group.len());
|
||||
|
||||
validator_indices.resize(group.len(), false);
|
||||
|
||||
for id in ids.iter() {
|
||||
if let Some(position) = group.iter().position(|x| x == id) {
|
||||
validator_indices.set(position, true);
|
||||
}
|
||||
}
|
||||
|
||||
Some(BackedCandidate {
|
||||
candidate,
|
||||
validity_votes,
|
||||
validator_indices,
|
||||
})
|
||||
}
|
||||
|
||||
impl CandidateBackingJob {
|
||||
/// Run asynchronously.
|
||||
async fn run_loop(mut self) -> Result<(), Error> {
|
||||
@@ -337,49 +375,30 @@ impl CandidateBackingJob {
|
||||
let issued_statement = statement.is_some();
|
||||
|
||||
if let Some(statement) = statement {
|
||||
if let Some(signed_statement) = self.sign_statement(statement).await {
|
||||
self.import_statement(&signed_statement).await?;
|
||||
self.distribute_signed_statement(signed_statement).await?;
|
||||
}
|
||||
self.sign_import_and_distribute_statement(statement).await?
|
||||
}
|
||||
|
||||
Ok(issued_statement)
|
||||
}
|
||||
|
||||
async fn sign_import_and_distribute_statement(&mut self, statement: Statement) -> Result<(), Error> {
|
||||
if let Some(signed_statement) = self.sign_statement(statement).await {
|
||||
self.import_statement(&signed_statement).await?;
|
||||
self.distribute_signed_statement(signed_statement).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_backed(&self) -> Vec<NewBackedCandidate> {
|
||||
let proposed = self.table.proposed_candidates(&self.table_context);
|
||||
let mut res = Vec::with_capacity(proposed.len());
|
||||
|
||||
for p in proposed.into_iter() {
|
||||
let TableAttestedCandidate { candidate, validity_votes, .. } = p;
|
||||
|
||||
let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
|
||||
.into_iter()
|
||||
.map(|(id, vote)| (id, vote.into()))
|
||||
.unzip();
|
||||
|
||||
let group = match self.table_context.groups.get(&self.assignment) {
|
||||
Some(group) => group,
|
||||
match table_attested_to_backed(p, &self.table_context) {
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let mut validator_indices = BitVec::with_capacity(group.len());
|
||||
|
||||
validator_indices.resize(group.len(), false);
|
||||
|
||||
for id in ids.iter() {
|
||||
if let Some(position) = group.iter().position(|x| x == id) {
|
||||
validator_indices.set(position, true);
|
||||
}
|
||||
Some(backed) => res.push(NewBackedCandidate(backed)),
|
||||
}
|
||||
|
||||
let backed = BackedCandidate {
|
||||
candidate,
|
||||
validity_votes,
|
||||
validator_indices,
|
||||
};
|
||||
|
||||
res.push(NewBackedCandidate(backed.clone()));
|
||||
}
|
||||
|
||||
res
|
||||
@@ -428,9 +447,29 @@ impl CandidateBackingJob {
|
||||
|
||||
let summary = self.table.import_statement(&self.table_context, stmt);
|
||||
|
||||
if let Some(ref summary) = summary {
|
||||
if let Some(attested) = self.table.attested_candidate(
|
||||
&summary.candidate,
|
||||
&self.table_context,
|
||||
) {
|
||||
// `HashSet::insert` returns true if the thing wasn't in there already.
|
||||
// one of the few places the Rust-std folks did a bad job with API
|
||||
if self.backed.insert(summary.candidate) {
|
||||
if let Some(backed) =
|
||||
table_attested_to_backed(attested, &self.table_context)
|
||||
{
|
||||
let message = ProvisionerMessage::ProvisionableData(
|
||||
ProvisionableData::BackedCandidate(backed),
|
||||
);
|
||||
self.send_to_provisioner(message).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.issue_new_misbehaviors().await?;
|
||||
|
||||
return Ok(summary);
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
|
||||
@@ -444,23 +483,19 @@ impl CandidateBackingJob {
|
||||
// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
|
||||
// Seconded statement only if we have not seconded any other candidate and
|
||||
// have not signed a Valid statement for the requested candidate.
|
||||
match self.seconded {
|
||||
if self.seconded.is_none() {
|
||||
// This job has not seconded a candidate yet.
|
||||
None => {
|
||||
let candidate_hash = candidate.hash();
|
||||
let candidate_hash = candidate.hash();
|
||||
|
||||
if !self.issued_statements.contains(&candidate_hash) {
|
||||
if let Ok(true) = self.validate_and_second(
|
||||
&candidate,
|
||||
pov,
|
||||
).await {
|
||||
self.metrics.on_candidate_seconded();
|
||||
self.seconded = Some(candidate_hash);
|
||||
}
|
||||
if !self.issued_statements.contains(&candidate_hash) {
|
||||
if let Ok(true) = self.validate_and_second(
|
||||
&candidate,
|
||||
pov,
|
||||
).await {
|
||||
self.metrics.on_candidate_seconded();
|
||||
self.seconded = Some(candidate_hash);
|
||||
}
|
||||
}
|
||||
// This job has already seconded a candidate.
|
||||
Some(_) => {}
|
||||
}
|
||||
}
|
||||
CandidateBackingMessage::Statement(_, statement) => {
|
||||
@@ -541,11 +576,7 @@ impl CandidateBackingJob {
|
||||
|
||||
self.issued_statements.insert(candidate_hash);
|
||||
|
||||
if let Some(signed_statement) = self.sign_statement(statement).await {
|
||||
self.distribute_signed_statement(signed_statement).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
self.sign_import_and_distribute_statement(statement).await
|
||||
}
|
||||
|
||||
/// Import the statement and kick off validation work if it is a part of our assignment.
|
||||
@@ -818,6 +849,7 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
required_collator,
|
||||
issued_statements: HashSet::new(),
|
||||
seconded: None,
|
||||
backed: HashSet::new(),
|
||||
reported_misbehavior_for: HashSet::new(),
|
||||
keystore,
|
||||
table: Table::default(),
|
||||
@@ -930,6 +962,7 @@ mod tests {
|
||||
Sr25519Keyring::Charlie,
|
||||
Sr25519Keyring::Dave,
|
||||
Sr25519Keyring::Ferdie,
|
||||
Sr25519Keyring::One,
|
||||
];
|
||||
|
||||
let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory());
|
||||
@@ -939,7 +972,7 @@ mod tests {
|
||||
|
||||
let validator_public = validator_pubkeys(&validators);
|
||||
|
||||
let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]];
|
||||
let validator_groups = vec![vec![2, 0, 3, 5], vec![1], vec![4]];
|
||||
let group_rotation_info = GroupRotationInfo {
|
||||
session_start_block: 0,
|
||||
group_rotation_frequency: 100,
|
||||
@@ -1225,12 +1258,20 @@ mod tests {
|
||||
let candidate_a_hash = candidate_a.hash();
|
||||
let public0 = CryptoStore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID, Some(&test_state.validators[0].to_seed())
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[0].to_seed()),
|
||||
).await.expect("Insert key into keystore");
|
||||
let public1 = CryptoStore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[5].to_seed()),
|
||||
).await.expect("Insert key into keystore");
|
||||
let public2 = CryptoStore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID, Some(&test_state.validators[2].to_seed())
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[2].to_seed()),
|
||||
).await.expect("Insert key into keystore");
|
||||
|
||||
let signed_a = SignedFullStatement::sign(
|
||||
&test_state.keystore,
|
||||
Statement::Seconded(candidate_a.clone()),
|
||||
@@ -1243,8 +1284,8 @@ mod tests {
|
||||
&test_state.keystore,
|
||||
Statement::Valid(candidate_a_hash),
|
||||
&test_state.signing_context,
|
||||
0,
|
||||
&public0.into(),
|
||||
5,
|
||||
&public1.into(),
|
||||
).await.expect("should be signed");
|
||||
|
||||
let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());
|
||||
@@ -1301,28 +1342,38 @@ mod tests {
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::StatementDistribution(
|
||||
StatementDistributionMessage::Share(hash, stmt)
|
||||
) => {
|
||||
assert_eq!(test_state.relay_parent, hash);
|
||||
stmt.check_signature(&test_state.signing_context, &public0.into()).expect("Is signed correctly");
|
||||
}
|
||||
);
|
||||
|
||||
// The backed candidats set should be not empty at this point.
|
||||
virtual_overseer.send(FromOverseer::Communication{
|
||||
msg: CandidateBackingMessage::GetBackedCandidates(
|
||||
test_state.relay_parent,
|
||||
tx,
|
||||
)
|
||||
}).await;
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
ProvisionerMessage::ProvisionableData(
|
||||
ProvisionableData::BackedCandidate(BackedCandidate {
|
||||
candidate,
|
||||
validity_votes,
|
||||
validator_indices,
|
||||
})
|
||||
)
|
||||
) if candidate == candidate_a => {
|
||||
assert_eq!(validity_votes.len(), 3);
|
||||
|
||||
let backed = rx.await.unwrap();
|
||||
|
||||
// `validity_votes` may be in any order so we can't do this in a single assert.
|
||||
assert_eq!(backed[0].0.candidate, candidate_a);
|
||||
assert_eq!(backed[0].0.validity_votes.len(), 2);
|
||||
assert!(backed[0].0.validity_votes.contains(
|
||||
&ValidityAttestation::Explicit(signed_b.signature().clone())
|
||||
));
|
||||
assert!(backed[0].0.validity_votes.contains(
|
||||
&ValidityAttestation::Implicit(signed_a.signature().clone())
|
||||
));
|
||||
assert_eq!(backed[0].0.validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0]);
|
||||
assert!(validity_votes.contains(
|
||||
&ValidityAttestation::Explicit(signed_b.signature().clone())
|
||||
));
|
||||
assert!(validity_votes.contains(
|
||||
&ValidityAttestation::Implicit(signed_a.signature().clone())
|
||||
));
|
||||
assert_eq!(validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0, 1]);
|
||||
}
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOverseer::Signal(
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
|
||||
@@ -1376,10 +1427,10 @@ mod tests {
|
||||
|
||||
let signed_b = SignedFullStatement::sign(
|
||||
&test_state.keystore,
|
||||
Statement::Valid(candidate_a_hash),
|
||||
Statement::Invalid(candidate_a_hash),
|
||||
&test_state.signing_context,
|
||||
0,
|
||||
&public0.into(),
|
||||
2,
|
||||
&public2.into(),
|
||||
).await.expect("should be signed");
|
||||
|
||||
let signed_c = SignedFullStatement::sign(
|
||||
@@ -1449,10 +1500,36 @@ mod tests {
|
||||
}
|
||||
);
|
||||
|
||||
// This `Invalid` statement contradicts the `Candidate` statement
|
||||
// sent at first.
|
||||
let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_b.clone());
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
ProvisionerMessage::ProvisionableData(
|
||||
ProvisionableData::MisbehaviorReport(
|
||||
relay_parent,
|
||||
MisbehaviorReport::SelfContradiction(_, s1, s2),
|
||||
)
|
||||
)
|
||||
) if relay_parent == test_state.relay_parent => {
|
||||
s1.check_signature(
|
||||
&test_state.signing_context,
|
||||
&test_state.validator_public[s1.validator_index() as usize],
|
||||
).unwrap();
|
||||
|
||||
s2.check_signature(
|
||||
&test_state.signing_context,
|
||||
&test_state.validator_public[s2.validator_index() as usize],
|
||||
).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// This `Invalid` statement contradicts the `Valid` statement the subsystem
|
||||
// should have issued behind the scenes.
|
||||
let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_c.clone());
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
|
||||
|
||||
@@ -368,8 +368,7 @@ async fn select_candidates(
|
||||
let (scheduled_core, assumption) = match core {
|
||||
CoreState::Scheduled(scheduled_core) => (scheduled_core, OccupiedCoreAssumption::Free),
|
||||
CoreState::Occupied(occupied_core) => {
|
||||
if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability)
|
||||
{
|
||||
if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability) {
|
||||
if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
|
||||
(scheduled_core, OccupiedCoreAssumption::Included)
|
||||
} else {
|
||||
|
||||
@@ -256,15 +256,14 @@ impl<C: Context> CandidateData<C> {
|
||||
// if it has enough validity votes
|
||||
// and no authorities have called it bad.
|
||||
fn can_be_included(&self, validity_threshold: usize) -> bool {
|
||||
self.indicated_bad_by.is_empty()
|
||||
&& self.validity_votes.len() >= validity_threshold
|
||||
self.validity_votes.len() >= validity_threshold
|
||||
}
|
||||
|
||||
fn summary(&self, digest: C::Digest) -> Summary<C::Digest, C::GroupId> {
|
||||
Summary {
|
||||
candidate: digest,
|
||||
group_id: self.group_id.clone(),
|
||||
validity_votes: self.validity_votes.len() - self.indicated_bad_by.len(),
|
||||
validity_votes: self.validity_votes.len(),
|
||||
signalled_bad: self.indicated_bad(),
|
||||
}
|
||||
}
|
||||
@@ -362,6 +361,20 @@ impl<C: Context> Table<C> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the attested candidate for `digest`.
|
||||
///
|
||||
/// Returns `Some(_)` if the candidate exists and is includable.
|
||||
pub fn attested_candidate(&self, digest: &C::Digest, context: &C)
|
||||
-> Option<AttestedCandidate<
|
||||
C::GroupId, C::Candidate, C::AuthorityId, C::Signature,
|
||||
>>
|
||||
{
|
||||
self.candidate_votes.get(digest).and_then(|data| {
|
||||
let v_threshold = context.requisite_votes(&data.group_id);
|
||||
data.attested(v_threshold)
|
||||
})
|
||||
}
|
||||
|
||||
/// Import a signed statement. Signatures should be checked for validity, and the
|
||||
/// sender should be checked to actually be an authority.
|
||||
///
|
||||
@@ -489,7 +502,7 @@ impl<C: Context> Table<C> {
|
||||
if new_proposal {
|
||||
self.candidate_votes.entry(digest.clone()).or_insert_with(move || CandidateData {
|
||||
group_id: group,
|
||||
candidate: candidate,
|
||||
candidate,
|
||||
validity_votes: HashMap::new(),
|
||||
indicated_bad_by: Vec::new(),
|
||||
});
|
||||
@@ -581,7 +594,7 @@ impl<C: Context> Table<C> {
|
||||
}
|
||||
Entry::Vacant(vacant) => {
|
||||
if let ValidityVote::Invalid(_) = vote {
|
||||
votes.indicated_bad_by.push(from);
|
||||
votes.indicated_bad_by.push(from.clone());
|
||||
}
|
||||
|
||||
vacant.insert(vote);
|
||||
@@ -595,7 +608,12 @@ impl<C: Context> Table<C> {
|
||||
}
|
||||
}
|
||||
|
||||
fn update_includable_count<G: Hash + Eq + Clone>(map: &mut HashMap<G, usize>, group_id: &G, was_includable: bool, is_includable: bool) {
|
||||
fn update_includable_count<G: Hash + Eq + Clone>(
|
||||
map: &mut HashMap<G, usize>,
|
||||
group_id: &G,
|
||||
was_includable: bool,
|
||||
is_includable: bool,
|
||||
) {
|
||||
if was_includable && !is_includable {
|
||||
if let Entry::Occupied(mut entry) = map.entry(group_id.clone()) {
|
||||
*entry.get_mut() -= 1;
|
||||
@@ -989,7 +1007,7 @@ mod tests {
|
||||
|
||||
candidate.indicated_bad_by.push(AuthorityId(1024));
|
||||
|
||||
assert!(!candidate.can_be_included(validity_threshold));
|
||||
assert!(candidate.can_be_included(validity_threshold));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1039,8 +1057,8 @@ mod tests {
|
||||
|
||||
table.import_statement(&context, vote);
|
||||
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(3)));
|
||||
assert!(!table.candidate_includable(&candidate_digest, &context));
|
||||
assert!(table.includable_count.is_empty());
|
||||
assert!(table.candidate_includable(&candidate_digest, &context));
|
||||
assert!(table.includable_count.get(&GroupId(2)).is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user