BEEFY: gossip finality proofs (#13727)

* sc-consensus-beefy: add justifications to gossip protocol

* sc-consensus-beefy: voter gossips finality proofs

* sc-consensus-beefy: add finality proof gossip test

* sc-consensus-beefy: always gossip finality proof

Gossip finality proof in _both_ cases of reaching finality threshold
through votes:
1. threshold reached through self vote,
2. threshold reached through incoming vote.

* address comments
This commit is contained in:
Adrian Catangiu
2023-03-30 17:23:36 +03:00
committed by GitHub
parent 25a616ce08
commit 92c1229e24
7 changed files with 637 additions and 201 deletions
+64 -30
View File
@@ -18,7 +18,7 @@
use crate::{
communication::{
gossip::{topic, GossipValidator, GossipVoteFilter},
gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
},
error::Error,
@@ -42,7 +42,7 @@ use sp_consensus_beefy::{
check_equivocation_proof,
crypto::{AuthorityId, Signature},
BeefyApi, Commitment, ConsensusLog, EquivocationProof, PayloadProvider, ValidatorSet,
ValidatorSetId, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
};
use sp_runtime::{
generic::OpaqueDigestItemId,
@@ -158,8 +158,8 @@ impl<B: Block> VoterOracle<B> {
self.sessions.front_mut().ok_or(Error::UninitSession)
}
fn current_validator_set_id(&self) -> Result<ValidatorSetId, Error> {
self.active_rounds().map(|r| r.validator_set_id())
fn current_validator_set(&self) -> Result<&ValidatorSet<AuthorityId>, Error> {
self.active_rounds().map(|r| r.validator_set())
}
// Prune the sessions queue to keep the Oracle in one of the expected three states.
@@ -301,10 +301,10 @@ impl<B: Block> PersistedState<B> {
self.voting_oracle.best_grandpa_block_header = best_grandpa;
}
pub(crate) fn current_gossip_filter(&self) -> Result<GossipVoteFilter<B>, Error> {
pub(crate) fn gossip_filter_config(&self) -> Result<GossipFilterCfg<B>, Error> {
let (start, end) = self.voting_oracle.accepted_interval()?;
let validator_set_id = self.voting_oracle.current_validator_set_id()?;
Ok(GossipVoteFilter { start, end, validator_set_id })
let validator_set = self.voting_oracle.current_validator_set()?;
Ok(GossipFilterCfg { start, end, validator_set })
}
}
@@ -494,7 +494,7 @@ where
// Update gossip validator votes filter.
if let Err(e) = self
.persisted_state
.current_gossip_filter()
.gossip_filter_config()
.map(|filter| self.gossip_validator.update_filter(filter))
{
error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e);
@@ -509,7 +509,12 @@ where
) -> Result<(), Error> {
let block_num = vote.commitment.block_number;
match self.voting_oracle().triage_round(block_num)? {
RoundAction::Process => self.handle_vote(vote)?,
RoundAction::Process =>
if let Some(finality_proof) = self.handle_vote(vote)? {
let gossip_proof = GossipMessage::<B>::FinalityProof(finality_proof);
let encoded_proof = gossip_proof.encode();
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
},
RoundAction::Drop => metric_inc!(self, beefy_stale_votes),
RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
};
@@ -554,7 +559,7 @@ where
fn handle_vote(
&mut self,
vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
) -> Result<(), Error> {
) -> Result<Option<BeefyVersionedFinalityProof<B>>, Error> {
let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
let block_number = vote.commitment.block_number;
@@ -567,8 +572,9 @@ where
);
// We created the `finality_proof` and know to be valid.
// New state is persisted after finalization.
self.finalize(finality_proof)?;
self.finalize(finality_proof.clone())?;
metric_inc!(self, beefy_good_votes_processed);
return Ok(Some(finality_proof))
},
VoteImportResult::Ok => {
// Persist state after handling mandatory block vote.
@@ -590,7 +596,7 @@ where
VoteImportResult::Invalid => metric_inc!(self, beefy_invalid_votes),
VoteImportResult::Stale => metric_inc!(self, beefy_stale_votes),
};
Ok(())
Ok(None)
}
/// Provide BEEFY finality for block based on `finality_proof`:
@@ -643,7 +649,7 @@ where
// Update gossip validator votes filter.
self.persisted_state
.current_gossip_filter()
.gossip_filter_config()
.map(|filter| self.gossip_validator.update_filter(filter))?;
Ok(())
}
@@ -758,20 +764,20 @@ where
BeefyKeystore::verify(&authority_id, &signature, &encoded_commitment)
);
let message = VoteMessage { commitment, id: authority_id, signature };
let encoded_message = message.encode();
metric_inc!(self, beefy_votes_sent);
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", message);
if let Err(err) = self.handle_vote(message) {
let vote = VoteMessage { commitment, id: authority_id, signature };
if let Some(finality_proof) = self.handle_vote(vote.clone()).map_err(|err| {
error!(target: LOG_TARGET, "🥩 Error handling self vote: {}", err);
err
})? {
let encoded_proof = GossipMessage::<B>::FinalityProof(finality_proof).encode();
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
} else {
metric_inc!(self, beefy_votes_sent);
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
self.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
}
self.gossip_engine.gossip_message(topic::<B>(), encoded_message, false);
// Persist state after vote to avoid double voting in case of voter restarts.
self.persisted_state.best_voted = target_number;
metric_set!(self, beefy_best_voted, target_number);
@@ -816,17 +822,28 @@ where
let mut votes = Box::pin(
self.gossip_engine
.messages_for(topic::<B>())
.messages_for(votes_topic::<B>())
.filter_map(|notification| async move {
let vote = VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
&mut &notification.message[..],
)
.ok();
let vote = GossipMessage::<B>::decode(&mut &notification.message[..])
.ok()
.and_then(|message| message.unwrap_vote());
trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", vote);
vote
})
.fuse(),
);
let mut gossip_proofs = Box::pin(
self.gossip_engine
.messages_for(proofs_topic::<B>())
.filter_map(|notification| async move {
let proof = GossipMessage::<B>::decode(&mut &notification.message[..])
.ok()
.and_then(|message| message.unwrap_finality_proof());
trace!(target: LOG_TARGET, "🥩 Got gossip proof message: {:?}", proof);
proof
})
.fuse(),
);
loop {
// Act on changed 'state'.
@@ -872,6 +889,20 @@ where
return;
}
},
justif = gossip_proofs.next() => {
if let Some(justif) = justif {
// Gossiped justifications have already been verified by `GossipValidator`.
if let Err(err) = self.triage_incoming_justif(justif) {
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
error!(
target: LOG_TARGET,
"🥩 Finality proofs gossiping stream terminated, closing worker."
);
return;
}
},
// Finally process incoming votes.
vote = votes.next() => {
if let Some(vote) = vote {
@@ -880,7 +911,10 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
error!(target: LOG_TARGET, "🥩 Votes gossiping stream terminated, closing worker.");
error!(
target: LOG_TARGET,
"🥩 Votes gossiping stream terminated, closing worker."
);
return;
}
},