diff --git a/substrate/client/beefy/src/round.rs b/substrate/client/beefy/src/round.rs index a5a15bac5f..fecb9557df 100644 --- a/substrate/client/beefy/src/round.rs +++ b/substrate/client/beefy/src/round.rs @@ -73,7 +73,6 @@ pub(crate) struct Rounds { best_done: Option>, session_start: NumberFor, validator_set: ValidatorSet, - prev_validator_set: ValidatorSet, } impl Rounds @@ -81,18 +80,8 @@ where P: Ord + Hash + Clone, B: Block, { - pub(crate) fn new( - session_start: NumberFor, - validator_set: ValidatorSet, - prev_validator_set: ValidatorSet, - ) -> Self { - Rounds { - rounds: BTreeMap::new(), - best_done: None, - session_start, - validator_set, - prev_validator_set, - } + pub(crate) fn new(session_start: NumberFor, validator_set: ValidatorSet) -> Self { + Rounds { rounds: BTreeMap::new(), best_done: None, session_start, validator_set } } } @@ -101,24 +90,12 @@ where P: Ord + Hash + Clone, B: Block, { - pub(crate) fn validator_set_id_for(&self, block_number: NumberFor) -> ValidatorSetId { - if block_number > self.session_start { - self.validator_set.id() - } else { - self.prev_validator_set.id() - } + pub(crate) fn validator_set_id(&self) -> ValidatorSetId { + self.validator_set.id() } - pub(crate) fn validators_for(&self, block_number: NumberFor) -> &[Public] { - if block_number > self.session_start { - self.validator_set.validators() - } else { - self.prev_validator_set.validators() - } - } - - pub(crate) fn validator_set(&self) -> &ValidatorSet { - &self.validator_set + pub(crate) fn validators(&self) -> &[Public] { + self.validator_set.validators() } pub(crate) fn session_start(&self) -> &NumberFor { @@ -143,7 +120,7 @@ where round.1 ); false - } else if !self.validator_set.validators().iter().any(|id| vote.0 == *id) { + } else if !self.validators().iter().any(|id| vote.0 == *id) { debug!( target: "beefy", "🥩 received vote {:?} from validator that is not in the validator set, ignoring", @@ -170,12 +147,11 @@ where // remove this and older (now stale) rounds let signatures = self.rounds.remove(round)?.votes; self.rounds.retain(|&(_, number), _| number > round.1); - self.best_done = self.best_done.clone().max(Some(round.1.clone())); + self.best_done = self.best_done.max(Some(round.1)); debug!(target: "beefy", "🥩 Concluded round #{}", round.1); Some( - self.validator_set - .validators() + self.validators() .iter() .map(|authority_id| signatures.get(authority_id).cloned()) .collect(), @@ -247,13 +223,13 @@ mod tests { .unwrap(); let session_start = 1u64.into(); - let rounds = Rounds::::new(session_start, validators.clone(), validators); + let rounds = Rounds::::new(session_start, validators); - assert_eq!(42, rounds.validator_set_id_for(session_start)); + assert_eq!(42, rounds.validator_set_id()); assert_eq!(1, *rounds.session_start()); assert_eq!( &vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], - rounds.validators_for(session_start) + rounds.validators() ); } @@ -274,7 +250,7 @@ mod tests { let round = (H256::from_low_u64_le(1), 1); let session_start = 1u64.into(); - let mut rounds = Rounds::::new(session_start, validators.clone(), validators); + let mut rounds = Rounds::::new(session_start, validators); // no self vote yet, should self vote assert!(rounds.should_self_vote(&round)); @@ -347,7 +323,7 @@ mod tests { .unwrap(); let session_start = 1u64.into(); - let mut rounds = Rounds::::new(session_start, validators.clone(), validators); + let mut rounds = Rounds::::new(session_start, validators); // round 1 assert!(rounds.add_vote( diff --git a/substrate/client/beefy/src/tests.rs b/substrate/client/beefy/src/tests.rs index e568daba8e..5f5fbd2f1f 100644 --- a/substrate/client/beefy/src/tests.rs +++ b/substrate/client/beefy/src/tests.rs @@ -469,8 +469,8 @@ fn finalize_block_and_wait_for_beefy( } if expected_beefy.is_empty() { - // run for 1 second then verify no new best beefy block available - let timeout = Some(Duration::from_millis(500)); + // run for quarter second then verify no new best beefy block available + let timeout = Some(Duration::from_millis(250)); streams_empty_after_timeout(best_blocks, &net, runtime, timeout); streams_empty_after_timeout(signed_commitments, &net, runtime, None); } else { @@ -535,8 +535,8 @@ fn lagging_validators() { let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); - // push 42 blocks including `AuthorityChange` digests every 30 blocks. - net.generate_blocks(42, session_len, &validator_set, true); + // push 62 blocks including `AuthorityChange` digests every 30 blocks. + net.generate_blocks(62, session_len, &validator_set, true); net.block_until_sync(); let net = Arc::new(Mutex::new(net)); @@ -550,7 +550,7 @@ fn lagging_validators() { let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); // verify nothing gets finalized by BEEFY - let timeout = Some(Duration::from_millis(500)); + let timeout = Some(Duration::from_millis(250)); streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout); streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None); @@ -563,6 +563,26 @@ fn lagging_validators() { // Both finalize #30 (mandatory session) and #32 -> BEEFY finalize #30 (mandatory), #31, #32 finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[30, 32], &[30, 31, 32]); + + // Verify that session-boundary votes get buffered by client and only processed once + // session-boundary block is GRANDPA-finalized (this guarantees authenticity for the new session + // validator set). + + // Alice finalizes session-boundary mandatory block #60, Bob lags behind + let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); + let finalize = BlockId::number(60); + net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + // verify nothing gets finalized by BEEFY + let timeout = Some(Duration::from_millis(250)); + streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout); + streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None); + + // Bob catches up and also finalizes #60 (and should have buffered Alice's vote on #60) + let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); + // verify beefy skips intermediary votes, and successfully finalizes mandatory block #40 + wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[60]); + wait_for_beefy_signed_commitments(signed_commitments, &net, &mut runtime, &[60]); } #[test] @@ -624,7 +644,7 @@ fn correct_beefy_payload() { .unwrap(); // verify consensus is _not_ reached - let timeout = Some(Duration::from_millis(500)); + let timeout = Some(Duration::from_millis(250)); streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout); streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None); diff --git a/substrate/client/beefy/src/worker.rs b/substrate/client/beefy/src/worker.rs index 8ab18c58f9..ae466a71ab 100644 --- a/substrate/client/beefy/src/worker.rs +++ b/substrate/client/beefy/src/worker.rs @@ -16,7 +16,13 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{collections::BTreeSet, fmt::Debug, marker::PhantomData, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt::Debug, + marker::PhantomData, + sync::Arc, + time::Duration, +}; use codec::{Codec, Decode, Encode}; use futures::{future, FutureExt, StreamExt}; @@ -27,7 +33,7 @@ use sc_client_api::{Backend, FinalityNotification, FinalityNotifications}; use sc_network_gossip::GossipEngine; use sp_api::{BlockId, ProvideRuntimeApi}; -use sp_arithmetic::traits::AtLeast32Bit; +use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; use sp_consensus::SyncOracle; use sp_mmr_primitives::MmrApi; use sp_runtime::{ @@ -80,6 +86,8 @@ pub(crate) struct BeefyWorker { min_block_delta: u32, metrics: Option, rounds: Option>, + /// Buffer holding votes for blocks that the client hasn't seen finality for. + pending_votes: BTreeMap, Vec, AuthorityId, Signature>>>, finality_notifications: FinalityNotifications, /// Best block we received a GRANDPA notification for best_grandpa_block_header: ::Header, @@ -141,6 +149,7 @@ where min_block_delta: min_block_delta.max(1), metrics, rounds: None, + pending_votes: BTreeMap::new(), finality_notifications: client.finality_notification_stream(), best_grandpa_block_header: last_finalized_header, best_beefy_block: None, @@ -238,7 +247,11 @@ where } /// Handle session changes by starting new voting round for mandatory blocks. - fn init_session_at(&mut self, active: ValidatorSet, session_start: NumberFor) { + fn init_session_at( + &mut self, + active: ValidatorSet, + new_session_start: NumberFor, + ) { debug!(target: "beefy", "🥩 New active validator set: {:?}", active); metric_set!(self, beefy_validator_set_id, active.id()); // BEEFY should produce a signed commitment for each session @@ -246,23 +259,22 @@ where active.id() != GENESIS_AUTHORITY_SET_ID && self.last_signed_id != 0 { + debug!( + target: "beefy", "🥩 Detected skipped session: active-id {:?}, last-signed-id {:?}", + active.id(), + self.last_signed_id, + ); metric_inc!(self, beefy_skipped_sessions); } if log_enabled!(target: "beefy", log::Level::Debug) { // verify the new validator set - only do it if we're also logging the warning - let _ = self.verify_validator_set(&session_start, &active); + let _ = self.verify_validator_set(&new_session_start, &active); } - let prev_validator_set = if let Some(r) = &self.rounds { - r.validator_set().clone() - } else { - // no previous rounds present use new validator set instead (genesis case) - active.clone() - }; let id = active.id(); - self.rounds = Some(Rounds::new(session_start, active, prev_validator_set)); - info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, session_start); + self.rounds = Some(Rounds::new(new_session_start, active)); + info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, new_session_start); } fn handle_finality_notification(&mut self, notification: &FinalityNotification) { @@ -287,12 +299,36 @@ where self.init_session_at(new_validator_set, *header.number()); } + // Handle any pending votes for now finalized blocks. + self.check_pending_votes(); + // Vote if there's now a new vote target. if let Some(target_number) = self.current_vote_target() { self.do_vote(target_number); } } + // Handles all buffered votes for now finalized blocks. + fn check_pending_votes(&mut self) { + let not_finalized = self.best_grandpa_block_header.number().saturating_add(1u32.into()); + let still_pending = self.pending_votes.split_off(¬_finalized); + let votes_to_handle = std::mem::replace(&mut self.pending_votes, still_pending); + for (num, votes) in votes_to_handle.into_iter() { + if Some(num) > self.best_beefy_block { + debug!(target: "beefy", "🥩 Handling buffered votes for now GRANDPA finalized block: {:?}.", num); + for v in votes.into_iter() { + self.handle_vote( + (v.commitment.payload, v.commitment.block_number), + (v.id, v.signature), + false, + ); + } + } else { + debug!(target: "beefy", "🥩 Dropping outdated buffered votes for now BEEFY finalized block: {:?}.", num); + } + } + } + fn handle_vote( &mut self, round: (Payload, NumberFor), @@ -313,7 +349,7 @@ where self.gossip_validator.conclude_round(round.1); // id is stored for skipped session metric calculation - self.last_signed_id = rounds.validator_set_id_for(round.1); + self.last_signed_id = rounds.validator_set_id(); let block_num = round.1; let commitment = Commitment { @@ -390,7 +426,7 @@ where debug!(target: "beefy", "🥩 Don't double vote for block number: {:?}", target_number); return } - (rounds.validators_for(target_number), rounds.validator_set_id_for(target_number)) + (rounds.validators(), rounds.validator_set_id()) } else { debug!(target: "beefy", "🥩 Missing validator set - can't vote for: {:?}", target_hash); return @@ -506,11 +542,23 @@ where }, vote = votes.next().fuse() => { if let Some(vote) = vote { - self.handle_vote( - (vote.commitment.payload, vote.commitment.block_number), - (vote.id, vote.signature), - false - ); + let block_num = vote.commitment.block_number; + if block_num > *self.best_grandpa_block_header.number() { + // Only handle votes for blocks we _know_ have been finalized. + // Buffer vote to be handled later. + debug!( + target: "beefy", + "🥩 Buffering vote for not (yet) finalized block: {:?}.", + block_num + ); + self.pending_votes.entry(block_num).or_default().push(vote); + } else { + self.handle_vote( + (vote.commitment.payload, vote.commitment.block_number), + (vote.id, vote.signature), + false + ); + } } else { return; } @@ -854,8 +902,7 @@ pub(crate) mod tests { worker.best_grandpa_block_header = grandpa_header; worker.best_beefy_block = best_beefy; worker.min_block_delta = min_delta; - worker.rounds = - Some(Rounds::new(session_start, validator_set.clone(), validator_set.clone())); + worker.rounds = Some(Rounds::new(session_start, validator_set.clone())); }; // under min delta @@ -970,11 +1017,10 @@ pub(crate) mod tests { worker.init_session_at(validator_set.clone(), 1); let worker_rounds = worker.rounds.as_ref().unwrap(); - assert_eq!(worker_rounds.validator_set(), &validator_set); assert_eq!(worker_rounds.session_start(), &1); // in genesis case both current and prev validator sets are the same - assert_eq!(worker_rounds.validator_set_id_for(1), validator_set.id()); - assert_eq!(worker_rounds.validator_set_id_for(2), validator_set.id()); + assert_eq!(worker_rounds.validators(), validator_set.validators()); + assert_eq!(worker_rounds.validator_set_id(), validator_set.id()); // new validator set let keys = &[Keyring::Bob]; @@ -984,11 +1030,8 @@ pub(crate) mod tests { worker.init_session_at(new_validator_set.clone(), 11); let worker_rounds = worker.rounds.as_ref().unwrap(); - assert_eq!(worker_rounds.validator_set(), &new_validator_set); assert_eq!(worker_rounds.session_start(), &11); - // mandatory block gets prev set, further blocks get new set - assert_eq!(worker_rounds.validator_set_id_for(11), validator_set.id()); - assert_eq!(worker_rounds.validator_set_id_for(12), new_validator_set.id()); - assert_eq!(worker_rounds.validator_set_id_for(13), new_validator_set.id()); + assert_eq!(worker_rounds.validators(), new_validator_set.validators()); + assert_eq!(worker_rounds.validator_set_id(), new_validator_set.id()); } }