diff --git a/substrate/client/consensus/beefy/src/aux_schema.rs b/substrate/client/consensus/beefy/src/aux_schema.rs index 2e99b4cc40..84186140b6 100644 --- a/substrate/client/consensus/beefy/src/aux_schema.rs +++ b/substrate/client/consensus/beefy/src/aux_schema.rs @@ -28,7 +28,7 @@ use sp_runtime::traits::Block as BlockT; const VERSION_KEY: &[u8] = b"beefy_auxschema_version"; const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state"; -const CURRENT_VERSION: u32 = 2; +const CURRENT_VERSION: u32 = 3; pub(crate) fn write_current_version(backend: &BE) -> ClientResult<()> { info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION); @@ -63,8 +63,8 @@ where match version { None => (), - Some(1) => (), // version 1 is totally obsolete and should be simply ignored - Some(2) => return load_decode::<_, PersistedState>(backend, WORKER_STATE_KEY), + Some(1) | Some(2) => (), // versions 1 & 2 are obsolete and should be simply ignored + Some(3) => return load_decode::<_, PersistedState>(backend, WORKER_STATE_KEY), other => return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))), } diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index 219203ee4e..e49382251e 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -31,11 +31,14 @@ use wasm_timer::Instant; use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore, LOG_TARGET}; use sp_consensus_beefy::{ crypto::{Public, Signature}, - VoteMessage, + ValidatorSetId, VoteMessage, }; // Timeout for rebroadcasting messages. -const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); +#[cfg(not(test))] +const REBROADCAST_AFTER: Duration = Duration::from_secs(60); +#[cfg(test)] +const REBROADCAST_AFTER: Duration = Duration::from_secs(5); /// Gossip engine messages topic pub(crate) fn topic() -> B::Hash @@ -45,45 +48,51 @@ where <::Hashing as Hash>::hash(b"beefy") } +#[derive(Debug)] +pub(crate) struct GossipVoteFilter { + pub start: NumberFor, + pub end: NumberFor, + pub validator_set_id: ValidatorSetId, +} + /// A type that represents hash of the message. pub type MessageHash = [u8; 8]; -struct KnownVotes { - last_done: Option>, +struct VotesFilter { + filter: Option>, live: BTreeMap, fnv::FnvHashSet>, } -impl KnownVotes { +impl VotesFilter { pub fn new() -> Self { - Self { last_done: None, live: BTreeMap::new() } + Self { filter: None, live: BTreeMap::new() } } - /// Create new round votes set if not already present. - fn insert(&mut self, round: NumberFor) { - self.live.entry(round).or_default(); + /// Update filter to new `start` and `set_id`. + fn update(&mut self, filter: GossipVoteFilter) { + self.live.retain(|&round, _| round >= filter.start && round <= filter.end); + self.filter = Some(filter); } - /// Remove `round` and older from live set, update `last_done` accordingly. - fn conclude(&mut self, round: NumberFor) { - self.live.retain(|&number, _| number > round); - self.last_done = self.last_done.max(Some(round)); - } - - /// Return true if `round` is newer than previously concluded rounds. + /// Return true if `round` is >= than `max(session_start, best_beefy)`, + /// and vote set id matches session set id. /// /// Latest concluded round is still considered alive to allow proper gossiping for it. - fn is_live(&self, round: &NumberFor) -> bool { - Some(*round) >= self.last_done + fn is_live(&self, round: NumberFor, set_id: ValidatorSetId) -> bool { + self.filter + .as_ref() + .map(|f| set_id == f.validator_set_id && round >= f.start && round <= f.end) + .unwrap_or(false) } /// Add new _known_ `hash` to the round's known votes. - fn add_known(&mut self, round: &NumberFor, hash: MessageHash) { - self.live.get_mut(round).map(|known| known.insert(hash)); + fn add_known(&mut self, round: NumberFor, hash: MessageHash) { + self.live.entry(round).or_default().insert(hash); } /// Check if `hash` is already part of round's known votes. - fn is_known(&self, round: &NumberFor, hash: &MessageHash) -> bool { - self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false) + fn is_known(&self, round: NumberFor, hash: &MessageHash) -> bool { + self.live.get(&round).map(|known| known.contains(hash)).unwrap_or(false) } } @@ -100,7 +109,7 @@ where B: Block, { topic: B::Hash, - known_votes: RwLock>, + votes_filter: RwLock>, next_rebroadcast: Mutex, known_peers: Arc>>, } @@ -112,26 +121,18 @@ where pub fn new(known_peers: Arc>>) -> GossipValidator { GossipValidator { topic: topic::(), - known_votes: RwLock::new(KnownVotes::new()), + votes_filter: RwLock::new(VotesFilter::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), known_peers, } } - /// Note a voting round. + /// Update gossip validator filter. /// - /// Noting round will track gossiped votes for `round`. - pub(crate) fn note_round(&self, round: NumberFor) { - debug!(target: LOG_TARGET, "🥩 About to note gossip round #{}", round); - self.known_votes.write().insert(round); - } - - /// Conclude a voting round. - /// - /// This can be called once round is complete so we stop gossiping for it. - pub(crate) fn conclude_round(&self, round: NumberFor) { - debug!(target: LOG_TARGET, "🥩 About to drop gossip round #{}", round); - self.known_votes.write().conclude(round); + /// Only votes for `set_id` and rounds `start <= round <= end` will be accepted. + pub(crate) fn update_filter(&self, filter: GossipVoteFilter) { + debug!(target: LOG_TARGET, "🥩 New gossip filter {:?}", filter); + self.votes_filter.write().update(filter); } } @@ -152,25 +153,26 @@ where if let Ok(msg) = VoteMessage::, Public, Signature>::decode(&mut data) { let msg_hash = twox_64(data); let round = msg.commitment.block_number; + let set_id = msg.commitment.validator_set_id; + self.known_peers.lock().note_vote_for(*sender, round); // Verify general usefulness of the message. // We are going to discard old votes right away (without verification) // Also we keep track of already received votes to avoid verifying duplicates. { - let known_votes = self.known_votes.read(); + let filter = self.votes_filter.read(); - if !known_votes.is_live(&round) { + if !filter.is_live(round, set_id) { return ValidationResult::Discard } - if known_votes.is_known(&round, &msg_hash) { + if filter.is_known(round, &msg_hash) { return ValidationResult::ProcessAndKeep(self.topic) } } if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) { - self.known_votes.write().add_known(&round, msg_hash); - self.known_peers.lock().note_vote_for(*sender, round); + self.votes_filter.write().add_known(round, msg_hash); return ValidationResult::ProcessAndKeep(self.topic) } else { // TODO: report peer @@ -185,7 +187,7 @@ where } fn message_expired<'a>(&'a self) -> Box bool + 'a> { - let known_votes = self.known_votes.read(); + let filter = self.votes_filter.read(); Box::new(move |_topic, mut data| { let msg = match VoteMessage::, Public, Signature>::decode(&mut data) { Ok(vote) => vote, @@ -193,7 +195,8 @@ where }; let round = msg.commitment.block_number; - let expired = !known_votes.is_live(&round); + let set_id = msg.commitment.validator_set_id; + let expired = !filter.is_live(round, set_id); trace!(target: LOG_TARGET, "🥩 Message for round #{} expired: {}", round, expired); @@ -208,6 +211,7 @@ where let now = Instant::now(); let mut next_rebroadcast = self.next_rebroadcast.lock(); if now >= *next_rebroadcast { + trace!(target: LOG_TARGET, "🥩 Gossip rebroadcast"); *next_rebroadcast = now + REBROADCAST_AFTER; true } else { @@ -215,7 +219,7 @@ where } }; - let known_votes = self.known_votes.read(); + let filter = self.votes_filter.read(); Box::new(move |_who, intent, _topic, mut data| { if let MessageIntent::PeriodicRebroadcast = intent { return do_rebroadcast @@ -227,7 +231,8 @@ where }; let round = msg.commitment.block_number; - let allowed = known_votes.is_live(&round); + let set_id = msg.commitment.validator_set_id; + let allowed = filter.is_live(round, set_id); trace!(target: LOG_TARGET, "🥩 Message for round #{} allowed: {}", round, allowed); @@ -252,81 +257,35 @@ mod tests { #[test] fn known_votes_insert_remove() { - let mut kv = KnownVotes::::new(); + let mut kv = VotesFilter::::new(); + let msg_hash = twox_64(b"data"); - kv.insert(1); - kv.insert(1); - kv.insert(2); + kv.add_known(1, msg_hash); + kv.add_known(1, msg_hash); + kv.add_known(2, msg_hash); assert_eq!(kv.live.len(), 2); - let mut kv = KnownVotes::::new(); - kv.insert(1); - kv.insert(2); - kv.insert(3); + kv.add_known(3, msg_hash); + assert!(kv.is_known(3, &msg_hash)); + assert!(!kv.is_known(3, &twox_64(b"other"))); + assert!(!kv.is_known(4, &msg_hash)); + assert_eq!(kv.live.len(), 3); - assert!(kv.last_done.is_none()); - kv.conclude(2); + assert!(kv.filter.is_none()); + assert!(!kv.is_live(1, 1)); + + kv.update(GossipVoteFilter { start: 3, end: 10, validator_set_id: 1 }); assert_eq!(kv.live.len(), 1); - assert!(!kv.live.contains_key(&2)); - assert_eq!(kv.last_done, Some(2)); + assert!(kv.live.contains_key(&3)); + assert!(!kv.is_live(2, 1)); + assert!(kv.is_live(3, 1)); + assert!(kv.is_live(4, 1)); + assert!(!kv.is_live(4, 2)); - kv.conclude(1); - assert_eq!(kv.last_done, Some(2)); - - kv.conclude(3); - assert_eq!(kv.last_done, Some(3)); + kv.update(GossipVoteFilter { start: 5, end: 10, validator_set_id: 2 }); assert!(kv.live.is_empty()); } - #[test] - fn note_and_drop_round_works() { - let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); - - gv.note_round(1u64); - - assert!(gv.known_votes.read().is_live(&1u64)); - - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); - - assert_eq!(gv.known_votes.read().live.len(), 4); - - gv.conclude_round(7u64); - - let votes = gv.known_votes.read(); - - // rounds 1 and 3 are outdated, don't gossip anymore - assert!(!votes.is_live(&1u64)); - assert!(!votes.is_live(&3u64)); - // latest concluded round is still gossiped - assert!(votes.is_live(&7u64)); - // round 10 is alive and in-progress - assert!(votes.is_live(&10u64)); - } - - #[test] - fn note_same_round_twice() { - let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); - - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); - - assert_eq!(gv.known_votes.read().live.len(), 3); - - // note round #7 again -> should not change anything - gv.note_round(7u64); - - let votes = gv.known_votes.read(); - - assert_eq!(votes.live.len(), 3); - - assert!(votes.is_live(&3u64)); - assert!(votes.is_live(&7u64)); - assert!(votes.is_live(&10u64)); - } - struct TestContext; impl ValidatorContext for TestContext { fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) { @@ -368,21 +327,18 @@ mod tests { #[test] fn should_avoid_verifying_signatures_twice() { let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); let sender = sc_network::PeerId::random(); let mut context = TestContext; let vote = dummy_vote(3); - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); - // first time the cache should be populated let res = gv.validate(&mut context, &sender, &vote.encode()); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); assert_eq!( - gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()), + gv.votes_filter.read().live.get(&vote.commitment.block_number).map(|x| x.len()), Some(1) ); @@ -392,9 +348,11 @@ mod tests { assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); // next we should quickly reject if the round is not live - gv.conclude_round(7_u64); + gv.update_filter(GossipVoteFilter { start: 7, end: 10, validator_set_id: 0 }); - assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number)); + let number = vote.commitment.block_number; + let set_id = vote.commitment.validator_set_id; + assert!(!gv.votes_filter.read().is_live(number, set_id)); let res = gv.validate(&mut context, &sender, &vote.encode()); @@ -404,14 +362,13 @@ mod tests { #[test] fn messages_allowed_and_expired() { let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); let sender = sc_network::PeerId::random(); let topic = Default::default(); let intent = MessageIntent::Broadcast; - // note round 2 and 3, then conclude 2 - gv.note_round(2u64); - gv.note_round(3u64); - gv.conclude_round(2u64); + // conclude 2 + gv.update_filter(GossipVoteFilter { start: 2, end: 10, validator_set_id: 0 }); let mut allowed = gv.message_allowed(); let mut expired = gv.message_expired(); @@ -447,6 +404,7 @@ mod tests { #[test] fn messages_rebroadcast() { let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); let sender = sc_network::PeerId::random(); let topic = Default::default(); diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index eb56a97de1..d632f58332 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -247,6 +247,8 @@ pub async fn start_beefy_gadget( } = network_params; let known_peers = Arc::new(Mutex::new(KnownPeers::new())); + // Default votes filter is to discard everything. + // Validator is updated later with correct starting round and set id. let gossip_validator = Arc::new(communication::gossip::GossipValidator::new(known_peers.clone())); let mut gossip_engine = sc_network_gossip::GossipEngine::new( @@ -284,6 +286,14 @@ pub async fn start_beefy_gadget( return }, }; + // Update the gossip validator with the right starting round and set id. + if let Err(e) = persisted_state + .current_gossip_filter() + .map(|f| gossip_validator.update_filter(f)) + { + error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); + return + } let worker_params = worker::WorkerParams { backend, diff --git a/substrate/client/consensus/beefy/src/metrics.rs b/substrate/client/consensus/beefy/src/metrics.rs index 0ce48e60eb..6653763fc6 100644 --- a/substrate/client/consensus/beefy/src/metrics.rs +++ b/substrate/client/consensus/beefy/src/metrics.rs @@ -45,10 +45,6 @@ pub struct VoterMetrics { pub beefy_lagging_sessions: Counter, /// Number of times no Authority public key found in store pub beefy_no_authority_found_in_store: Counter, - /// Number of currently buffered votes - pub beefy_buffered_votes: Gauge, - /// Number of votes dropped due to full buffers - pub beefy_buffered_votes_dropped: Counter, /// Number of good votes successfully handled pub beefy_good_votes_processed: Counter, /// Number of equivocation votes received @@ -65,8 +61,6 @@ pub struct VoterMetrics { pub beefy_imported_justifications: Counter, /// Number of justifications dropped due to full buffers pub beefy_buffered_justifications_dropped: Counter, - /// Trying to set Best Beefy block to old block - pub beefy_best_block_set_last_failure: Gauge, } impl PrometheusRegister for VoterMetrics { @@ -110,17 +104,6 @@ impl PrometheusRegister for VoterMetrics { )?, registry, )?, - beefy_buffered_votes: register( - Gauge::new("substrate_beefy_buffered_votes", "Number of currently buffered votes")?, - registry, - )?, - beefy_buffered_votes_dropped: register( - Counter::new( - "substrate_beefy_buffered_votes_dropped", - "Number of votes dropped due to full buffers", - )?, - registry, - )?, beefy_good_votes_processed: register( Counter::new( "substrate_beefy_successful_handled_votes", @@ -174,13 +157,6 @@ impl PrometheusRegister for VoterMetrics { )?, registry, )?, - beefy_best_block_set_last_failure: register( - Gauge::new( - "substrate_beefy_best_block_to_old_block", - "Trying to set Best Beefy block to old block", - )?, - registry, - )?, }) } } diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index 27dc8d8191..629f144246 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -526,17 +526,15 @@ async fn finalize_block_and_wait_for_beefy( net: &Arc>, // peer index and key peers: impl Iterator + Clone, - finalize_targets: &[H256], + finalize_target: &H256, expected_beefy: &[u64], ) { let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); - for block in finalize_targets { - peers.clone().for_each(|(index, _)| { - let client = net.lock().peer(index).client().as_client(); - client.finalize_block(*block, None).unwrap(); - }) - } + peers.clone().for_each(|(index, _)| { + let client = net.lock().peer(index).client().as_client(); + client.finalize_block(*finalize_target, None).unwrap(); + }); if expected_beefy.is_empty() { // run for quarter second then verify no new best beefy block available @@ -574,31 +572,32 @@ async fn beefy_finalizing_blocks() { let peers = peers.into_iter().enumerate(); // finalize block #5 -> BEEFY should finalize #1 (mandatory) and #5 from diff-power-of-two rule. - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[1], hashes[5]], &[1, 5]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[1], &[1]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[5], &[5]).await; // GRANDPA finalize #10 -> BEEFY finalize #10 (mandatory) - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[10]], &[10]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[10], &[10]).await; // GRANDPA finalize #18 -> BEEFY finalize #14, then #18 (diff-power-of-two rule) - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[18]], &[14, 18]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[18], &[14, 18]).await; // GRANDPA finalize #20 -> BEEFY finalize #20 (mandatory) - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[20]], &[20]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[20], &[20]).await; // GRANDPA finalize #21 -> BEEFY finalize nothing (yet) because min delta is 4 - finalize_block_and_wait_for_beefy(&net, peers, &[hashes[21]], &[]).await; + finalize_block_and_wait_for_beefy(&net, peers, &hashes[21], &[]).await; } #[tokio::test] async fn lagging_validators() { sp_tracing::try_init_simple(); - let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; + let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie]; let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap(); let session_len = 30; let min_block_delta = 1; - let mut net = BeefyTestNet::new(2); + let mut net = BeefyTestNet::new(3); let api = Arc::new(TestApi::with_validator_set(&validator_set)); let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); tokio::spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); @@ -611,55 +610,47 @@ async fn lagging_validators() { let peers = peers.into_iter().enumerate(); // finalize block #15 -> BEEFY should finalize #1 (mandatory) and #9, #13, #14, #15 from // diff-power-of-two rule. - finalize_block_and_wait_for_beefy( - &net, - peers.clone(), - &[hashes[1], hashes[15]], - &[1, 9, 13, 14, 15], - ) - .await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[1], &[1]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[15], &[9, 13, 14, 15]).await; - // Alice finalizes #25, Bob lags behind + // Alice and Bob finalize #25, Charlie lags behind let finalize = hashes[25]; let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); // verify nothing gets finalized by BEEFY - let timeout = Some(Duration::from_millis(250)); + let timeout = Some(Duration::from_millis(100)); streams_empty_after_timeout(best_blocks, &net, timeout).await; streams_empty_after_timeout(versioned_finality_proof, &net, None).await; - // Bob catches up and also finalizes #25 + // Charlie catches up and also finalizes #25 let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); - net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); - // expected beefy finalizes block #17 from diff-power-of-two + net.lock().peer(2).client().as_client().finalize_block(finalize, None).unwrap(); + // expected beefy finalizes blocks 23, 24, 25 from diff-power-of-two wait_for_best_beefy_blocks(best_blocks, &net, &[23, 24, 25]).await; wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &[23, 24, 25]).await; // Both finalize #30 (mandatory session) and #32 -> BEEFY finalize #30 (mandatory), #31, #32 - finalize_block_and_wait_for_beefy( - &net, - peers.clone(), - &[hashes[30], hashes[32]], - &[30, 31, 32], - ) - .await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[30], &[30]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[32], &[31, 32]).await; // Verify that session-boundary votes get buffered by client and only processed once // session-boundary block is GRANDPA-finalized (this guarantees authenticity for the new session // validator set). - // Alice finalizes session-boundary mandatory block #60, Bob lags behind + // Alice and Bob finalize session-boundary mandatory block #60, Charlie lags behind let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); let finalize = hashes[60]; net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); // verify nothing gets finalized by BEEFY - let timeout = Some(Duration::from_millis(250)); + let timeout = Some(Duration::from_millis(100)); streams_empty_after_timeout(best_blocks, &net, timeout).await; streams_empty_after_timeout(versioned_finality_proof, &net, None).await; - // Bob catches up and also finalizes #60 (and should have buffered Alice's vote on #60) + // Charlie catches up and also finalizes #60 (and should have buffered Alice's vote on #60) let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers); - net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(2).client().as_client().finalize_block(finalize, None).unwrap(); // verify beefy skips intermediary votes, and successfully finalizes mandatory block #60 wait_for_best_beefy_blocks(best_blocks, &net, &[60]).await; wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &[60]).await; @@ -696,7 +687,8 @@ async fn correct_beefy_payload() { let net = Arc::new(Mutex::new(net)); let peers = peers.into_iter().enumerate(); // with 3 good voters and 1 bad one, consensus should happen and best blocks produced. - finalize_block_and_wait_for_beefy(&net, peers, &[hashes[1], hashes[10]], &[1, 9]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[1], &[1]).await; + finalize_block_and_wait_for_beefy(&net, peers, &hashes[10], &[9]).await; let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), [(0, BeefyKeyring::Alice)].into_iter()); @@ -708,7 +700,7 @@ async fn correct_beefy_payload() { net.lock().peer(3).client().as_client().finalize_block(hashof11, None).unwrap(); // verify consensus is _not_ reached - let timeout = Some(Duration::from_millis(250)); + let timeout = Some(Duration::from_millis(100)); streams_empty_after_timeout(best_blocks, &net, timeout).await; streams_empty_after_timeout(versioned_finality_proof, &net, None).await; @@ -874,39 +866,6 @@ async fn beefy_importing_justifications() { } } -#[tokio::test] -async fn voter_initialization() { - sp_tracing::try_init_simple(); - // Regression test for voter initialization where finality notifications were dropped - // after waiting for BEEFY pallet availability. - - let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; - let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap(); - let session_len = 5; - // Should vote on all mandatory blocks no matter the `min_block_delta`. - let min_block_delta = 10; - - let mut net = BeefyTestNet::new(2); - let api = Arc::new(TestApi::with_validator_set(&validator_set)); - let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); - tokio::spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); - - // push 26 blocks - let hashes = net.generate_blocks_and_sync(26, session_len, &validator_set, false).await; - let net = Arc::new(Mutex::new(net)); - - // Finalize multiple blocks at once to get a burst of finality notifications right from start. - // Need to finalize at least one block in each session, choose randomly. - // Expect voters to pick up all of them and BEEFY-finalize the mandatory blocks of each session. - finalize_block_and_wait_for_beefy( - &net, - peers.into_iter().enumerate(), - &[hashes[1], hashes[6], hashes[10], hashes[17], hashes[24], hashes[26]], - &[1, 5, 10, 15, 20, 25], - ) - .await; -} - #[tokio::test] async fn on_demand_beefy_justification_sync() { sp_tracing::try_init_simple(); @@ -940,13 +899,11 @@ async fn on_demand_beefy_justification_sync() { let net = Arc::new(Mutex::new(net)); // With 3 active voters and one inactive, consensus should happen and blocks BEEFY-finalized. // Need to finalize at least one block in each session, choose randomly. - finalize_block_and_wait_for_beefy( - &net, - fast_peers.clone(), - &[hashes[1], hashes[6], hashes[10], hashes[17], hashes[23]], - &[1, 5, 10, 15, 20], - ) - .await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[1], &[1]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[6], &[5]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[10], &[10]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[17], &[15]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[24], &[20]).await; // Spawn Dave, they are now way behind voting and can only catch up through on-demand justif // sync. @@ -971,7 +928,8 @@ async fn on_demand_beefy_justification_sync() { // freshly spun up Dave now needs to listen for gossip to figure out the state of their peers. // Have the other peers do some gossip so Dave finds out about their progress. - finalize_block_and_wait_for_beefy(&net, fast_peers, &[hashes[25], hashes[29]], &[25, 29]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers.clone(), &hashes[25], &[25]).await; + finalize_block_and_wait_for_beefy(&net, fast_peers, &hashes[29], &[29]).await; // Kick Dave's async loop by finalizing another block. client.finalize_block(hashes[2], None).unwrap(); @@ -982,13 +940,14 @@ async fn on_demand_beefy_justification_sync() { // Give all tasks some cpu cycles to burn through their events queues, run_for(Duration::from_millis(100), &net).await; // then verify Dave catches up through on-demand justification requests. - finalize_block_and_wait_for_beefy( - &net, - [(dave_index, BeefyKeyring::Dave)].into_iter(), - &[hashes[6], hashes[10], hashes[17], hashes[24], hashes[26]], - &[5, 10, 15, 20, 25], - ) - .await; + let (dave_best_blocks, _) = + get_beefy_streams(&mut net.lock(), [(dave_index, BeefyKeyring::Dave)].into_iter()); + client.finalize_block(hashes[6], None).unwrap(); + client.finalize_block(hashes[10], None).unwrap(); + client.finalize_block(hashes[17], None).unwrap(); + client.finalize_block(hashes[24], None).unwrap(); + client.finalize_block(hashes[26], None).unwrap(); + wait_for_best_beefy_blocks(dave_best_blocks, &net, &[5, 10, 15, 20, 25]).await; } #[tokio::test] @@ -1022,13 +981,8 @@ async fn should_initialize_voter_at_genesis() { // verify next vote target is mandatory block 1 assert_eq!(persisted_state.best_beefy_block(), 0); - assert_eq!(persisted_state.best_grandpa_block(), 13); - assert_eq!( - persisted_state - .voting_oracle() - .voting_target(persisted_state.best_beefy_block(), 13), - Some(1) - ); + assert_eq!(persisted_state.best_grandpa_number(), 13); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(1)); // verify state also saved to db assert!(verify_persisted_version(&*backend)); @@ -1070,13 +1024,8 @@ async fn should_initialize_voter_at_custom_genesis() { // verify next vote target is mandatory block 7 assert_eq!(persisted_state.best_beefy_block(), 0); - assert_eq!(persisted_state.best_grandpa_block(), 8); - assert_eq!( - persisted_state - .voting_oracle() - .voting_target(persisted_state.best_beefy_block(), 13), - Some(custom_pallet_genesis) - ); + assert_eq!(persisted_state.best_grandpa_number(), 8); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(custom_pallet_genesis)); // verify state also saved to db assert!(verify_persisted_version(&*backend)); @@ -1128,14 +1077,9 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() { // verify block 10 is correctly marked as finalized assert_eq!(persisted_state.best_beefy_block(), 10); - assert_eq!(persisted_state.best_grandpa_block(), 13); + assert_eq!(persisted_state.best_grandpa_number(), 13); // verify next vote target is diff-power-of-two block 12 - assert_eq!( - persisted_state - .voting_oracle() - .voting_target(persisted_state.best_beefy_block(), 13), - Some(12) - ); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(12)); // verify state also saved to db assert!(verify_persisted_version(&*backend)); @@ -1186,13 +1130,8 @@ async fn should_initialize_voter_at_latest_finalized() { // verify next vote target is 13 assert_eq!(persisted_state.best_beefy_block(), 12); - assert_eq!(persisted_state.best_grandpa_block(), 13); - assert_eq!( - persisted_state - .voting_oracle() - .voting_target(persisted_state.best_beefy_block(), 13), - Some(13) - ); + assert_eq!(persisted_state.best_grandpa_number(), 13); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(13)); // verify state also saved to db assert!(verify_persisted_version(&*backend)); @@ -1225,13 +1164,13 @@ async fn beefy_finalizing_after_pallet_genesis() { // Minimum BEEFY block delta is 1. // GRANDPA finalize blocks leading up to BEEFY pallet genesis -> BEEFY should finalize nothing. - finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[1..14], &[]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[14], &[]).await; // GRANDPA finalize block #16 -> BEEFY should finalize #15 (genesis mandatory) and #16. - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[16]], &[15, 16]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[16], &[15, 16]).await; // GRANDPA finalize #21 -> BEEFY finalize #20 (mandatory) and #21 - finalize_block_and_wait_for_beefy(&net, peers.clone(), &[hashes[21]], &[20, 21]).await; + finalize_block_and_wait_for_beefy(&net, peers.clone(), &hashes[21], &[20, 21]).await; } #[tokio::test] @@ -1249,14 +1188,14 @@ async fn beefy_reports_equivocations() { let mut api_alice = TestApi::with_validator_set(&validator_set); api_alice.allow_equivocations(); let api_alice = Arc::new(api_alice); - let alice = (0, &peers[0], api_alice.clone()); + let alice = (0, &BeefyKeyring::Alice, api_alice.clone()); tokio::spawn(initialize_beefy(&mut net, vec![alice], min_block_delta)); // Bob votes on bad MMR roots, equivocations are allowed/expected. let mut api_bob = TestApi::new(1, &validator_set, BAD_MMR_ROOT); api_bob.allow_equivocations(); let api_bob = Arc::new(api_bob); - let bob = (1, &peers[1], api_bob.clone()); + let bob = (1, &BeefyKeyring::Bob, api_bob.clone()); tokio::spawn(initialize_beefy(&mut net, vec![bob], min_block_delta)); // We spawn another node voting with Bob key, on alternate bad MMR roots (equivocating). @@ -1276,7 +1215,7 @@ async fn beefy_reports_equivocations() { let peers = peers.into_iter().enumerate(); // finalize block #1 -> BEEFY should not finalize anything (each node votes on different MMR). - finalize_block_and_wait_for_beefy(&net, peers, &[hashes[1]], &[]).await; + finalize_block_and_wait_for_beefy(&net, peers, &hashes[1], &[]).await; // Verify neither Bob or Bob_Prime report themselves as equivocating. assert!(api_bob.reported_equivocations.as_ref().unwrap().lock().is_empty()); diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index 0abb38d022..0260d7693c 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -18,13 +18,13 @@ use crate::{ communication::{ - gossip::{topic, GossipValidator}, + gossip::{topic, GossipValidator, GossipVoteFilter}, request_response::outgoing_requests_engine::OnDemandJustificationsEngine, }, error::Error, justification::BeefyVersionedFinalityProof, keystore::{BeefyKeystore, BeefySignatureHasher}, - metric_get, metric_inc, metric_set, + metric_inc, metric_set, metrics::VoterMetrics, round::{Rounds, VoteImportResult}, BeefyVoterLinks, LOG_TARGET, @@ -42,24 +42,19 @@ use sp_consensus_beefy::{ check_equivocation_proof, crypto::{AuthorityId, Signature}, BeefyApi, Commitment, ConsensusLog, EquivocationProof, PayloadProvider, ValidatorSet, - VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, + ValidatorSetId, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, }; use sp_runtime::{ generic::OpaqueDigestItemId, - traits::{Block, ConstU32, Header, NumberFor, Zero}, - BoundedVec, SaturatedConversion, + traits::{Block, Header, NumberFor, Zero}, + SaturatedConversion, }; use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, fmt::Debug, - marker::PhantomData, sync::Arc, }; -/// Bound for the number of buffered future voting rounds. -const MAX_BUFFERED_VOTE_ROUNDS: usize = 600; -/// Bound for the number of buffered votes per round number. -const MAX_BUFFERED_VOTES_PER_ROUND: u32 = 1000; /// Bound for the number of pending justifications - use 2400 - the max number /// of justifications possible in a single session. const MAX_BUFFERED_JUSTIFICATIONS: usize = 2400; @@ -87,23 +82,34 @@ pub(crate) struct VoterOracle { sessions: VecDeque>, /// Min delta in block numbers between two blocks, BEEFY should vote on. min_block_delta: u32, + /// Best block we received a GRANDPA finality for. + best_grandpa_block_header: ::Header, + /// Best block a BEEFY voting round has been concluded for. + best_beefy_block: NumberFor, } impl VoterOracle { /// Verify provided `sessions` satisfies requirements, then build `VoterOracle`. - pub fn checked_new(sessions: VecDeque>, min_block_delta: u32) -> Option { + pub fn checked_new( + sessions: VecDeque>, + min_block_delta: u32, + grandpa_header: ::Header, + best_beefy: NumberFor, + ) -> Option { let mut prev_start = Zero::zero(); let mut prev_validator_id = None; // verifies the let mut validate = || -> bool { - if sessions.is_empty() { + let best_grandpa = *grandpa_header.number(); + if sessions.is_empty() || best_beefy > best_grandpa { return false } for (idx, session) in sessions.iter().enumerate() { + let start = session.session_start(); if session.validators().is_empty() { return false } - if session.session_start() <= prev_start { + if start > best_grandpa || start <= prev_start { return false } #[cfg(not(test))] @@ -125,23 +131,35 @@ impl VoterOracle { sessions, // Always target at least one block better than current best beefy. min_block_delta: min_block_delta.max(1), + best_grandpa_block_header: grandpa_header, + best_beefy_block: best_beefy, }) } else { - error!(target: LOG_TARGET, "🥩 Invalid sessions queue: {:?}.", sessions); + error!( + target: LOG_TARGET, + "🥩 Invalid sessions queue: {:?}; best-beefy {:?} best-grandpa-header {:?}.", + sessions, + best_beefy, + grandpa_header + ); None } } // Return reference to rounds pertaining to first session in the queue. // Voting will always happen at the head of the queue. - fn active_rounds(&self) -> Option<&Rounds> { - self.sessions.front() + fn active_rounds(&self) -> Result<&Rounds, Error> { + self.sessions.front().ok_or(Error::UninitSession) } // Return mutable reference to rounds pertaining to first session in the queue. // Voting will always happen at the head of the queue. - fn active_rounds_mut(&mut self) -> Option<&mut Rounds> { - self.sessions.front_mut() + fn active_rounds_mut(&mut self) -> Result<&mut Rounds, Error> { + self.sessions.front_mut().ok_or(Error::UninitSession) + } + + fn current_validator_set_id(&self) -> Result { + self.active_rounds().map(|r| r.validator_set_id()) } // Prune the sessions queue to keep the Oracle in one of the expected three states. @@ -164,7 +182,7 @@ impl VoterOracle { /// Finalize a particular block. pub fn finalize(&mut self, block: NumberFor) -> Result<(), Error> { // Conclude voting round for this block. - self.active_rounds_mut().ok_or(Error::UninitSession)?.conclude(block); + self.active_rounds_mut()?.conclude(block); // Prune any now "finalized" sessions from queue. self.try_prune(); Ok(()) @@ -182,30 +200,26 @@ impl VoterOracle { } /// Return `(A, B)` tuple representing inclusive [A, B] interval of votes to accept. - pub fn accepted_interval( - &self, - best_grandpa: NumberFor, - ) -> Result<(NumberFor, NumberFor), Error> { + pub fn accepted_interval(&self) -> Result<(NumberFor, NumberFor), Error> { let rounds = self.sessions.front().ok_or(Error::UninitSession)?; if rounds.mandatory_done() { // There's only one session active and its mandatory is done. - // Accept any GRANDPA finalized vote. - Ok((rounds.session_start(), best_grandpa.into())) + // Accept any vote for a GRANDPA finalized block in a better round. + Ok(( + rounds.session_start().max(self.best_beefy_block), + (*self.best_grandpa_block_header.number()).into(), + )) } else { - // There's at least one session with mandatory not done. + // Current session has mandatory not done. // Only accept votes for the mandatory block in the front of queue. Ok((rounds.session_start(), rounds.session_start())) } } /// Utility function to quickly decide what to do for each round. - pub fn triage_round( - &self, - round: NumberFor, - best_grandpa: NumberFor, - ) -> Result { - let (start, end) = self.accepted_interval(best_grandpa)?; + pub fn triage_round(&self, round: NumberFor) -> Result { + let (start, end) = self.accepted_interval()?; if start <= round && round <= end { Ok(RoundAction::Process) } else if round > end { @@ -217,17 +231,15 @@ impl VoterOracle { /// Return `Some(number)` if we should be voting on block `number`, /// return `None` if there is no block we should vote on. - pub fn voting_target( - &self, - best_beefy: NumberFor, - best_grandpa: NumberFor, - ) -> Option> { + pub fn voting_target(&self) -> Option> { let rounds = if let Some(r) = self.sessions.front() { r } else { debug!(target: LOG_TARGET, "🥩 No voting round started"); return None }; + let best_grandpa = *self.best_grandpa_block_header.number(); + let best_beefy = self.best_beefy_block; // `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`. let target = @@ -259,10 +271,6 @@ pub(crate) struct WorkerParams { #[derive(Debug, Decode, Encode, PartialEq)] pub(crate) struct PersistedState { - /// Best block we received a GRANDPA finality for. - best_grandpa_block_header: ::Header, - /// Best block a BEEFY voting round has been concluded for. - best_beefy_block: NumberFor, /// Best block we voted on. best_voted: NumberFor, /// Chooses which incoming votes to accept and which votes to generate. @@ -277,20 +285,26 @@ impl PersistedState { sessions: VecDeque>, min_block_delta: u32, ) -> Option { - VoterOracle::checked_new(sessions, min_block_delta).map(|voting_oracle| PersistedState { - best_grandpa_block_header: grandpa_header, - best_beefy_block: best_beefy, - best_voted: Zero::zero(), - voting_oracle, - }) + VoterOracle::checked_new(sessions, min_block_delta, grandpa_header, best_beefy) + .map(|voting_oracle| PersistedState { best_voted: Zero::zero(), voting_oracle }) } pub(crate) fn set_min_block_delta(&mut self, min_block_delta: u32) { self.voting_oracle.min_block_delta = min_block_delta.max(1); } + pub(crate) fn set_best_beefy(&mut self, best_beefy: NumberFor) { + self.voting_oracle.best_beefy_block = best_beefy; + } + pub(crate) fn set_best_grandpa(&mut self, best_grandpa: ::Header) { - self.best_grandpa_block_header = best_grandpa; + self.voting_oracle.best_grandpa_block_header = best_grandpa; + } + + pub(crate) fn current_gossip_filter(&self) -> Result, Error> { + let (start, end) = self.voting_oracle.accepted_interval()?; + let validator_set_id = self.voting_oracle.current_validator_set_id()?; + Ok(GossipVoteFilter { start, end, validator_set_id }) } } @@ -315,14 +329,6 @@ pub(crate) struct BeefyWorker { // voter state /// BEEFY client metrics. metrics: Option, - /// Buffer holding votes for future processing. - pending_votes: BTreeMap< - NumberFor, - BoundedVec< - VoteMessage, AuthorityId, Signature>, - ConstU32, - >, - >, /// Buffer holding justifications for future processing. pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, /// Persisted voter state. @@ -370,25 +376,20 @@ where on_demand_justifications, links, metrics, - pending_votes: BTreeMap::new(), pending_justifications: BTreeMap::new(), persisted_state, } } fn best_grandpa_block(&self) -> NumberFor { - *self.persisted_state.best_grandpa_block_header.number() - } - - fn best_beefy_block(&self) -> NumberFor { - self.persisted_state.best_beefy_block + *self.persisted_state.voting_oracle.best_grandpa_block_header.number() } fn voting_oracle(&self) -> &VoterOracle { &self.persisted_state.voting_oracle } - fn active_rounds(&mut self) -> Option<&Rounds> { + fn active_rounds(&mut self) -> Result<&Rounds, Error> { self.persisted_state.voting_oracle.active_rounds() } @@ -429,7 +430,7 @@ where debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set); // BEEFY should finalize a mandatory block during each session. - if let Some(active_session) = self.active_rounds() { + if let Ok(active_session) = self.active_rounds() { if !active_session.mandatory_done() { debug!( target: LOG_TARGET, @@ -460,12 +461,17 @@ where } fn handle_finality_notification(&mut self, notification: &FinalityNotification) { - debug!(target: LOG_TARGET, "🥩 Finality notification: {:?}", notification); + debug!( + target: LOG_TARGET, + "🥩 Finality notification: header {:?} tree_route {:?}", + notification.header, + notification.tree_route, + ); let header = ¬ification.header; if *header.number() > self.best_grandpa_block() { // update best GRANDPA finalized block we have seen - self.persisted_state.best_grandpa_block_header = header.clone(); + self.persisted_state.set_best_grandpa(header.clone()); // Check all (newly) finalized blocks for new session(s). let backend = self.backend.clone(); @@ -484,38 +490,28 @@ where self.init_session_at(new_validator_set, *header.number()); } } + + // Update gossip validator votes filter. + if let Err(e) = self + .persisted_state + .current_gossip_filter() + .map(|filter| self.gossip_validator.update_filter(filter)) + { + error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e); + } } } - /// Based on [VoterOracle] this vote is either processed here or enqueued for later. + /// Based on [VoterOracle] this vote is either processed here or discarded. fn triage_incoming_vote( &mut self, vote: VoteMessage, AuthorityId, Signature>, ) -> Result<(), Error> { let block_num = vote.commitment.block_number; - let best_grandpa = self.best_grandpa_block(); - self.gossip_validator.note_round(block_num); - match self.voting_oracle().triage_round(block_num, best_grandpa)? { + match self.voting_oracle().triage_round(block_num)? { RoundAction::Process => self.handle_vote(vote)?, - RoundAction::Enqueue => { - debug!(target: LOG_TARGET, "🥩 Buffer vote for round: {:?}.", block_num); - if self.pending_votes.len() < MAX_BUFFERED_VOTE_ROUNDS { - let votes_vec = self.pending_votes.entry(block_num).or_default(); - if votes_vec.try_push(vote).is_ok() { - metric_inc!(self, beefy_buffered_votes); - } else { - warn!( - target: LOG_TARGET, - "🥩 Buffer vote dropped for round: {:?}", block_num - ); - metric_inc!(self, beefy_buffered_votes_dropped); - } - } else { - warn!(target: LOG_TARGET, "🥩 Buffer vote dropped for round: {:?}.", block_num); - metric_inc!(self, beefy_buffered_votes_dropped); - } - }, RoundAction::Drop => metric_inc!(self, beefy_stale_votes), + RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote), }; Ok(()) } @@ -531,8 +527,7 @@ where VersionedFinalityProof::V1(ref sc) => sc, }; let block_num = signed_commitment.commitment.block_number; - let best_grandpa = self.best_grandpa_block(); - match self.voting_oracle().triage_round(block_num, best_grandpa)? { + match self.voting_oracle().triage_round(block_num)? { RoundAction::Process => { debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num); metric_inc!(self, beefy_imported_justifications); @@ -560,11 +555,7 @@ where &mut self, vote: VoteMessage, AuthorityId, Signature>, ) -> Result<(), Error> { - let rounds = self - .persisted_state - .voting_oracle - .active_rounds_mut() - .ok_or(Error::UninitSession)?; + let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?; let block_number = vote.commitment.block_number; match rounds.add_vote(vote) { @@ -608,7 +599,7 @@ where /// 3. Persist voter state, /// 4. Send best block hash and `finality_proof` to RPC worker. /// - /// Expects `finality proof` to be valid. + /// Expects `finality proof` to be valid and for a block > current-best-beefy. fn finalize(&mut self, finality_proof: BeefyVersionedFinalityProof) -> Result<(), Error> { let block_num = match finality_proof { VersionedFinalityProof::V1(ref sc) => sc.commitment.block_number, @@ -616,74 +607,62 @@ where // Finalize inner round and update voting_oracle state. self.persisted_state.voting_oracle.finalize(block_num)?; - self.gossip_validator.conclude_round(block_num); - if block_num > self.best_beefy_block() { - // Set new best BEEFY block number. - self.persisted_state.best_beefy_block = block_num; - crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) - .map_err(|e| Error::Backend(e.to_string()))?; + // Set new best BEEFY block number. + self.persisted_state.set_best_beefy(block_num); + crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) + .map_err(|e| Error::Backend(e.to_string()))?; - metric_set!(self, beefy_best_block, block_num); + metric_set!(self, beefy_best_block, block_num); - self.on_demand_justifications.cancel_requests_older_than(block_num); + self.on_demand_justifications.cancel_requests_older_than(block_num); - if let Err(e) = self - .backend - .blockchain() - .expect_block_hash_from_id(&BlockId::Number(block_num)) - .and_then(|hash| { - self.links - .to_rpc_best_block_sender - .notify(|| Ok::<_, ()>(hash)) - .expect("forwards closure result; the closure always returns Ok; qed."); + if let Err(e) = self + .backend + .blockchain() + .expect_block_hash_from_id(&BlockId::Number(block_num)) + .and_then(|hash| { + self.links + .to_rpc_best_block_sender + .notify(|| Ok::<_, ()>(hash)) + .expect("forwards closure result; the closure always returns Ok; qed."); - self.backend - .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode())) - }) { - error!( - target: LOG_TARGET, - "🥩 Error {:?} on appending justification: {:?}", e, finality_proof - ); - } - - self.links - .to_rpc_justif_sender - .notify(|| Ok::<_, ()>(finality_proof)) - .expect("forwards closure result; the closure always returns Ok; qed."); - } else { - debug!(target: LOG_TARGET, "🥩 Can't set best beefy to old: {}", block_num); - metric_set!(self, beefy_best_block_set_last_failure, block_num); + self.backend + .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode())) + }) { + error!( + target: LOG_TARGET, + "🥩 Error {:?} on appending justification: {:?}", e, finality_proof + ); } + + self.links + .to_rpc_justif_sender + .notify(|| Ok::<_, ()>(finality_proof)) + .expect("forwards closure result; the closure always returns Ok; qed."); + + // Update gossip validator votes filter. + self.persisted_state + .current_gossip_filter() + .map(|filter| self.gossip_validator.update_filter(filter))?; Ok(()) } - /// Handle previously buffered justifications and votes that now land in the voting interval. - fn try_pending_justif_and_votes(&mut self) -> Result<(), Error> { - let best_grandpa = self.best_grandpa_block(); - let _ph = PhantomData::::default(); - - fn to_process_for( - pending: &mut BTreeMap, T>, - (start, end): (NumberFor, NumberFor), - _: PhantomData, - ) -> BTreeMap, T> { - // These are still pending. - let still_pending = pending.split_off(&end.saturating_add(1u32.into())); - // These can be processed. - let to_handle = pending.split_off(&start); - // The rest can be dropped. - *pending = still_pending; - // Return ones to process. - to_handle - } + /// Handle previously buffered justifications, that now land in the voting interval. + fn try_pending_justififactions(&mut self) -> Result<(), Error> { // Interval of blocks for which we can process justifications and votes right now. - let mut interval = self.voting_oracle().accepted_interval(best_grandpa)?; - + let (start, end) = self.voting_oracle().accepted_interval()?; // Process pending justifications. if !self.pending_justifications.is_empty() { - let justifs_to_handle = to_process_for(&mut self.pending_justifications, interval, _ph); - for (num, justification) in justifs_to_handle.into_iter() { + // These are still pending. + let still_pending = + self.pending_justifications.split_off(&end.saturating_add(1u32.into())); + // These can be processed. + let justifs_to_process = self.pending_justifications.split_off(&start); + // The rest can be dropped. + self.pending_justifications = still_pending; + + for (num, justification) in justifs_to_process.into_iter() { debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num); metric_inc!(self, beefy_imported_justifications); if let Err(err) = self.finalize(justification) { @@ -691,27 +670,6 @@ where } } metric_set!(self, beefy_buffered_justifications, self.pending_justifications.len()); - // Possibly new interval after processing justifications. - interval = self.voting_oracle().accepted_interval(best_grandpa)?; - } - - // Process pending votes. - if !self.pending_votes.is_empty() { - let mut processed = 0u64; - let votes_to_handle = to_process_for(&mut self.pending_votes, interval, _ph); - for (num, votes) in votes_to_handle.into_iter() { - debug!(target: LOG_TARGET, "🥩 Handle buffered votes for: {:?}.", num); - processed += votes.len() as u64; - for v in votes.into_iter() { - if let Err(err) = self.handle_vote(v) { - error!(target: LOG_TARGET, "🥩 Error handling buffered vote: {}", err); - }; - } - } - if let Some(previous) = metric_get!(self, beefy_buffered_votes) { - previous.sub(processed); - metric_set!(self, beefy_buffered_votes, previous.get()); - } } Ok(()) } @@ -719,10 +677,7 @@ where /// Decide if should vote, then vote.. or don't.. fn try_to_vote(&mut self) -> Result<(), Error> { // Vote if there's now a new vote target. - if let Some(target) = self - .voting_oracle() - .voting_target(self.best_beefy_block(), self.best_grandpa_block()) - { + if let Some(target) = self.voting_oracle().voting_target() { metric_set!(self, beefy_should_vote_on, target); if target > self.persisted_state.best_voted { self.do_vote(target)?; @@ -740,7 +695,7 @@ where // Most of the time we get here, `target` is actually `best_grandpa`, // avoid getting header from backend in that case. let target_header = if target_number == self.best_grandpa_block() { - self.persisted_state.best_grandpa_block_header.clone() + self.persisted_state.voting_oracle.best_grandpa_block_header.clone() } else { let hash = self .backend @@ -771,11 +726,7 @@ where return Ok(()) }; - let rounds = self - .persisted_state - .voting_oracle - .active_rounds_mut() - .ok_or(Error::UninitSession)?; + let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?; let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); let authority_id = if let Some(id) = self.key_store.authority_id(validators) { @@ -830,7 +781,7 @@ where fn process_new_state(&mut self) { // Handle pending justifications and/or votes for now GRANDPA finalized blocks. - if let Err(err) = self.try_pending_justif_and_votes() { + if let Err(err) = self.try_pending_justififactions() { debug!(target: LOG_TARGET, "🥩 {}", err); } @@ -867,12 +818,12 @@ where self.gossip_engine .messages_for(topic::()) .filter_map(|notification| async move { - trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", notification); - - VoteMessage::, AuthorityId, Signature>::decode( + let vote = VoteMessage::, AuthorityId, Signature>::decode( &mut ¬ification.message[..], ) - .ok() + .ok(); + trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", vote); + vote }) .fuse(), ); @@ -946,8 +897,7 @@ where &self, proof: EquivocationProof, AuthorityId, Signature>, ) -> Result<(), Error> { - let rounds = - self.persisted_state.voting_oracle.active_rounds().ok_or(Error::UninitSession)?; + let rounds = self.persisted_state.voting_oracle.active_rounds()?; let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); let offender_id = proof.offender_id().clone(); @@ -1083,16 +1033,16 @@ pub(crate) mod tests { &self.voting_oracle } - pub fn active_round(&self) -> Option<&Rounds> { + pub fn active_round(&self) -> Result<&Rounds, Error> { self.voting_oracle.active_rounds() } pub fn best_beefy_block(&self) -> NumberFor { - self.best_beefy_block + self.voting_oracle.best_beefy_block } - pub fn best_grandpa_block(&self) -> NumberFor { - *self.best_grandpa_block_header.number() + pub fn best_grandpa_number(&self) -> NumberFor { + *self.voting_oracle.best_grandpa_block_header.number() } } @@ -1103,7 +1053,7 @@ pub(crate) mod tests { } fn create_beefy_worker( - peer: &BeefyPeer, + peer: &mut BeefyPeer, key: &Keyring, min_block_delta: u32, genesis_validator_set: ValidatorSet, @@ -1153,12 +1103,15 @@ pub(crate) mod tests { known_peers, None, ); - let genesis_header = backend + // Push 1 block - will start first session. + let hashes = peer.push_blocks(1, false); + backend.finalize_block(hashes[0], None).unwrap(); + let first_header = backend .blockchain() - .expect_header(backend.blockchain().info().genesis_hash) + .expect_header(backend.blockchain().info().best_hash) .unwrap(); let persisted_state = PersistedState::checked_new( - genesis_header, + first_header, Zero::zero(), vec![Rounds::new(One::one(), genesis_validator_set)].into(), min_block_delta, @@ -1275,10 +1228,30 @@ pub(crate) mod tests { #[test] fn should_vote_target() { - let mut oracle = VoterOracle:: { min_block_delta: 1, sessions: VecDeque::new() }; + let header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + let mut oracle = VoterOracle:: { + best_beefy_block: 0, + best_grandpa_block_header: header, + min_block_delta: 1, + sessions: VecDeque::new(), + }; + let voting_target_with = |oracle: &mut VoterOracle, + best_beefy: NumberFor, + best_grandpa: NumberFor| + -> Option> { + oracle.best_beefy_block = best_beefy; + oracle.best_grandpa_block_header.number = best_grandpa; + oracle.voting_target() + }; // rounds not initialized -> should vote: `None` - assert_eq!(oracle.voting_target(0, 1), None); + assert_eq!(voting_target_with(&mut oracle, 0, 1), None); let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); @@ -1287,29 +1260,29 @@ pub(crate) mod tests { // under min delta oracle.min_block_delta = 4; - assert_eq!(oracle.voting_target(1, 1), None); - assert_eq!(oracle.voting_target(2, 5), None); + assert_eq!(voting_target_with(&mut oracle, 1, 1), None); + assert_eq!(voting_target_with(&mut oracle, 2, 5), None); // vote on min delta - assert_eq!(oracle.voting_target(4, 9), Some(8)); + assert_eq!(voting_target_with(&mut oracle, 4, 9), Some(8)); oracle.min_block_delta = 8; - assert_eq!(oracle.voting_target(10, 18), Some(18)); + assert_eq!(voting_target_with(&mut oracle, 10, 18), Some(18)); // vote on power of two oracle.min_block_delta = 1; - assert_eq!(oracle.voting_target(1000, 1008), Some(1004)); - assert_eq!(oracle.voting_target(1000, 1016), Some(1008)); + assert_eq!(voting_target_with(&mut oracle, 1000, 1008), Some(1004)); + assert_eq!(voting_target_with(&mut oracle, 1000, 1016), Some(1008)); // nothing new to vote on - assert_eq!(oracle.voting_target(1000, 1000), None); + assert_eq!(voting_target_with(&mut oracle, 1000, 1000), None); // vote on mandatory oracle.sessions.clear(); oracle.add_session(Rounds::new(1000, validator_set.clone())); - assert_eq!(oracle.voting_target(0, 1008), Some(1000)); + assert_eq!(voting_target_with(&mut oracle, 0, 1008), Some(1000)); oracle.sessions.clear(); oracle.add_session(Rounds::new(1001, validator_set.clone())); - assert_eq!(oracle.voting_target(1000, 1008), Some(1001)); + assert_eq!(voting_target_with(&mut oracle, 1000, 1008), Some(1001)); } #[test] @@ -1317,16 +1290,34 @@ pub(crate) mod tests { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut oracle = VoterOracle:: { min_block_delta: 1, sessions: VecDeque::new() }; + let header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + let mut oracle = VoterOracle:: { + best_beefy_block: 0, + best_grandpa_block_header: header, + min_block_delta: 1, + sessions: VecDeque::new(), + }; + let accepted_interval_with = |oracle: &mut VoterOracle, + best_grandpa: NumberFor| + -> Result<(NumberFor, NumberFor), Error> { + oracle.best_grandpa_block_header.number = best_grandpa; + oracle.accepted_interval() + }; // rounds not initialized -> should accept votes: `None` - assert!(oracle.accepted_interval(1).is_err()); + assert!(accepted_interval_with(&mut oracle, 1).is_err()); let session_one = 1; oracle.add_session(Rounds::new(session_one, validator_set.clone())); // mandatory not done, only accept mandatory for i in 0..15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_one, session_one))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_one, session_one))); } // add more sessions, nothing changes @@ -1336,7 +1327,7 @@ pub(crate) mod tests { oracle.add_session(Rounds::new(session_three, validator_set.clone())); // mandatory not done, should accept mandatory for session_one for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_one, session_one))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_one, session_one))); } // simulate finish mandatory for session one, prune oracle @@ -1344,7 +1335,7 @@ pub(crate) mod tests { oracle.try_prune(); // session_one pruned, should accept mandatory for session_two for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_two, session_two))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_two, session_two))); } // simulate finish mandatory for session two, prune oracle @@ -1352,26 +1343,29 @@ pub(crate) mod tests { oracle.try_prune(); // session_two pruned, should accept mandatory for session_three for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_three, session_three))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, session_three))); } // simulate finish mandatory for session three oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true); // verify all other blocks in this session are now open to voting for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_three, i))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, i))); } // pruning does nothing in this case oracle.try_prune(); for i in session_three..session_three + 15 { - assert_eq!(oracle.accepted_interval(i), Ok((session_three, i))); + assert_eq!(accepted_interval_with(&mut oracle, i), Ok((session_three, i))); } // adding new session automatically prunes "finalized" previous session let session_four = 31; oracle.add_session(Rounds::new(session_four, validator_set.clone())); assert_eq!(oracle.sessions.front().unwrap().session_start(), session_four); - assert_eq!(oracle.accepted_interval(session_four + 10), Ok((session_four, session_four))); + assert_eq!( + accepted_interval_with(&mut oracle, session_four + 10), + Ok((session_four, session_four)) + ); } #[test] @@ -1405,7 +1399,7 @@ pub(crate) mod tests { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let mut net = BeefyTestNet::new(1); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); + let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); // keystore doesn't contain other keys than validators' assert_eq!(worker.verify_validator_set(&1, &validator_set), Ok(())); @@ -1429,7 +1423,7 @@ pub(crate) mod tests { let validator_set = ValidatorSet::new(make_beefy_ids(&keys), 0).unwrap(); let mut net = BeefyTestNet::new(1); let backend = net.peer(0).client().as_backend(); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); + let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); // remove default session, will manually add custom one. worker.persisted_state.voting_oracle.sessions.clear(); @@ -1449,7 +1443,7 @@ pub(crate) mod tests { }; // no 'best beefy block' or finality proofs - assert_eq!(worker.best_beefy_block(), 0); + assert_eq!(worker.persisted_state.best_beefy_block(), 0); poll_fn(move |cx| { assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); assert_eq!(finality_proof.poll_next_unpin(cx), Poll::Pending); @@ -1457,6 +1451,7 @@ pub(crate) mod tests { }) .await; + let client = net.peer(0).client().as_client(); // unknown hash for block #1 let (mut best_block_streams, mut finality_proofs) = get_beefy_streams(&mut net, keys.clone()); @@ -1471,10 +1466,16 @@ pub(crate) mod tests { // try to finalize block #1 worker.finalize(justif.clone()).unwrap(); // verify block finalized - assert_eq!(worker.best_beefy_block(), 1); + assert_eq!(worker.persisted_state.best_beefy_block(), 1); poll_fn(move |cx| { - // unknown hash -> nothing streamed - assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + // expect Some(hash-of-block-1) + match best_block_stream.poll_next_unpin(cx) { + Poll::Ready(Some(hash)) => { + let block_num = client.number(hash).unwrap(); + assert_eq!(block_num, Some(1)); + }, + v => panic!("unexpected value: {:?}", v), + } // commitment streamed match finality_proof.poll_next_unpin(cx) { // expect justification @@ -1488,11 +1489,9 @@ pub(crate) mod tests { // generate 2 blocks, try again expect success let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); - let hashes = net.peer(0).push_blocks(2, false); + let hashes = net.peer(0).push_blocks(1, false); // finalize 1 and 2 without justifications (hashes does not contain genesis) - let hashof1 = hashes[0]; - let hashof2 = hashes[1]; - backend.finalize_block(hashof1, None).unwrap(); + let hashof2 = hashes[0]; backend.finalize_block(hashof2, None).unwrap(); let justif = create_finality_proof(2); @@ -1504,7 +1503,7 @@ pub(crate) mod tests { // new session starting at #2 is in front assert_eq!(worker.active_rounds().unwrap().session_start(), 2); // verify block finalized - assert_eq!(worker.best_beefy_block(), 2); + assert_eq!(worker.persisted_state.best_beefy_block(), 2); poll_fn(move |cx| { match best_block_stream.poll_next_unpin(cx) { // expect Some(hash-of-block-2) @@ -1528,7 +1527,7 @@ pub(crate) mod tests { let keys = &[Keyring::Alice, Keyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let mut net = BeefyTestNet::new(1); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); + let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); let worker_rounds = worker.active_rounds().unwrap(); assert_eq!(worker_rounds.session_start(), 1); @@ -1554,77 +1553,6 @@ pub(crate) mod tests { assert_eq!(rounds.validator_set_id(), new_validator_set.id()); } - #[tokio::test] - async fn should_triage_votes_and_process_later() { - let keys = &[Keyring::Alice, Keyring::Bob]; - let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut net = BeefyTestNet::new(1); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); - // remove default session, will manually add custom one. - worker.persisted_state.voting_oracle.sessions.clear(); - - fn new_vote( - block_number: NumberFor, - ) -> VoteMessage, AuthorityId, Signature> { - let commitment = Commitment { - payload: Payload::from_single_entry(*b"BF", vec![]), - block_number, - validator_set_id: 0, - }; - VoteMessage { - commitment, - id: Keyring::Alice.public(), - signature: Keyring::Alice.sign(b"I am committed"), - } - } - - // best grandpa is 20 - let best_grandpa_header = Header::new( - 20u32.into(), - Default::default(), - Default::default(), - Default::default(), - Digest::default(), - ); - - worker - .persisted_state - .voting_oracle - .add_session(Rounds::new(10, validator_set.clone())); - worker.persisted_state.best_grandpa_block_header = best_grandpa_header; - - // triage votes for blocks 10..13 - worker.triage_incoming_vote(new_vote(10)).unwrap(); - worker.triage_incoming_vote(new_vote(11)).unwrap(); - worker.triage_incoming_vote(new_vote(12)).unwrap(); - // triage votes for blocks 20..23 - worker.triage_incoming_vote(new_vote(20)).unwrap(); - worker.triage_incoming_vote(new_vote(21)).unwrap(); - worker.triage_incoming_vote(new_vote(22)).unwrap(); - - // vote for 10 should have been handled, while the rest buffered for later processing - let mut votes = worker.pending_votes.values(); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 11); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 12); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 20); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); - assert!(votes.next().is_none()); - - // simulate mandatory done, and retry buffered votes - worker - .persisted_state - .voting_oracle - .active_rounds_mut() - .unwrap() - .test_set_mandatory_done(true); - worker.try_pending_justif_and_votes().unwrap(); - // all blocks <= grandpa finalized should have been handled, rest still buffered - let mut votes = worker.pending_votes.values(); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); - assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); - } - #[tokio::test] async fn should_not_report_bad_old_or_self_equivocations() { let block_num = 1; @@ -1637,7 +1565,7 @@ pub(crate) mod tests { let api_alice = Arc::new(api_alice); let mut net = BeefyTestNet::new(1); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1, validator_set.clone()); + let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); worker.runtime = api_alice.clone(); // let there be a block with num = 1: