BEEFY: client support for detecting equivocated votes (#13285)

* client/beefy: detect equivocated votes

* client/beefy: make sure to persist state after voting

* client/beefy: drop never-used aux-schema v2 migration

* impl review suggestion

---------

Signed-off-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
Adrian Catangiu
2023-02-06 14:42:54 +02:00
committed by GitHub
parent c000d18111
commit 8851bbb0f0
13 changed files with 368 additions and 350 deletions
+56 -77
View File
@@ -26,13 +26,13 @@ use crate::{
keystore::BeefyKeystore,
metric_inc, metric_set,
metrics::Metrics,
round::Rounds,
round::{Rounds, VoteImportResult},
BeefyVoterLinks, LOG_TARGET,
};
use beefy_primitives::{
crypto::{AuthorityId, Signature},
Commitment, ConsensusLog, Payload, PayloadProvider, SignedCommitment, ValidatorSet,
VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
Commitment, ConsensusLog, PayloadProvider, ValidatorSet, VersionedFinalityProof, VoteMessage,
BEEFY_ENGINE_ID,
};
use codec::{Codec, Decode, Encode};
use futures::{stream::Fuse, FutureExt, StreamExt};
@@ -55,6 +55,7 @@ use std::{
marker::PhantomData,
sync::Arc,
};
/// Bound for the number of buffered future voting rounds.
const MAX_BUFFERED_VOTE_ROUNDS: usize = 600;
/// Bound for the number of buffered votes per round number.
@@ -83,17 +84,14 @@ pub(crate) struct VoterOracle<B: Block> {
/// 3. lagging behind GRANDPA: queue has [1, N] elements, where all `mandatory_done == false`.
/// In this state, everytime a session gets its mandatory block BEEFY finalized, it's
/// popped off the queue, eventually getting to state `2. up-to-date`.
sessions: VecDeque<Rounds<Payload, B>>,
sessions: VecDeque<Rounds<B>>,
/// Min delta in block numbers between two blocks, BEEFY should vote on.
min_block_delta: u32,
}
impl<B: Block> VoterOracle<B> {
/// Verify provided `sessions` satisfies requirements, then build `VoterOracle`.
pub fn checked_new(
sessions: VecDeque<Rounds<Payload, B>>,
min_block_delta: u32,
) -> Option<Self> {
pub fn checked_new(sessions: VecDeque<Rounds<B>>, min_block_delta: u32) -> Option<Self> {
let mut prev_start = Zero::zero();
let mut prev_validator_id = None;
// verifies the
@@ -136,13 +134,13 @@ impl<B: Block> VoterOracle<B> {
// Return reference to rounds pertaining to first session in the queue.
// Voting will always happen at the head of the queue.
fn active_rounds(&self) -> Option<&Rounds<Payload, B>> {
fn active_rounds(&self) -> Option<&Rounds<B>> {
self.sessions.front()
}
// Return mutable reference to rounds pertaining to first session in the queue.
// Voting will always happen at the head of the queue.
fn active_rounds_mut(&mut self) -> Option<&mut Rounds<Payload, B>> {
fn active_rounds_mut(&mut self) -> Option<&mut Rounds<B>> {
self.sessions.front_mut()
}
@@ -157,7 +155,7 @@ impl<B: Block> VoterOracle<B> {
}
/// Add new observed session to the Oracle.
pub fn add_session(&mut self, rounds: Rounds<Payload, B>) {
pub fn add_session(&mut self, rounds: Rounds<B>) {
self.sessions.push_back(rounds);
// Once we add a new session we can drop/prune previous session if it's been finalized.
self.try_prune();
@@ -264,6 +262,8 @@ pub(crate) struct PersistedState<B: Block> {
best_grandpa_block_header: <B as Block>::Header,
/// Best block a BEEFY voting round has been concluded for.
best_beefy_block: NumberFor<B>,
/// Best block we voted on.
best_voted: NumberFor<B>,
/// Chooses which incoming votes to accept and which votes to generate.
/// Keeps track of voting seen for current and future rounds.
voting_oracle: VoterOracle<B>,
@@ -273,12 +273,13 @@ impl<B: Block> PersistedState<B> {
pub fn checked_new(
grandpa_header: <B as Block>::Header,
best_beefy: NumberFor<B>,
sessions: VecDeque<Rounds<Payload, B>>,
sessions: VecDeque<Rounds<B>>,
min_block_delta: u32,
) -> Option<Self> {
VoterOracle::checked_new(sessions, min_block_delta).map(|voting_oracle| PersistedState {
best_grandpa_block_header: grandpa_header,
best_beefy_block: best_beefy,
best_voted: Zero::zero(),
voting_oracle,
})
}
@@ -381,7 +382,7 @@ where
&self.persisted_state.voting_oracle
}
fn active_rounds(&mut self) -> Option<&Rounds<Payload, B>> {
fn active_rounds(&mut self) -> Option<&Rounds<B>> {
self.persisted_state.voting_oracle.active_rounds()
}
@@ -486,12 +487,9 @@ where
) -> Result<(), Error> {
let block_num = vote.commitment.block_number;
let best_grandpa = self.best_grandpa_block();
self.gossip_validator.note_round(block_num);
match self.voting_oracle().triage_round(block_num, best_grandpa)? {
RoundAction::Process => self.handle_vote(
(vote.commitment.payload, vote.commitment.block_number),
(vote.id, vote.signature),
false,
)?,
RoundAction::Process => self.handle_vote(vote)?,
RoundAction::Enqueue => {
debug!(target: LOG_TARGET, "🥩 Buffer vote for round: {:?}.", block_num);
if self.pending_votes.len() < MAX_BUFFERED_VOTE_ROUNDS {
@@ -546,57 +544,47 @@ where
fn handle_vote(
&mut self,
round: (Payload, NumberFor<B>),
vote: (AuthorityId, Signature),
self_vote: bool,
vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
) -> Result<(), Error> {
self.gossip_validator.note_round(round.1);
let rounds = self
.persisted_state
.voting_oracle
.active_rounds_mut()
.ok_or(Error::UninitSession)?;
if rounds.add_vote(&round, vote, self_vote) {
if let Some(signatures) = rounds.should_conclude(&round) {
self.gossip_validator.conclude_round(round.1);
let block_num = round.1;
let commitment = Commitment {
payload: round.0,
block_number: block_num,
validator_set_id: rounds.validator_set_id(),
};
let finality_proof =
VersionedFinalityProof::V1(SignedCommitment { commitment, signatures });
metric_set!(self, beefy_round_concluded, block_num);
let block_number = vote.commitment.block_number;
match rounds.add_vote(vote) {
VoteImportResult::RoundConcluded(signed_commitment) => {
self.gossip_validator.conclude_round(block_number);
metric_set!(self, beefy_round_concluded, block_number);
let finality_proof = VersionedFinalityProof::V1(signed_commitment);
info!(
target: LOG_TARGET,
"🥩 Round #{} concluded, finality_proof: {:?}.", round.1, finality_proof
"🥩 Round #{} concluded, finality_proof: {:?}.", block_number, finality_proof
);
// We created the `finality_proof` and know to be valid.
// New state is persisted after finalization.
self.finalize(finality_proof)?;
} else {
let mandatory_round = self
},
VoteImportResult::Ok => {
// Persist state after handling mandatory block vote.
if self
.voting_oracle()
.mandatory_pending()
.map(|p| p.0 == round.1)
.unwrap_or(false);
// Persist state after handling self vote to avoid double voting in case
// of voter restarts.
// Also persist state after handling mandatory block vote.
if self_vote || mandatory_round {
.map(|(mandatory_num, _)| mandatory_num == block_number)
.unwrap_or(false)
{
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
.map_err(|e| Error::Backend(e.to_string()))?;
}
}
}
},
VoteImportResult::Equivocation => {
// TODO: report returned `EquivocationProof` to chain through `pallet-beefy`.
()
},
VoteImportResult::Invalid | VoteImportResult::Stale => (),
};
Ok(())
}
@@ -695,11 +683,7 @@ where
for (num, votes) in votes_to_handle.into_iter() {
debug!(target: LOG_TARGET, "🥩 Handle buffered votes for: {:?}.", num);
for v in votes.into_iter() {
if let Err(err) = self.handle_vote(
(v.commitment.payload, v.commitment.block_number),
(v.id, v.signature),
false,
) {
if let Err(err) = self.handle_vote(v) {
error!(target: LOG_TARGET, "🥩 Error handling buffered vote: {}", err);
};
}
@@ -716,7 +700,9 @@ where
.voting_target(self.best_beefy_block(), self.best_grandpa_block())
{
metric_set!(self, beefy_should_vote_on, target);
self.do_vote(target)?;
if target > self.persisted_state.best_voted {
self.do_vote(target)?;
}
}
Ok(())
}
@@ -766,13 +752,6 @@ where
.voting_oracle
.active_rounds_mut()
.ok_or(Error::UninitSession)?;
if !rounds.should_self_vote(&(payload.clone(), target_number)) {
debug!(
target: LOG_TARGET,
"🥩 Don't double vote for block number: {:?}", target_number
);
return Ok(())
}
let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id());
let authority_id = if let Some(id) = self.key_store.authority_id(validators) {
@@ -812,17 +791,17 @@ where
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", message);
if let Err(err) = self.handle_vote(
(message.commitment.payload, message.commitment.block_number),
(message.id, message.signature),
true,
) {
if let Err(err) = self.handle_vote(message) {
error!(target: LOG_TARGET, "🥩 Error handling self vote: {}", err);
}
self.gossip_engine.gossip_message(topic::<B>(), encoded_message, false);
Ok(())
// Persist state after vote to avoid double voting in case of voter restarts.
self.persisted_state.best_voted = target_number;
metric_set!(self, beefy_best_voted, target_number);
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
.map_err(|e| Error::Backend(e.to_string()))
}
fn process_new_state(&mut self) {
@@ -848,8 +827,7 @@ where
/// Main loop for BEEFY worker.
///
/// Wait for BEEFY runtime pallet to be available, then start the main async loop
/// which is driven by finality notifications and gossiped votes.
/// Run the main async loop which is driven by finality notifications and gossiped votes.
pub(crate) async fn run(
mut self,
mut block_import_justif: Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
@@ -992,14 +970,15 @@ pub(crate) mod tests {
use super::*;
use crate::{
communication::notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream},
keystore::tests::Keyring,
tests::{
create_beefy_keystore, get_beefy_streams, make_beefy_ids, BeefyPeer, BeefyTestNet,
TestApi,
},
BeefyRPCLinks, KnownPeers,
};
use beefy_primitives::{known_payloads, mmr::MmrRootProvider};
use beefy_primitives::{
keyring::Keyring, known_payloads, mmr::MmrRootProvider, Payload, SignedCommitment,
};
use futures::{future::poll_fn, task::Poll};
use parking_lot::Mutex;
use sc_client_api::{Backend as BackendT, HeaderBackend};
@@ -1007,7 +986,7 @@ pub(crate) mod tests {
use sc_network_test::TestNetFactory;
use sp_api::HeaderT;
use sp_blockchain::Backend as BlockchainBackendT;
use sp_runtime::traits::{One, Zero};
use sp_runtime::traits::One;
use substrate_test_runtime_client::{
runtime::{Block, Digest, DigestItem, Header, H256},
Backend,
@@ -1018,7 +997,7 @@ pub(crate) mod tests {
&self.voting_oracle
}
pub fn active_round(&self) -> Option<&Rounds<Payload, B>> {
pub fn active_round(&self) -> Option<&Rounds<B>> {
self.voting_oracle.active_rounds()
}
@@ -1032,7 +1011,7 @@ pub(crate) mod tests {
}
impl<B: super::Block> VoterOracle<B> {
pub fn sessions(&self) -> &VecDeque<Rounds<Payload, B>> {
pub fn sessions(&self) -> &VecDeque<Rounds<B>> {
&self.sessions
}
}
@@ -1090,7 +1069,7 @@ pub(crate) mod tests {
min_block_delta,
)
.unwrap();
let payload_provider = MmrRootProvider::new(api);
let payload_provider = MmrRootProvider::new(api.clone());
let worker_params = crate::worker::WorkerParams {
backend,
payload_provider,