From 3708b156d9be8ed90e379465b6a1fd3b02b1535f Mon Sep 17 00:00:00 2001 From: Adrian Catangiu Date: Thu, 16 Mar 2023 11:02:39 +0200 Subject: [PATCH] sc-consensus-beefy: improve beefy gossip validator (#13606) * sc-consensus-beefy: improve beefy gossip validator Old gossip validator was pretty dumb, being very permissive with incoming votes - only condition it had was to be newer than best finalized. New filter conditions: - voter rounds are initialized (discarding votes until voter is actually active), - only votes for current active set id are accepted, - only votes for rounds in the current voting session are accepted, - only votes for GRANDPA finalized blocks are accepted, - when BEEFY voter reaches mandatory round, only votes for said mandatory round are accepted. New validator uses the VoterOracle to easily implement above conditions and only allow through votes that are immediately useful to the voter. After every GRANDPA or BEEFY finality, the gossip validator filter is updated. * sc-consensus-beefy: remove votes enqueueing Since gossip validator will simply disallow votes for future rounds, and only allow votes that the voter can immediately process, there is no need for the voter to enqueue votes. It will see these "future" votes later in rebroadcasts, when voter will also be able to process them. Only at that point does gossip accept and consume them. * sc-consensus-beefy: refactor persistent state Move best-beefy and best-grandpa into VoterOracle instead of passing them around as params. VoterOracle ultimately needs to know best-beefy and/or best-grandpa for most of its functions. * sc-consensus-beefy: further restrict gossip validator Assuming mandatory done in current session: Instead of allowing votes for any round in the current session, only accept votes for rounds equal or better than best BEEFY finalized. * sc-consensus-beefy: add a couple of comments * sc-consensus-beefy: fix tests involving multiple tasks Finalize blocks one a time in tests where we want gossip to happen in a certain round. Otherwise, some tasks may be left behind in terms of gossip round numbers because once "scheduled" a task will greedily process as much as possible. This change should be in line with the real-world scenario where voters run "in parallel" across nodes, the only points of synchronization being the finality notifications. * sc-consensus-beefy: address review comments --------- Signed-off-by: acatangiu --- .../client/consensus/beefy/src/aux_schema.rs | 6 +- .../beefy/src/communication/gossip.rs | 204 +++---- substrate/client/consensus/beefy/src/lib.rs | 10 + .../client/consensus/beefy/src/metrics.rs | 24 - substrate/client/consensus/beefy/src/tests.rs | 181 ++---- .../client/consensus/beefy/src/worker.rs | 532 ++++++++---------- 6 files changed, 384 insertions(+), 573 deletions(-) diff --git a/substrate/client/consensus/beefy/src/aux_schema.rs b/substrate/client/consensus/beefy/src/aux_schema.rs index 2e99b4cc40..84186140b6 100644 --- a/substrate/client/consensus/beefy/src/aux_schema.rs +++ b/substrate/client/consensus/beefy/src/aux_schema.rs @@ -28,7 +28,7 @@ use sp_runtime::traits::Block as BlockT; const VERSION_KEY: &[u8] = b"beefy_auxschema_version"; const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state"; -const CURRENT_VERSION: u32 = 2; +const CURRENT_VERSION: u32 = 3; pub(crate) fn write_current_version(backend: &BE) -> ClientResult<()> { info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION); @@ -63,8 +63,8 @@ where match version { None => (), - Some(1) => (), // version 1 is totally obsolete and should be simply ignored - Some(2) => return load_decode::<_, PersistedState>(backend, WORKER_STATE_KEY), + Some(1) | Some(2) => (), // versions 1 & 2 are obsolete and should be simply ignored + Some(3) => return load_decode::<_, PersistedState>(backend, WORKER_STATE_KEY), other => return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))), } diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index 219203ee4e..e49382251e 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -31,11 +31,14 @@ use wasm_timer::Instant; use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore, LOG_TARGET}; use sp_consensus_beefy::{ crypto::{Public, Signature}, - VoteMessage, + ValidatorSetId, VoteMessage, }; // Timeout for rebroadcasting messages. -const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); +#[cfg(not(test))] +const REBROADCAST_AFTER: Duration = Duration::from_secs(60); +#[cfg(test)] +const REBROADCAST_AFTER: Duration = Duration::from_secs(5); /// Gossip engine messages topic pub(crate) fn topic() -> B::Hash @@ -45,45 +48,51 @@ where <::Hashing as Hash>::hash(b"beefy") } +#[derive(Debug)] +pub(crate) struct GossipVoteFilter { + pub start: NumberFor, + pub end: NumberFor, + pub validator_set_id: ValidatorSetId, +} + /// A type that represents hash of the message. pub type MessageHash = [u8; 8]; -struct KnownVotes { - last_done: Option>, +struct VotesFilter { + filter: Option>, live: BTreeMap, fnv::FnvHashSet>, } -impl KnownVotes { +impl VotesFilter { pub fn new() -> Self { - Self { last_done: None, live: BTreeMap::new() } + Self { filter: None, live: BTreeMap::new() } } - /// Create new round votes set if not already present. - fn insert(&mut self, round: NumberFor) { - self.live.entry(round).or_default(); + /// Update filter to new `start` and `set_id`. + fn update(&mut self, filter: GossipVoteFilter) { + self.live.retain(|&round, _| round >= filter.start && round <= filter.end); + self.filter = Some(filter); } - /// Remove `round` and older from live set, update `last_done` accordingly. - fn conclude(&mut self, round: NumberFor) { - self.live.retain(|&number, _| number > round); - self.last_done = self.last_done.max(Some(round)); - } - - /// Return true if `round` is newer than previously concluded rounds. + /// Return true if `round` is >= than `max(session_start, best_beefy)`, + /// and vote set id matches session set id. /// /// Latest concluded round is still considered alive to allow proper gossiping for it. - fn is_live(&self, round: &NumberFor) -> bool { - Some(*round) >= self.last_done + fn is_live(&self, round: NumberFor, set_id: ValidatorSetId) -> bool { + self.filter + .as_ref() + .map(|f| set_id == f.validator_set_id && round >= f.start && round <= f.end) + .unwrap_or(false) } /// Add new _known_ `hash` to the round's known votes. - fn add_known(&mut self, round: &NumberFor, hash: MessageHash) { - self.live.get_mut(round).map(|known| known.insert(hash)); + fn add_known(&mut self, round: NumberFor, hash: MessageHash) { + self.live.entry(round).or_default().insert(hash); } /// Check if `hash` is already part of round's known votes. - fn is_known(&self, round: &NumberFor, hash: &MessageHash) -> bool { - self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false) + fn is_known(&self, round: NumberFor, hash: &MessageHash) -> bool { + self.live.get(&round).map(|known| known.contains(hash)).unwrap_or(false) } } @@ -100,7 +109,7 @@ where B: Block, { topic: B::Hash, - known_votes: RwLock>, + votes_filter: RwLock>, next_rebroadcast: Mutex, known_peers: Arc>>, } @@ -112,26 +121,18 @@ where pub fn new(known_peers: Arc>>) -> GossipValidator { GossipValidator { topic: topic::(), - known_votes: RwLock::new(KnownVotes::new()), + votes_filter: RwLock::new(VotesFilter::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), known_peers, } } - /// Note a voting round. + /// Update gossip validator filter. /// - /// Noting round will track gossiped votes for `round`. - pub(crate) fn note_round(&self, round: NumberFor) { - debug!(target: LOG_TARGET, "🥩 About to note gossip round #{}", round); - self.known_votes.write().insert(round); - } - - /// Conclude a voting round. - /// - /// This can be called once round is complete so we stop gossiping for it. - pub(crate) fn conclude_round(&self, round: NumberFor) { - debug!(target: LOG_TARGET, "🥩 About to drop gossip round #{}", round); - self.known_votes.write().conclude(round); + /// Only votes for `set_id` and rounds `start <= round <= end` will be accepted. + pub(crate) fn update_filter(&self, filter: GossipVoteFilter) { + debug!(target: LOG_TARGET, "🥩 New gossip filter {:?}", filter); + self.votes_filter.write().update(filter); } } @@ -152,25 +153,26 @@ where if let Ok(msg) = VoteMessage::, Public, Signature>::decode(&mut data) { let msg_hash = twox_64(data); let round = msg.commitment.block_number; + let set_id = msg.commitment.validator_set_id; + self.known_peers.lock().note_vote_for(*sender, round); // Verify general usefulness of the message. // We are going to discard old votes right away (without verification) // Also we keep track of already received votes to avoid verifying duplicates. { - let known_votes = self.known_votes.read(); + let filter = self.votes_filter.read(); - if !known_votes.is_live(&round) { + if !filter.is_live(round, set_id) { return ValidationResult::Discard } - if known_votes.is_known(&round, &msg_hash) { + if filter.is_known(round, &msg_hash) { return ValidationResult::ProcessAndKeep(self.topic) } } if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) { - self.known_votes.write().add_known(&round, msg_hash); - self.known_peers.lock().note_vote_for(*sender, round); + self.votes_filter.write().add_known(round, msg_hash); return ValidationResult::ProcessAndKeep(self.topic) } else { // TODO: report peer @@ -185,7 +187,7 @@ where } fn message_expired<'a>(&'a self) -> Box bool + 'a> { - let known_votes = self.known_votes.read(); + let filter = self.votes_filter.read(); Box::new(move |_topic, mut data| { let msg = match VoteMessage::, Public, Signature>::decode(&mut data) { Ok(vote) => vote, @@ -193,7 +195,8 @@ where }; let round = msg.commitment.block_number; - let expired = !known_votes.is_live(&round); + let set_id = msg.commitment.validator_set_id; + let expired = !filter.is_live(round, set_id); trace!(target: LOG_TARGET, "🥩 Message for round #{} expired: {}", round, expired); @@ -208,6 +211,7 @@ where let now = Instant::now(); let mut next_rebroadcast = self.next_rebroadcast.lock(); if now >= *next_rebroadcast { + trace!(target: LOG_TARGET, "🥩 Gossip rebroadcast"); *next_rebroadcast = now + REBROADCAST_AFTER; true } else { @@ -215,7 +219,7 @@ where } }; - let known_votes = self.known_votes.read(); + let filter = self.votes_filter.read(); Box::new(move |_who, intent, _topic, mut data| { if let MessageIntent::PeriodicRebroadcast = intent { return do_rebroadcast @@ -227,7 +231,8 @@ where }; let round = msg.commitment.block_number; - let allowed = known_votes.is_live(&round); + let set_id = msg.commitment.validator_set_id; + let allowed = filter.is_live(round, set_id); trace!(target: LOG_TARGET, "🥩 Message for round #{} allowed: {}", round, allowed); @@ -252,81 +257,35 @@ mod tests { #[test] fn known_votes_insert_remove() { - let mut kv = KnownVotes::::new(); + let mut kv = VotesFilter::::new(); + let msg_hash = twox_64(b"data"); - kv.insert(1); - kv.insert(1); - kv.insert(2); + kv.add_known(1, msg_hash); + kv.add_known(1, msg_hash); + kv.add_known(2, msg_hash); assert_eq!(kv.live.len(), 2); - let mut kv = KnownVotes::::new(); - kv.insert(1); - kv.insert(2); - kv.insert(3); + kv.add_known(3, msg_hash); + assert!(kv.is_known(3, &msg_hash)); + assert!(!kv.is_known(3, &twox_64(b"other"))); + assert!(!kv.is_known(4, &msg_hash)); + assert_eq!(kv.live.len(), 3); - assert!(kv.last_done.is_none()); - kv.conclude(2); + assert!(kv.filter.is_none()); + assert!(!kv.is_live(1, 1)); + + kv.update(GossipVoteFilter { start: 3, end: 10, validator_set_id: 1 }); assert_eq!(kv.live.len(), 1); - assert!(!kv.live.contains_key(&2)); - assert_eq!(kv.last_done, Some(2)); + assert!(kv.live.contains_key(&3)); + assert!(!kv.is_live(2, 1)); + assert!(kv.is_live(3, 1)); + assert!(kv.is_live(4, 1)); + assert!(!kv.is_live(4, 2)); - kv.conclude(1); - assert_eq!(kv.last_done, Some(2)); - - kv.conclude(3); - assert_eq!(kv.last_done, Some(3)); + kv.update(GossipVoteFilter { start: 5, end: 10, validator_set_id: 2 }); assert!(kv.live.is_empty()); } - #[test] - fn note_and_drop_round_works() { - let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); - - gv.note_round(1u64); - - assert!(gv.known_votes.read().is_live(&1u64)); - - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); - - assert_eq!(gv.known_votes.read().live.len(), 4); - - gv.conclude_round(7u64); - - let votes = gv.known_votes.read(); - - // rounds 1 and 3 are outdated, don't gossip anymore - assert!(!votes.is_live(&1u64)); - assert!(!votes.is_live(&3u64)); - // latest concluded round is still gossiped - assert!(votes.is_live(&7u64)); - // round 10 is alive and in-progress - assert!(votes.is_live(&10u64)); - } - - #[test] - fn note_same_round_twice() { - let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); - - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); - - assert_eq!(gv.known_votes.read().live.len(), 3); - - // note round #7 again -> should not change anything - gv.note_round(7u64); - - let votes = gv.known_votes.read(); - - assert_eq!(votes.live.len(), 3); - - assert!(votes.is_live(&3u64)); - assert!(votes.is_live(&7u64)); - assert!(votes.is_live(&10u64)); - } - struct TestContext; impl ValidatorContext for TestContext { fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) { @@ -368,21 +327,18 @@ mod tests { #[test] fn should_avoid_verifying_signatures_twice() { let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); let sender = sc_network::PeerId::random(); let mut context = TestContext; let vote = dummy_vote(3); - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); - // first time the cache should be populated let res = gv.validate(&mut context, &sender, &vote.encode()); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); assert_eq!( - gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()), + gv.votes_filter.read().live.get(&vote.commitment.block_number).map(|x| x.len()), Some(1) ); @@ -392,9 +348,11 @@ mod tests { assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); // next we should quickly reject if the round is not live - gv.conclude_round(7_u64); + gv.update_filter(GossipVoteFilter { start: 7, end: 10, validator_set_id: 0 }); - assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number)); + let number = vote.commitment.block_number; + let set_id = vote.commitment.validator_set_id; + assert!(!gv.votes_filter.read().is_live(number, set_id)); let res = gv.validate(&mut context, &sender, &vote.encode()); @@ -404,14 +362,13 @@ mod tests { #[test] fn messages_allowed_and_expired() { let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); let sender = sc_network::PeerId::random(); let topic = Default::default(); let intent = MessageIntent::Broadcast; - // note round 2 and 3, then conclude 2 - gv.note_round(2u64); - gv.note_round(3u64); - gv.conclude_round(2u64); + // conclude 2 + gv.update_filter(GossipVoteFilter { start: 2, end: 10, validator_set_id: 0 }); let mut allowed = gv.message_allowed(); let mut expired = gv.message_expired(); @@ -447,6 +404,7 @@ mod tests { #[test] fn messages_rebroadcast() { let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); let sender = sc_network::PeerId::random(); let topic = Default::default(); diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index eb56a97de1..d632f58332 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -247,6 +247,8 @@ pub async fn start_beefy_gadget( } = network_params; let known_peers = Arc::new(Mutex::new(KnownPeers::new())); + // Default votes filter is to discard everything. + // Validator is updated later with correct starting round and set id. let gossip_validator = Arc::new(communication::gossip::GossipValidator::new(known_peers.clone())); let mut gossip_engine = sc_network_gossip::GossipEngine::new( @@ -284,6 +286,14 @@ pub async fn start_beefy_gadget( return }, }; + // Update the gossip validator with the right starting round and set id. + if let Err(e) = persisted_state + .current_gossip_filter() + .map(|f| gossip_validator.update_filter(f)) + { + error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); + return + } let worker_params = worker::WorkerParams { backend, diff --git a/substrate/client/consensus/beefy/src/metrics.rs b/substrate/client/consensus/beefy/src/metrics.rs index 0ce48e60eb..6653763fc6 100644 --- a/substrate/client/consensus/beefy/src/metrics.rs +++ b/substrate/client/consensus/beefy/src/metrics.rs @@ -45,10 +45,6 @@ pub struct VoterMetrics { pub beefy_lagging_sessions: Counter, /// Number of times no Authority public key found in store pub beefy_no_authority_found_in_store: Counter, - /// Number of currently buffered votes - pub beefy_buffered_votes: Gauge, - /// Number of votes dropped due to full buffers - pub beefy_buffered_votes_dropped: Counter, /// Number of good votes successfully handled pub beefy_good_votes_processed: Counter, /// Number of equivocation votes received @@ -65,8 +61,6 @@ pub struct VoterMetrics { pub beefy_imported_justifications: Counter, /// Number of justifications dropped due to full buffers pub beefy_buffered_justifications_dropped: Counter, - /// Trying to set Best Beefy block to old block - pub beefy_best_block_set_last_failure: Gauge, } impl PrometheusRegister for VoterMetrics { @@ -110,17 +104,6 @@ impl PrometheusRegister for VoterMetrics { )?, registry, )?, - beefy_buffered_votes: register( - Gauge::new("substrate_beefy_buffered_votes", "Number of currently buffered votes")?, - registry, - )?, - beefy_buffered_votes_dropped: register( - Counter::new( - "substrate_beefy_buffered_votes_dropped", - "Number of votes dropped due to full buffers", - )?, - registry, - )?, beefy_good_votes_processed: register( Counter::new( "substrate_beefy_successful_handled_votes", @@ -174,13 +157,6 @@ impl PrometheusRegister for VoterMetrics { )?, registry, )?, - beefy_best_block_set_last_failure: register( - Gauge::new( - "substrate_beefy_best_block_to_old_block", - "Trying to set Best Beefy block to old block", - )?, - registry, - )?, }) } } diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index 27dc8d8191..629f144246 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -526,17 +526,15 @@ async fn finalize_block_and_wait_for_beefy( net: &Arc>, // peer index and key peers: impl Iterator + Clone, - finalize_targets: &[H256], + finalize_target: &H256, expected_beefy: &[u64], ) { let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); - for block in finalize_targets { - peers.clone().for_each(|(index, _)| { - let client = net.lock().peer(index).client().as_client(); - client.finalize_block(*block, None).unwrap(); - }) - } + peers.clone().for_each(|(index, _)| { + let client = net.lock().peer(index).client().as_client(); + client.finalize_block(*finalize_target, None).unwrap(); + }); if expected_beefy.is_empty() { // run for quarter second then verify no new best beefy block available @@ -574,31 +572,32 @@ async fn beefy_finalizing_blocks() { let peers = peers.into_iter().enumerate(); // finalize block #5 -> BEEFY should finalize #1 (mandatory) and #5 from diff-power-of-two rule. - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[1], hashes[5]], &[1, 5]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[1], &[1]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[5], &[5]).await; // GRANDPA finalize #10 -> BEEFY finalize #10 (mandatory) - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[10]], &[10]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[10], &[10]).await; // GRANDPA finalize #18 -> BEEFY finalize #14, then #18 (diff-power-of-two rule) - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[18]], &[14, 18]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[18], &[14, 18]).await; // GRANDPA finalize #20 -> BEEFY finalize #20 (mandatory) - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[20]], &[20]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[20], &[20]).await; // GRANDPA finalize #21 -> BEEFY finalize nothing (yet) because min delta is 4 - finalize_block_and_wait_for_beefy(&net, peers, &[hashes[21]], &[]).await; + finalize_block_and_wait_for_beefy(&net, peers, &hashes[21], &[]).await; } #[tokio::test] async fn lagging_validators() { sp_tracing::try_init_simple(); - let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; + let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie]; let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap(); let session_len = 30; let min_block_delta = 1; - let mut net = BeefyTestNet::new(2); + let mut net = BeefyTestNet::new(3); let api = Arc::new(TestApi::with_validator_set(&validator_set)); let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); tokio::spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); @@ -611,55 +610,47 @@ async fn lagging_validators() { let peers = peers.into_iter().enumerate(); // finalize block #15 -> BEEFY should finalize #1 (mandatory) and #9, #13, #14, #15 from // diff-power-of-two rule. - finalize_block_and_wait_for_beefy( - &net, - peers.clone(), - &[hashes[1], hashes[15]], - &[1, 9, 13, 14, 15], - ) - .await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[1], &[1]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[15], &[9, 13, 14, 15]).await; - // Alice finalizes #25, Bob lags behind + // Alice and Bob finalize #25, Charlie lags behind let finalize = hashes[25]; let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); // verify nothing gets finalized by BEEFY - let timeout = Some(Duration::from_millis(250)); + let timeout = Some(Duration::from_millis(100)); streams_empty_after_timeout(best_blocks, &net, timeout).await; streams_empty_after_timeout(versioned_finality_proof, &net, None).await; - // Bob catches up and also finalizes #25 + // Charlie catches up and also finalizes #25 let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); - net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); - // expected beefy finalizes block #17 from diff-power-of-two + net.lock().peer(2).client().as_client().finalize_block(finalize, None).unwrap(); + // expected beefy finalizes blocks 23, 24, 25 from diff-power-of-two wait_for_best_beefy_blocks(best_blocks, &net, &[23, 24, 25]).await; wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &[23, 24, 25]).await; // Both finalize #30 (mandatory session) and #32 -> BEEFY finalize #30 (mandatory), #31, #32 - finalize_block_and_wait_for_beefy( - &net, - peers.clone(), - &[hashes[30], hashes[32]], - &[30, 31, 32], - ) - .await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[30], &[30]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[32], &[31, 32]).await; // Verify that session-boundary votes get buffered by client and only processed once // session-boundary block is GRANDPA-finalized (this guarantees authenticity for the new session // validator set). - // Alice finalizes session-boundary mandatory block #60, Bob lags behind + // Alice and Bob finalize session-boundary mandatory block #60, Charlie lags behind let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); let finalize = hashes[60]; net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); // verify nothing gets finalized by BEEFY - let timeout = Some(Duration::from_millis(250)); + let timeout = Some(Duration::from_millis(100)); streams_empty_after_timeout(best_blocks, &net, timeout).await; streams_empty_after_timeout(versioned_finality_proof, &net, None).await; - // Bob catches up and also finalizes #60 (and should have buffered Alice's vote on #60) + // Charlie catches up and also finalizes #60 (and should have buffered Alice's vote on #60) let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers); - net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(2).client().as_client().finalize_block(finalize, None).unwrap(); // verify beefy skips intermediary votes, and successfully finalizes mandatory block #60 wait_for_best_beefy_blocks(best_blocks, &net, &[60]).await; wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &[60]).await; @@ -696,7 +687,8 @@ async fn correct_beefy_payload() { let net = Arc::new(Mutex::new(net)); let peers = peers.into_iter().enumerate(); // with 3 good voters and 1 bad one, consensus should happen and best blocks produced. - finalize_block_and_wait_for_beefy(&net, peers, &[hashes[1], hashes[10]], &[1, 9]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[1], &[1]).await; + finalize_block_and_wait_for_beefy(&net, peers, &hashes[10], &[9]).await; let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), [(0, BeefyKeyring::Alice)].into_iter()); @@ -708,7 +700,7 @@ async fn correct_beefy_payload() { net.lock().peer(3).client().as_client().finalize_block(hashof11, None).unwrap(); // verify consensus is _not_ reached - let timeout = Some(Duration::from_millis(250)); + let timeout = Some(Duration::from_millis(100)); streams_empty_after_timeout(best_blocks, &net, timeout).await; streams_empty_after_timeout(versioned_finality_proof, &net, None).await; @@ -874,39 +866,6 @@ async fn beefy_importing_justifications() { } } -#[tokio::test] -async fn voter_initialization() { - sp_tracing::try_init_simple(); - // Regression test for voter initialization where finality notifications were dropped - // after waiting for BEEFY pallet availability. - - let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; - let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap(); - let session_len = 5; - // Should vote on all mandatory blocks no matter the `min_block_delta`. - let min_block_delta = 10; - - let mut net = BeefyTestNet::new(2); - let api = Arc::new(TestApi::with_validator_set(&validator_set)); - let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); - tokio::spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); - - // push 26 blocks - let hashes = net.generate_blocks_and_sync(26, session_len, &validator_set, false).await; - let net = Arc::new(Mutex::new(net)); - - // Finalize multiple blocks at once to get a burst of finality notifications right from start. - // Need to finalize at least one block in each session, choose randomly. - // Expect voters to pick up all of them and BEEFY-finalize the mandatory blocks of each session. - finalize_block_and_wait_for_beefy( - &net, - peers.into_iter().enumerate(), - &[hashes[1], hashes[6], hashes[10], hashes[17], hashes[24], hashes[26]], - &[1, 5, 10, 15, 20, 25], - ) - .await; -} - #[tokio::test] async fn on_demand_beefy_justification_sync() { sp_tracing::try_init_simple(); @@ -940,13 +899,11 @@ async fn on_demand_beefy_justification_sync() { let net = Arc::new(Mutex::new(net)); // With 3 active voters and one inactive, consensus should happen and blocks BEEFY-finalized. // Need to finalize at least one block in each session, choose randomly. - finalize_block_and_wait_for_beefy( - &net, - fast_peers.clone(), - &[hashes[1], hashes[6], hashes[10], hashes[17], hashes[23]], - &[1, 5, 10, 15, 20], - ) - .await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[1], &[1]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[6], &[5]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[10], &[10]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[17], &[15]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[24], &[20]).await; // Spawn Dave, they are now way behind voting and can only catch up through on-demand justif // sync. @@ -971,7 +928,8 @@ async fn on_demand_beefy_justification_sync() { // freshly spun up Dave now needs to listen for gossip to figure out the state of their peers. // Have the other peers do some gossip so Dave finds out about their progress. - finalize_block_and_wait_for_beefy(&net, fast_peers, &[hashes[25], hashes[29]], &[25, 29]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[25], &[25]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers, &hashes[29], &[29]).await; // Kick Dave's async loop by finalizing another block. client.finalize_block(hashes[2], None).unwrap(); @@ -982,13 +940,14 @@ async fn on_demand_beefy_justification_sync() { // Give all tasks some cpu cycles to burn through their events queues, run_for(Duration::from_millis(100), &net).await; // then verify Dave catches up through on-demand justification requests. - finalize_block_and_wait_for_beefy( - &net, - [(dave_index, BeefyKeyring::Dave)].into_iter(), - &[hashes[6], hashes[10], hashes[17], hashes[24], hashes[26]], - &[5, 10, 15, 20, 25], - ) - .await; + let (dave_best_blocks, _) = + get_beefy_streams(&mut net.lock(), [(dave_index, BeefyKeyring::Dave)].into_iter()); + client.finalize_block(hashes[6], None).unwrap(); + client.finalize_block(hashes[10], None).unwrap(); + client.finalize_block(hashes[17], None).unwrap(); + client.finalize_block(hashes[24], None).unwrap(); + client.finalize_block(hashes[26], None).unwrap(); + wait_for_best_beefy_blocks(dave_best_blocks, &net, &[5, 10, 15, 20, 25]).await; } #[tokio::test] @@ -1022,13 +981,8 @@ async fn should_initialize_voter_at_genesis() { // verify next vote target is mandatory block 1 assert_eq!(persisted_state.best_beefy_block(), 0); - assert_eq!(persisted_state.best_grandpa_block(), 13); - assert_eq!( - persisted_state - .voting_oracle() - .voting_target(persisted_state.best_beefy_block(), 13), - Some(1) - ); + assert_eq!(persisted_state.best_grandpa_number(), 13); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(1)); // verify state also saved to db assert!(verify_persisted_version(&*backend)); @@ -1070,13 +1024,8 @@ async fn should_initialize_voter_at_custom_genesis() { // verify next vote target is mandatory block 7 assert_eq!(persisted_state.best_beefy_block(), 0); - assert_eq!(persisted_state.best_grandpa_block(), 8); - assert_eq!( - persisted_state - .voting_oracle() - .voting_target(persisted_state.best_beefy_block(), 13), - Some(custom_pallet_genesis) - ); + assert_eq!(persisted_state.best_grandpa_number(), 8); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(custom_pallet_genesis)); // verify state also saved to db assert!(verify_persisted_version(&*backend)); @@ -1128,14 +1077,9 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() { // verify block 10 is correctly marked as finalized assert_eq!(persisted_state.best_beefy_block(), 10); - assert_eq!(persisted_state.best_grandpa_block(), 13); + assert_eq!(persisted_state.best_grandpa_number(), 13); // verify next vote target is diff-power-of-two block 12 - assert_eq!( - persisted_state - .voting_oracle() - .voting_target(persisted_state.best_beefy_block(), 13), - Some(12) - ); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(12)); // verify state also saved to db assert!(verify_persisted_version(&*backend)); @@ -1186,13 +1130,8 @@ async fn should_initialize_voter_at_latest_finalized() { // verify next vote target is 13 assert_eq!(persisted_state.best_beefy_block(), 12); - assert_eq!(persisted_state.best_grandpa_block(), 13); - assert_eq!( - persisted_state - .voting_oracle() - .voting_target(persisted_state.best_beefy_block(), 13), - Some(13) - ); + assert_eq!(persisted_state.best_grandpa_number(), 13); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(13)); // verify state also saved to db assert!(verify_persisted_version(&*backend)); @@ -1225,13 +1164,13 @@ async fn beefy_finalizing_after_pallet_genesis() { // Minimum BEEFY block delta is 1. // GRANDPA finalize blocks leading up to BEEFY pallet genesis -> BEEFY should finalize nothing. - finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[1..14], &[]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[14], &[]).await; // GRANDPA finalize block #16 -> BEEFY should finalize #15 (genesis mandatory) and #16. - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[16]], &[15, 16]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[16], &[15, 16]).await; // GRANDPA finalize #21 -> BEEFY finalize #20 (mandatory) and #21 - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[21]], &[20, 21]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[21], &[20, 21]).await; } #[tokio::test] @@ -1249,14 +1188,14 @@ async fn beefy_reports_equivocations() { let mut api_alice = TestApi::with_validator_set(&validator_set); api_alice.allow_equivocations(); let api_alice = Arc::new(api_alice); - let alice = (0, &peers[0], api_alice.clone()); + let alice = (0, &BeefyKeyring::Alice, api_alice.clone()); tokio::spawn(initialize_beefy(&mut net, vec![alice], min_block_delta)); // Bob votes on bad MMR roots, equivocations are allowed/expected. let mut api_bob = TestApi::new(1, &validator_set, BAD_MMR_ROOT); api_bob.allow_equivocations(); let api_bob = Arc::new(api_bob); - let bob = (1, &peers[1], api_bob.clone()); + let bob = (1, &BeefyKeyring::Bob, api_bob.clone()); tokio::spawn(initialize_beefy(&mut net, vec![bob], min_block_delta)); // We spawn another node voting with Bob key, on alternate bad MMR roots (equivocating). @@ -1276,7 +1215,7 @@ async fn beefy_reports_equivocations() { let peers = peers.into_iter().enumerate(); // finalize block #1 -> BEEFY should not finalize anything (each node votes on different MMR). - finalize_block_and_wait_for_beefy(&net, peers, &[hashes[1]], &[]).await; + finalize_block_and_wait_for_beefy(&net, peers, &hashes[1], &[]).await; // Verify neither Bob or Bob_Prime report themselves as equivocating. assert!(api_bob.reported_equivocations.as_ref().unwrap().lock().is_empty()); diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index 0abb38d022..0260d7693c 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -18,13 +18,13 @@ use crate::{ communication::{ - gossip::{topic, GossipValidator}, + gossip::{topic, GossipValidator, GossipVoteFilter}, request_response::outgoing_requests_engine::OnDemandJustificationsEngine, }, error::Error, justification::BeefyVersionedFinalityProof, keystore::{BeefyKeystore, BeefySignatureHasher}, - metric_get, metric_inc, metric_set, + metric_inc, metric_set, metrics::VoterMetrics, round::{Rounds, VoteImportResult}, BeefyVoterLinks, LOG_TARGET, @@ -42,24 +42,19 @@ use sp_consensus_beefy::{ check_equivocation_proof, crypto::{AuthorityId, Signature}, BeefyApi, Commitment, ConsensusLog, EquivocationProof, PayloadProvider, ValidatorSet, - VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, + ValidatorSetId, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, }; use sp_runtime::{ generic::OpaqueDigestItemId, - traits::{Block, ConstU32, Header, NumberFor, Zero}, - BoundedVec, SaturatedConversion, + traits::{Block, Header, NumberFor, Zero}, + SaturatedConversion, }; use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, fmt::Debug, - marker::PhantomData, sync::Arc, }; -/// Bound for the number of buffered future voting rounds. -const MAX_BUFFERED_VOTE_ROUNDS: usize = 600; -/// Bound for the number of buffered votes per round number. -const MAX_BUFFERED_VOTES_PER_ROUND: u32 = 1000; /// Bound for the number of pending justifications - use 2400 - the max number /// of justifications possible in a single session. const MAX_BUFFERED_JUSTIFICATIONS: usize = 2400; @@ -87,23 +82,34 @@ pub(crate) struct VoterOracle { sessions: VecDeque>, /// Min delta in block numbers between two blocks, BEEFY should vote on. min_block_delta: u32, + /// Best block we received a GRANDPA finality for. + best_grandpa_block_header: ::Header, + /// Best block a BEEFY voting round has been concluded for. + best_beefy_block: NumberFor, } impl VoterOracle { /// Verify provided `sessions` satisfies requirements, then build `VoterOracle`. - pub fn checked_new(sessions: VecDeque>, min_block_delta: u32) -> Option { + pub fn checked_new( + sessions: VecDeque>, + min_block_delta: u32, + grandpa_header: ::Header, + best_beefy: NumberFor, + ) -> Option { let mut prev_start = Zero::zero(); let mut prev_validator_id = None; // verifies the let mut validate = || -> bool { - if sessions.is_empty() { + let best_grandpa = *grandpa_header.number(); + if sessions.is_empty() || best_beefy > best_grandpa { return false } for (idx, session) in sessions.iter().enumerate() { + let start = session.session_start(); if session.validators().is_empty() { return false } - if session.session_start() <= prev_start { + if start > best_grandpa || start <= prev_start { return false } #[cfg(not(test))] @@ -125,23 +131,35 @@ impl VoterOracle { sessions, // Always target at least one block better than current best beefy. min_block_delta: min_block_delta.max(1), + best_grandpa_block_header: grandpa_header, + best_beefy_block: best_beefy, }) } else { - error!(target: LOG_TARGET, "🥩 Invalid sessions queue: {:?}.", sessions); + error!( + target: LOG_TARGET, + "🥩 Invalid sessions queue: {:?}; best-beefy {:?} best-grandpa-header {:?}.", + sessions, + best_beefy, + grandpa_header + ); None } } // Return reference to rounds pertaining to first session in the queue. // Voting will always happen at the head of the queue. - fn active_rounds(&self) -> Option<&Rounds> { - self.sessions.front() + fn active_rounds(&self) -> Result<&Rounds, Error> { + self.sessions.front().ok_or(Error::UninitSession) } // Return mutable reference to rounds pertaining to first session in the queue. // Voting will always happen at the head of the queue. - fn active_rounds_mut(&mut self) -> Option<&mut Rounds> { - self.sessions.front_mut() + fn active_rounds_mut(&mut self) -> Result<&mut Rounds, Error> { + self.sessions.front_mut().ok_or(Error::UninitSession) + } + + fn current_validator_set_id(&self) -> Result { + self.active_rounds().map(|r| r.validator_set_id()) } // Prune the sessions queue to keep the Oracle in one of the expected three states. @@ -164,7 +182,7 @@ impl VoterOracle { /// Finalize a particular block. pub fn finalize(&mut self, block: NumberFor) -> Result<(), Error> { // Conclude voting round for this block. - self.active_rounds_mut().ok_or(Error::UninitSession)?.conclude(block); + self.active_rounds_mut()?.conclude(block); // Prune any now "finalized" sessions from queue. self.try_prune(); Ok(()) @@ -182,30 +200,26 @@ impl VoterOracle { } /// Return `(A, B)` tuple representing inclusive [A, B] interval of votes to accept. - pub fn accepted_interval( - &self, - best_grandpa: NumberFor, - ) -> Result<(NumberFor, NumberFor), Error> { + pub fn accepted_interval(&self) -> Result<(NumberFor, NumberFor), Error> { let rounds = self.sessions.front().ok_or(Error::UninitSession)?; if rounds.mandatory_done() { // There's only one session active and its mandatory is done. - // Accept any GRANDPA finalized vote. - Ok((rounds.session_start(), best_grandpa.into())) + // Accept any vote for a GRANDPA finalized block in a better round. + Ok(( + rounds.session_start().max(self.best_beefy_block), + (*self.best_grandpa_block_header.number()).into(), + )) } else { - // There's at least one session with mandatory not done. + // Current session has mandatory not done. // Only accept votes for the mandatory block in the front of queue. Ok((rounds.session_start(), rounds.session_start())) } } /// Utility function to quickly decide what to do for each round. - pub fn triage_round( - &self, - round: NumberFor, - best_grandpa: NumberFor, - ) -> Result { - let (start, end) = self.accepted_interval(best_grandpa)?; + pub fn triage_round(&self, round: NumberFor) -> Result { + let (start, end) = self.accepted_interval()?; if start <= round && round <= end { Ok(RoundAction::Process) } else if round > end { @@ -217,17 +231,15 @@ impl VoterOracle { /// Return `Some(number)` if we should be voting on block `number`, /// return `None` if there is no block we should vote on. - pub fn voting_target( - &self, - best_beefy: NumberFor, - best_grandpa: NumberFor, - ) -> Option> { + pub fn voting_target(&self) -> Option> { let rounds = if let Some(r) = self.sessions.front() { r } else { debug!(target: LOG_TARGET, "🥩 No voting round started"); return None }; + let best_grandpa = *self.best_grandpa_block_header.number(); + let best_beefy = self.best_beefy_block; // `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`. let target = @@ -259,10 +271,6 @@ pub(crate) struct WorkerParams { #[derive(Debug, Decode, Encode, PartialEq)] pub(crate) struct PersistedState { - /// Best block we received a GRANDPA finality for. - best_grandpa_block_header: ::Header, - /// Best block a BEEFY voting round has been concluded for. - best_beefy_block: NumberFor, /// Best block we voted on. best_voted: NumberFor, /// Chooses which incoming votes to accept and which votes to generate. @@ -277,20 +285,26 @@ impl PersistedState { sessions: VecDeque>, min_block_delta: u32, ) -> Option { - VoterOracle::checked_new(sessions, min_block_delta).map(|voting_oracle| PersistedState { - best_grandpa_block_header: grandpa_header, - best_beefy_block: best_beefy, - best_voted: Zero::zero(), - voting_oracle, - }) + VoterOracle::checked_new(sessions, min_block_delta, grandpa_header, best_beefy) + .map(|voting_oracle| PersistedState { best_voted: Zero::zero(), voting_oracle }) } pub(crate) fn set_min_block_delta(&mut self, min_block_delta: u32) { self.voting_oracle.min_block_delta = min_block_delta.max(1); } + pub(crate) fn set_best_beefy(&mut self, best_beefy: NumberFor) { + self.voting_oracle.best_beefy_block = best_beefy; + } + pub(crate) fn set_best_grandpa(&mut self, best_grandpa: ::Header) { - self.best_grandpa_block_header = best_grandpa; + self.voting_oracle.best_grandpa_block_header = best_grandpa; + } + + pub(crate) fn current_gossip_filter(&self) -> Result, Error> { + let (start, end) = self.voting_oracle.accepted_interval()?; + let validator_set_id = self.voting_oracle.current_validator_set_id()?; + Ok(GossipVoteFilter { start, end, validator_set_id }) } } @@ -315,14 +329,6 @@ pub(crate) struct BeefyWorker { // voter state /// BEEFY client metrics. metrics: Option, - /// Buffer holding votes for future processing. - pending_votes: BTreeMap< - NumberFor, - BoundedVec< - VoteMessage, AuthorityId, Signature>, - ConstU32, - >, - >, /// Buffer holding justifications for future processing. pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, /// Persisted voter state. @@ -370,25 +376,20 @@ where on_demand_justifications, links, metrics, - pending_votes: BTreeMap::new(), pending_justifications: BTreeMap::new(), persisted_state, } } fn best_grandpa_block(&self) -> NumberFor { - *self.persisted_state.best_grandpa_block_header.number() - } - - fn best_beefy_block(&self) -> NumberFor { - self.persisted_state.best_beefy_block + *self.persisted_state.voting_oracle.best_grandpa_block_header.number() } fn voting_oracle(&self) -> &VoterOracle { &self.persisted_state.voting_oracle } - fn active_rounds(&mut self) -> Option<&Rounds> { + fn active_rounds(&mut self) -> Result<&Rounds, Error> { self.persisted_state.voting_oracle.active_rounds() } @@ -429,7 +430,7 @@ where debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set); // BEEFY should finalize a mandatory block during each session. - if let Some(active_session) = self.active_rounds() { + if let Ok(active_session) = self.active_rounds() { if !active_session.mandatory_done() { debug!( target: LOG_TARGET, @@ -460,12 +461,17 @@ where } fn handle_finality_notification(&mut self, notification: &FinalityNotification) { - debug!(target: LOG_TARGET, "🥩 Finality notification: {:?}", notification); + debug!( + target: LOG_TARGET, + "🥩 Finality notification: header {:?} tree_route {:?}", + notification.header, + notification.tree_route, + ); let header = ¬ification.header; if *header.number() > self.best_grandpa_block() { // update best GRANDPA finalized block we have seen - self.persisted_state.best_grandpa_block_header = header.clone(); + self.persisted_state.set_best_grandpa(header.clone()); // Check all (newly) finalized blocks for new session(s). let backend = self.backend.clone(); @@ -484,38 +490,28 @@ where self.init_session_at(new_validator_set, *header.number()); } } + + // Update gossip validator votes filter. + if let Err(e) = self + .persisted_state + .current_gossip_filter() + .map(|filter| self.gossip_validator.update_filter(filter)) + { + error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e); + } } } - /// Based on [VoterOracle] this vote is either processed here or enqueued for later. + /// Based on [VoterOracle] this vote is either processed here or discarded. fn triage_incoming_vote( &mut self, vote: VoteMessage, AuthorityId, Signature>, ) -> Result<(), Error> { let block_num = vote.commitment.block_number; - let best_grandpa = self.best_grandpa_block(); - self.gossip_validator.note_round(block_num); - match self.voting_oracle().triage_round(block_num, best_grandpa)? { + match self.voting_oracle().triage_round(block_num)? { RoundAction::Process => self.handle_vote(vote)?, - RoundAction::Enqueue => { - debug!(target: LOG_TARGET, "🥩 Buffer vote for round: {:?}.", block_num); - if self.pending_votes.len() < MAX_BUFFERED_VOTE_ROUNDS { - let votes_vec = self.pending_votes.entry(block_num).or_default(); - if votes_vec.try_push(vote).is_ok() { - metric_inc!(self, beefy_buffered_votes); - } else { - warn!( - target: LOG_TARGET, - "🥩 Buffer vote dropped for round: {:?}", block_num - ); - metric_inc!(self, beefy_buffered_votes_dropped); - } - } else { - warn!(target: LOG_TARGET, "🥩 Buffer vote dropped for round: {:?}.", block_num); - metric_inc!(self, beefy_buffered_votes_dropped); - } - }, RoundAction::Drop => metric_inc!(self, beefy_stale_votes), + RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote), }; Ok(()) } @@ -531,8 +527,7 @@ where VersionedFinalityProof::V1(ref sc) => sc, }; let block_num = signed_commitment.commitment.block_number; - let best_grandpa = self.best_grandpa_block(); - match self.voting_oracle().triage_round(block_num, best_grandpa)? { + match self.voting_oracle().triage_round(block_num)? { RoundAction::Process => { debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num); metric_inc!(self, beefy_imported_justifications); @@ -560,11 +555,7 @@ where &mut self, vote: VoteMessage, AuthorityId, Signature>, ) -> Result<(), Error> { - let rounds = self - .persisted_state - .voting_oracle - .active_rounds_mut() - .ok_or(Error::UninitSession)?; + let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?; let block_number = vote.commitment.block_number; match rounds.add_vote(vote) { @@ -608,7 +599,7 @@ where /// 3. Persist voter state, /// 4. Send best block hash and `finality_proof` to RPC worker. /// - /// Expects `finality proof` to be valid. + /// Expects `finality proof` to be valid and for a block > current-best-beefy. fn finalize(&mut self, finality_proof: BeefyVersionedFinalityProof) -> Result<(), Error> { let block_num = match finality_proof { VersionedFinalityProof::V1(ref sc) => sc.commitment.block_number, @@ -616,74 +607,62 @@ where // Finalize inner round and update voting_oracle state. self.persisted_state.voting_oracle.finalize(block_num)?; - self.gossip_validator.conclude_round(block_num); - if block_num > self.best_beefy_block() { - // Set new best BEEFY block number. - self.persisted_state.best_beefy_block = block_num; - crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) - .map_err(|e| Error::Backend(e.to_string()))?; + // Set new best BEEFY block number. + self.persisted_state.set_best_beefy(block_num); + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) + .map_err(|e| Error::Backend(e.to_string()))?; - metric_set!(self, beefy_best_block, block_num); + metric_set!(self, beefy_best_block, block_num); - self.on_demand_justifications.cancel_requests_older_than(block_num); + self.on_demand_justifications.cancel_requests_older_than(block_num); - if let Err(e) = self - .backend - .blockchain() - .expect_block_hash_from_id(&BlockId::Number(block_num)) - .and_then(|hash| { - self.links - .to_rpc_best_block_sender - .notify(|| Ok::<_, ()>(hash)) - .expect("forwards closure result; the closure always returns Ok; qed."); + if let Err(e) = self + .backend + .blockchain() + .expect_block_hash_from_id(&BlockId::Number(block_num)) + .and_then(|hash| { + self.links + .to_rpc_best_block_sender + .notify(|| Ok::<_, ()>(hash)) + .expect("forwards closure result; the closure always returns Ok; qed."); - self.backend - .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode())) - }) { - error!( - target: LOG_TARGET, - "🥩 Error {:?} on appending justification: {:?}", e, finality_proof - ); - } - - self.links - .to_rpc_justif_sender - .notify(|| Ok::<_, ()>(finality_proof)) - .expect("forwards closure result; the closure always returns Ok; qed."); - } else { - debug!(target: LOG_TARGET, "🥩 Can't set best beefy to old: {}", block_num); - metric_set!(self, beefy_best_block_set_last_failure, block_num); + self.backend + .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode())) + }) { + error!( + target: LOG_TARGET, + "🥩 Error {:?} on appending justification: {:?}", e, finality_proof + ); } + + self.links + .to_rpc_justif_sender + .notify(|| Ok::<_, ()>(finality_proof)) + .expect("forwards closure result; the closure always returns Ok; qed."); + + // Update gossip validator votes filter. + self.persisted_state + .current_gossip_filter() + .map(|filter| self.gossip_validator.update_filter(filter))?; Ok(()) } - /// Handle previously buffered justifications and votes that now land in the voting interval. - fn try_pending_justif_and_votes(&mut self) -> Result<(), Error> { - let best_grandpa = self.best_grandpa_block(); - let _ph = PhantomData::::default(); - - fn to_process_for( - pending: &mut BTreeMap, T>, - (start, end): (NumberFor, NumberFor), - _: PhantomData, - ) -> BTreeMap, T> { - // These are still pending. - let still_pending = pending.split_off(&end.saturating_add(1u32.into())); - // These can be processed. - let to_handle = pending.split_off(&start); - // The rest can be dropped. - *pending = still_pending; - // Return ones to process. - to_handle - } + /// Handle previously buffered justifications, that now land in the voting interval. + fn try_pending_justififactions(&mut self) -> Result<(), Error> { // Interval of blocks for which we can process justifications and votes right now. - let mut interval = self.voting_oracle().accepted_interval(best_grandpa)?; - + let (start, end) = self.voting_oracle().accepted_interval()?; // Process pending justifications. if !self.pending_justifications.is_empty() { - let justifs_to_handle = to_process_for(&mut self.pending_justifications, interval, _ph); - for (num, justification) in justifs_to_handle.into_iter() { + // These are still pending. + let still_pending = + self.pending_justifications.split_off(&end.saturating_add(1u32.into())); + // These can be processed. + let justifs_to_process = self.pending_justifications.split_off(&start); + // The rest can be dropped. + self.pending_justifications = still_pending; + + for (num, justification) in justifs_to_process.into_iter() { debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num); metric_inc!(self, beefy_imported_justifications); if let Err(err) = self.finalize(justification) { @@ -691,27 +670,6 @@ where } } metric_set!(self, beefy_buffered_justifications, self.pending_justifications.len()); - // Possibly new interval after processing justifications. - interval = self.voting_oracle().accepted_interval(best_grandpa)?; - } - - // Process pending votes. - if !self.pending_votes.is_empty() { - let mut processed = 0u64; - let votes_to_handle = to_process_for(&mut self.pending_votes, interval, _ph); - for (num, votes) in votes_to_handle.into_iter() { - debug!(target: LOG_TARGET, "🥩 Handle buffered votes for: {:?}.", num); - processed += votes.len() as u64; - for v in votes.into_iter() { - if let Err(err) = self.handle_vote(v) { - error!(target: LOG_TARGET, "🥩 Error handling buffered vote: {}", err); - }; - } - } - if let Some(previous) = metric_get!(self, beefy_buffered_votes) { - previous.sub(processed); - metric_set!(self, beefy_buffered_votes, previous.get()); - } } Ok(()) } @@ -719,10 +677,7 @@ where /// Decide if should vote, then vote.. or don't.. fn try_to_vote(&mut self) -> Result<(), Error> { // Vote if there's now a new vote target. - if let Some(target) = self - .voting_oracle() - .voting_target(self.best_beefy_block(), self.best_grandpa_block()) - { + if let Some(target) = self.voting_oracle().voting_target() { metric_set!(self, beefy_should_vote_on, target); if target > self.persisted_state.best_voted { self.do_vote(target)?; @@ -740,7 +695,7 @@ where // Most of the time we get here, `target` is actually `best_grandpa`, // avoid getting header from backend in that case. let target_header = if target_number == self.best_grandpa_block() { - self.persisted_state.best_grandpa_block_header.clone() + self.persisted_state.voting_oracle.best_grandpa_block_header.clone() } else { let hash = self .backend @@ -771,11 +726,7 @@ where return Ok(()) }; - let rounds = self - .persisted_state - .voting_oracle - .active_rounds_mut() - .ok_or(Error::UninitSession)?; + let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?; let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); let authority_id = if let Some(id) = self.key_store.authority_id(validators) { @@ -830,7 +781,7 @@ where fn process_new_state(&mut self) { // Handle pending justifications and/or votes for now GRANDPA finalized blocks. - if let Err(err) = self.try_pending_justif_and_votes() { + if let Err(err) = self.try_pending_justififactions() { debug!(target: LOG_TARGET, "🥩 {}", err); } @@ -867,12 +818,12 @@ where self.gossip_engine .messages_for(topic::()) .filter_map(|notification| async move { - trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", notification); - - VoteMessage::, AuthorityId, Signature>::decode( + let vote = VoteMessage::, AuthorityId, Signature>::decode( &mut ¬ification.message[..], ) - .ok() + .ok(); + trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", vote); + vote }) .fuse(), ); @@ -946,8 +897,7 @@ where &self, proof: EquivocationProof, AuthorityId, Signature>, ) -> Result<(), Error> { - let rounds = - self.persisted_state.voting_oracle.active_rounds().ok_or(Error::UninitSession)?; + let rounds = self.persisted_state.voting_oracle.active_rounds()?; let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); let offender_id = proof.offender_id().clone(); @@ -1083,16 +1033,16 @@ pub(crate) mod tests { &self.voting_oracle } - pub fn active_round(&self) -> Option<&Rounds> { + pub fn active_round(&self) -> Result<&Rounds, Error> { self.voting_oracle.active_rounds() } pub fn best_beefy_block(&self) -> NumberFor { - self.best_beefy_block + self.voting_oracle.best_beefy_block } - pub fn best_grandpa_block(&self) -> NumberFor { - *self.best_grandpa_block_header.number() + pub fn best_grandpa_number(&self) -> NumberFor { + *self.voting_oracle.best_grandpa_block_header.number() } } @@ -1103,7 +1053,7 @@ pub(crate) mod tests { } fn create_beefy_worker( - peer: &BeefyPeer, + peer: &mut BeefyPeer, key: &Keyring, min_block_delta: u32, genesis_validator_set: ValidatorSet, @@ -1153,12 +1103,15 @@ pub(crate) mod tests { known_peers, None, ); - let genesis_header = backend + // Push 1 block - will start first session. + let hashes = peer.push_blocks(1, false); + backend.finalize_block(hashes[0], None).unwrap(); + let first_header = backend .blockchain() - .expect_header(backend.blockchain().info().genesis_hash) + .expect_header(backend.blockchain().info().best_hash) .unwrap(); let persisted_state = PersistedState::checked_new( - genesis_header, + first_header, Zero::zero(), vec![Rounds::new(One::one(), genesis_validator_set)].into(), min_block_delta, @@ -1275,10 +1228,30 @@ pub(crate) mod tests { #[test] fn should_vote_target() { - let mut oracle = VoterOracle:: { min_block_delta: 1, sessions: VecDeque::new() }; + let header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + let mut oracle = VoterOracle:: { + best_beefy_block: 0, + best_grandpa_block_header: header, + min_block_delta: 1, + sessions: VecDeque::new(), + }; + let voting_target_with = |oracle: &mut VoterOracle, + best_beefy: NumberFor, + best_grandpa: NumberFor| + -> Option> { + oracle.best_beefy_block = best_beefy; + oracle.best_grandpa_block_header.number = best_grandpa; + oracle.voting_target() + }; // rounds not initialized -> should vote: `None` - assert_eq!(oracle.voting_target(0, 1), None); + assert_eq!(voting_target_with(&mut oracle, 0, 1), None); let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); @@ -1287,29 +1260,29 @@ pub(crate) mod tests { // under min delta oracle.min_block_delta = 4; - assert_eq!(oracle.voting_target(1, 1), None); - assert_eq!(oracle.voting_target(2, 5), None); + assert_eq!(voting_target_with(&mut oracle, 1, 1), None); + assert_eq!(voting_target_with(&mut oracle, 2, 5), None); // vote on min delta - assert_eq!(oracle.voting_target(4, 9), Some(8)); + assert_eq!(voting_target_with(&mut oracle, 4, 9), Some(8)); oracle.min_block_delta = 8; - assert_eq!(oracle.voting_target(10, 18), Some(18)); + assert_eq!(voting_target_with(&mut oracle, 10, 18), Some(18)); // vote on power of two oracle.min_block_delta = 1; - assert_eq!(oracle.voting_target(1000, 1008), Some(1004)); - assert_eq!(oracle.voting_target(1000, 1016), Some(1008)); + assert_eq!(voting_target_with(&mut oracle, 1000, 1008), Some(1004)); + assert_eq!(voting_target_with(&mut oracle, 1000, 1016), Some(1008)); // nothing new to vote on - assert_eq!(oracle.voting_target(1000, 1000), None); + assert_eq!(voting_target_with(&mut oracle, 1000, 1000), None); // vote on mandatory oracle.sessions.clear(); oracle.add_session(Rounds::new(1000, validator_set.clone())); - assert_eq!(oracle.voting_target(0, 1008), Some(1000)); + assert_eq!(voting_target_with(&mut oracle, 0, 1008), Some(1000)); oracle.sessions.clear(); oracle.add_session(Rounds::new(1001, validator_set.clone())); - assert_eq!(oracle.voting_target(1000, 1008), Some(1001)); + assert_eq!(voting_target_with(&mut oracle, 1000, 1008), Some(1001)); } #[test] @@ -1317,16 +1290,34 @@ pub(crate) mod tests { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut oracle = VoterOracle:: { min_block_delta: 1, sessions: VecDeque::new() }; + let header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + let mut oracle = VoterOracle:: { + best_beefy_block: 0, + best_grandpa_block_header: header, + min_block_delta: 1, + sessions: VecDeque::new(), + }; + let accepted_interval_with = |oracle: &mut VoterOracle, + best_grandpa: NumberFor| + -> Result<(NumberFor, NumberFor), Error> { + oracle.best_grandpa_block_header.number = best_grandpa; + oracle.accepted_interval() + }; // rounds not initialized -> should accept votes: `None` - assert!(oracle.accepted_interval(1).is_err()); + assert!(accepted_interval_with(&mut oracle, 1).is_err()); let session_one = 1; oracle.add_session(Rounds::new(session_one, validator_set.clone())); // mandatory not done, only accept mandatory for i in 0..15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_one, session_one))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_one, session_one))); } // add more sessions, nothing changes @@ -1336,7 +1327,7 @@ pub(crate) mod tests { oracle.add_session(Rounds::new(session_three, validator_set.clone())); // mandatory not done, should accept mandatory for session_one for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_one, session_one))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_one, session_one))); } // simulate finish mandatory for session one, prune oracle @@ -1344,7 +1335,7 @@ pub(crate) mod tests { oracle.try_prune(); // session_one pruned, should accept mandatory for session_two for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_two, session_two))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_two, session_two))); } // simulate finish mandatory for session two, prune oracle @@ -1352,26 +1343,29 @@ pub(crate) mod tests { oracle.try_prune(); // session_two pruned, should accept mandatory for session_three for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_three, session_three))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, session_three))); } // simulate finish mandatory for session three oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true); // verify all other blocks in this session are now open to voting for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_three, i))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, i))); } // pruning does nothing in this case oracle.try_prune(); for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_three, i))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, i))); } // adding new session automatically prunes "finalized" previous session let session_four = 31; oracle.add_session(Rounds::new(session_four, validator_set.clone())); assert_eq!(oracle.sessions.front().unwrap().session_start(), session_four); - assert_eq!(oracle.accepted_interval(session_four + 10), Ok((session_four, session_four))); + assert_eq!( + accepted_interval_with(&mut oracle, session_four + 10), + Ok((session_four, session_four)) + ); } #[test] @@ -1405,7 +1399,7 @@ pub(crate) mod tests { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let mut net = BeefyTestNet::new(1); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); + let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); // keystore doesn't contain other keys than validators' assert_eq!(worker.verify_validator_set(&1, &validator_set), Ok(())); @@ -1429,7 +1423,7 @@ pub(crate) mod tests { let validator_set = ValidatorSet::new(make_beefy_ids(&keys), 0).unwrap(); let mut net = BeefyTestNet::new(1); let backend = net.peer(0).client().as_backend(); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); + let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); // remove default session, will manually add custom one. worker.persisted_state.voting_oracle.sessions.clear(); @@ -1449,7 +1443,7 @@ pub(crate) mod tests { }; // no 'best beefy block' or finality proofs - assert_eq!(worker.best_beefy_block(), 0); + assert_eq!(worker.persisted_state.best_beefy_block(), 0); poll_fn(move |cx| { assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); assert_eq!(finality_proof.poll_next_unpin(cx), Poll::Pending); @@ -1457,6 +1451,7 @@ pub(crate) mod tests { }) .await; + let client = net.peer(0).client().as_client(); // unknown hash for block #1 let (mut best_block_streams, mut finality_proofs) = get_beefy_streams(&mut net, keys.clone()); @@ -1471,10 +1466,16 @@ pub(crate) mod tests { // try to finalize block #1 worker.finalize(justif.clone()).unwrap(); // verify block finalized - assert_eq!(worker.best_beefy_block(), 1); + assert_eq!(worker.persisted_state.best_beefy_block(), 1); poll_fn(move |cx| { - // unknown hash -> nothing streamed - assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + // expect Some(hash-of-block-1) + match best_block_stream.poll_next_unpin(cx) { + Poll::Ready(Some(hash)) => { + let block_num = client.number(hash).unwrap(); + assert_eq!(block_num, Some(1)); + }, + v => panic!("unexpected value: {:?}", v), + } // commitment streamed match finality_proof.poll_next_unpin(cx) { // expect justification @@ -1488,11 +1489,9 @@ pub(crate) mod tests { // generate 2 blocks, try again expect success let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); - let hashes = net.peer(0).push_blocks(2, false); + let hashes = net.peer(0).push_blocks(1, false); // finalize 1 and 2 without justifications (hashes does not contain genesis) - let hashof1 = hashes[0]; - let hashof2 = hashes[1]; - backend.finalize_block(hashof1, None).unwrap(); + let hashof2 = hashes[0]; backend.finalize_block(hashof2, None).unwrap(); let justif = create_finality_proof(2); @@ -1504,7 +1503,7 @@ pub(crate) mod tests { // new session starting at #2 is in front assert_eq!(worker.active_rounds().unwrap().session_start(), 2); // verify block finalized - assert_eq!(worker.best_beefy_block(), 2); + assert_eq!(worker.persisted_state.best_beefy_block(), 2); poll_fn(move |cx| { match best_block_stream.poll_next_unpin(cx) { // expect Some(hash-of-block-2) @@ -1528,7 +1527,7 @@ pub(crate) mod tests { let keys = &[Keyring::Alice, Keyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let mut net = BeefyTestNet::new(1); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); + let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); let worker_rounds = worker.active_rounds().unwrap(); assert_eq!(worker_rounds.session_start(), 1); @@ -1554,77 +1553,6 @@ pub(crate) mod tests { assert_eq!(rounds.validator_set_id(), new_validator_set.id()); } - #[tokio::test] - async fn should_triage_votes_and_process_later() { - let keys = &[Keyring::Alice, Keyring::Bob]; - let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut net = BeefyTestNet::new(1); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); - // remove default session, will manually add custom one. - worker.persisted_state.voting_oracle.sessions.clear(); - - fn new_vote( - block_number: NumberFor, - ) -> VoteMessage, AuthorityId, Signature> { - let commitment = Commitment { - payload: Payload::from_single_entry(*b"BF", vec![]), - block_number, - validator_set_id: 0, - }; - VoteMessage { - commitment, - id: Keyring::Alice.public(), - signature: Keyring::Alice.sign(b"I am committed"), - } - } - - // best grandpa is 20 - let best_grandpa_header = Header::new( - 20u32.into(), - Default::default(), - Default::default(), - Default::default(), - Digest::default(), - ); - - worker - .persisted_state - .voting_oracle - .add_session(Rounds::new(10, validator_set.clone())); - worker.persisted_state.best_grandpa_block_header = best_grandpa_header; - - // triage votes for blocks 10..13 - worker.triage_incoming_vote(new_vote(10)).unwrap(); - worker.triage_incoming_vote(new_vote(11)).unwrap(); - worker.triage_incoming_vote(new_vote(12)).unwrap(); - // triage votes for blocks 20..23 - worker.triage_incoming_vote(new_vote(20)).unwrap(); - worker.triage_incoming_vote(new_vote(21)).unwrap(); - worker.triage_incoming_vote(new_vote(22)).unwrap(); - - // vote for 10 should have been handled, while the rest buffered for later processing - let mut votes = worker.pending_votes.values(); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 11); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 12); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 20); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); - assert!(votes.next().is_none()); - - // simulate mandatory done, and retry buffered votes - worker - .persisted_state - .voting_oracle - .active_rounds_mut() - .unwrap() - .test_set_mandatory_done(true); - worker.try_pending_justif_and_votes().unwrap(); - // all blocks <= grandpa finalized should have been handled, rest still buffered - let mut votes = worker.pending_votes.values(); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); - } - #[tokio::test] async fn should_not_report_bad_old_or_self_equivocations() { let block_num = 1; @@ -1637,7 +1565,7 @@ pub(crate) mod tests { let api_alice = Arc::new(api_alice); let mut net = BeefyTestNet::new(1); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); + let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); worker.runtime = api_alice.clone(); // let there be a block with num = 1: