beefy: initialize voter from genesis and fix initial sync (#11959)

* client/beefy: use backend instead of client where possible

* client/beefy: initialize voter from genesis

Now that we have justifications import, we can drop the "lean beefy"
behaviour and start building justifications chain from Genesis with
containing all past sessions' mandatory blocks justifications.

* client/beefy: walk finality tree_route to catch session changes

* client/beefy: fix block import

During initial block import blocks are not finalized, so trying to
validate and append justifications within block import fails (for
initial network sync imported blocks).

Changes:

- Move justification validation to _after_ `inner.block_import()`,
  so block is imported in backend and runtime api can be called to
  get the BEEFY authorities for said block.
- Move append-to-backend for imported BEEFY justification to voter,
  because it already has the required logic to BEEFY-finalize blocks
  only after GRANDPA finalized them.
- Mark voting rounds as concluded when finalizing through
  imported justifications as well as when finalizing through voting.

* client/beefy: valid justifications are one per block number

The only way we'd get _different_ _validated_ justifications for same
block number is if authorities are double voting, which will be handled
later.

* client/beefy: process incoming justifs during major sync

* client/beefy: correct voter initialization

BEEFY voter should resume voting from either:
  - last BEEFY finalized block,
  - session start,
whichever is closest to head.

* client/beefy: test voter initialization

* client/beefy: impl review suggestions

Signed-off-by: acatangiu <adrian@parity.io>
This commit is contained in:
Adrian Catangiu
2022-09-05 13:47:15 +03:00
committed by GitHub
parent 1fabf067d0
commit 84acfd5f50
4 changed files with 342 additions and 147 deletions
+20 -55
View File
@@ -17,12 +17,11 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use beefy_primitives::{BeefyApi, BEEFY_ENGINE_ID}; use beefy_primitives::{BeefyApi, BEEFY_ENGINE_ID};
use codec::Encode; use log::debug;
use log::error;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use sp_api::{ProvideRuntimeApi, TransactionFor}; use sp_api::{ProvideRuntimeApi, TransactionFor};
use sp_blockchain::{well_known_cache_keys, HeaderBackend}; use sp_blockchain::well_known_cache_keys;
use sp_consensus::Error as ConsensusError; use sp_consensus::Error as ConsensusError;
use sp_runtime::{ use sp_runtime::{
generic::BlockId, generic::BlockId,
@@ -97,29 +96,6 @@ where
decode_and_verify_finality_proof::<Block>(&encoded[..], number, &validator_set) decode_and_verify_finality_proof::<Block>(&encoded[..], number, &validator_set)
} }
/// Import BEEFY justification: Send it to worker for processing and also append it to backend.
///
/// This function assumes:
/// - `justification` is verified and valid,
/// - the block referred by `justification` has been imported _and_ finalized.
fn import_beefy_justification_unchecked(
&self,
number: NumberFor<Block>,
justification: BeefyVersionedFinalityProof<Block>,
) {
// Append the justification to the block in the backend.
if let Err(e) = self.backend.append_justification(
BlockId::Number(number),
(BEEFY_ENGINE_ID, justification.encode()),
) {
error!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, justification);
}
// Send the justification to the BEEFY voter for processing.
self.justification_sender
.notify(|| Ok::<_, ()>(justification))
.expect("forwards closure result; the closure always returns Ok; qed.");
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@@ -147,42 +123,31 @@ where
let hash = block.post_hash(); let hash = block.post_hash();
let number = *block.header.number(); let number = *block.header.number();
let beefy_proof = block let beefy_encoded = block.justifications.as_mut().and_then(|just| {
.justifications let encoded = just.get(BEEFY_ENGINE_ID).cloned();
.as_mut() // Remove BEEFY justification from the list before giving to `inner`; we send it to the
.and_then(|just| { // voter (beefy-gadget) and it will append it to the backend after block is finalized.
let decoded = just just.remove(BEEFY_ENGINE_ID);
.get(BEEFY_ENGINE_ID) encoded
.map(|encoded| self.decode_and_verify(encoded, number, hash)); });
// Remove BEEFY justification from the list before giving to `inner`;
// we will append it to backend ourselves at the end if all goes well.
just.remove(BEEFY_ENGINE_ID);
decoded
})
.transpose()
.unwrap_or(None);
// Run inner block import. // Run inner block import.
let inner_import_result = self.inner.import_block(block, new_cache).await?; let inner_import_result = self.inner.import_block(block, new_cache).await?;
match (beefy_proof, &inner_import_result) { match (beefy_encoded, &inner_import_result) {
(Some(proof), ImportResult::Imported(_)) => { (Some(encoded), ImportResult::Imported(_)) => {
let status = self.backend.blockchain().info(); if let Ok(proof) = self.decode_and_verify(&encoded, number, hash) {
if number <= status.finalized_number &&
Some(hash) ==
self.backend
.blockchain()
.hash(number)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
// The proof is valid and the block is imported and final, we can import. // The proof is valid and the block is imported and final, we can import.
self.import_beefy_justification_unchecked(number, proof); debug!(target: "beefy", "🥩 import justif {:?} for block number {:?}.", proof, number);
// Send the justification to the BEEFY voter for processing.
self.justification_sender
.notify(|| Ok::<_, ()>(proof))
.expect("forwards closure result; the closure always returns Ok; qed.");
} else { } else {
error!( debug!(
target: "beefy", target: "beefy",
"🥩 Cannot import justification: {:?} for, not yet final, block number {:?}", "🥩 error decoding justification: {:?} for imported block {:?}",
proof, encoded, number,
number,
); );
} }
}, },
+18 -10
View File
@@ -147,13 +147,8 @@ where
trace!(target: "beefy", "🥩 Round #{} done: {}", round.1, done); trace!(target: "beefy", "🥩 Round #{} done: {}", round.1, done);
if done { if done {
// remove this and older (now stale) rounds
let signatures = self.rounds.remove(round)?.votes; let signatures = self.rounds.remove(round)?.votes;
self.rounds.retain(|&(_, number), _| number > round.1); self.conclude(round.1);
self.mandatory_done = self.mandatory_done || round.1 == self.session_start;
self.best_done = self.best_done.max(Some(round.1));
debug!(target: "beefy", "🥩 Concluded round #{}", round.1);
Some( Some(
self.validators() self.validators()
.iter() .iter()
@@ -165,9 +160,12 @@ where
} }
} }
#[cfg(test)] pub(crate) fn conclude(&mut self, round_num: NumberFor<B>) {
pub(crate) fn test_set_mandatory_done(&mut self, done: bool) { // Remove this and older (now stale) rounds.
self.mandatory_done = done; self.rounds.retain(|&(_, number), _| number > round_num);
self.mandatory_done = self.mandatory_done || round_num == self.session_start;
self.best_done = self.best_done.max(Some(round_num));
debug!(target: "beefy", "🥩 Concluded round #{}", round_num);
} }
} }
@@ -178,9 +176,19 @@ mod tests {
use beefy_primitives::{crypto::Public, ValidatorSet}; use beefy_primitives::{crypto::Public, ValidatorSet};
use super::{threshold, RoundTracker, Rounds}; use super::{threshold, Block as BlockT, Hash, RoundTracker, Rounds};
use crate::keystore::tests::Keyring; use crate::keystore::tests::Keyring;
impl<P, B> Rounds<P, B>
where
P: Ord + Hash + Clone,
B: BlockT,
{
pub(crate) fn test_set_mandatory_done(&mut self, done: bool) {
self.mandatory_done = done;
}
}
#[test] #[test]
fn round_tracker() { fn round_tracker() {
let mut rt = RoundTracker::default(); let mut rt = RoundTracker::default();
+5 -7
View File
@@ -145,7 +145,7 @@ impl BeefyTestNet {
}) })
} }
pub(crate) fn generate_blocks( pub(crate) fn generate_blocks_and_sync(
&mut self, &mut self,
count: usize, count: usize,
session_length: u64, session_length: u64,
@@ -168,6 +168,7 @@ impl BeefyTestNet {
block block
}); });
self.block_until_sync();
} }
} }
@@ -528,8 +529,7 @@ fn beefy_finalizing_blocks() {
runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta));
// push 42 blocks including `AuthorityChange` digests every 10 blocks. // push 42 blocks including `AuthorityChange` digests every 10 blocks.
net.generate_blocks(42, session_len, &validator_set, true); net.generate_blocks_and_sync(42, session_len, &validator_set, true);
net.block_until_sync();
let net = Arc::new(Mutex::new(net)); let net = Arc::new(Mutex::new(net));
@@ -567,8 +567,7 @@ fn lagging_validators() {
runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta));
// push 62 blocks including `AuthorityChange` digests every 30 blocks. // push 62 blocks including `AuthorityChange` digests every 30 blocks.
net.generate_blocks(62, session_len, &validator_set, true); net.generate_blocks_and_sync(62, session_len, &validator_set, true);
net.block_until_sync();
let net = Arc::new(Mutex::new(net)); let net = Arc::new(Mutex::new(net));
@@ -644,8 +643,7 @@ fn correct_beefy_payload() {
runtime.spawn(initialize_beefy(&mut net, bad_peers, min_block_delta)); runtime.spawn(initialize_beefy(&mut net, bad_peers, min_block_delta));
// push 10 blocks // push 10 blocks
net.generate_blocks(12, session_len, &validator_set, false); net.generate_blocks_and_sync(12, session_len, &validator_set, false);
net.block_until_sync();
let net = Arc::new(Mutex::new(net)); let net = Arc::new(Mutex::new(net));
// with 3 good voters and 1 bad one, consensus should happen and best blocks produced. // with 3 good voters and 1 bad one, consensus should happen and best blocks produced.
+299 -75
View File
@@ -27,11 +27,12 @@ use codec::{Codec, Decode, Encode};
use futures::StreamExt; use futures::StreamExt;
use log::{debug, error, info, log_enabled, trace, warn}; use log::{debug, error, info, log_enabled, trace, warn};
use sc_client_api::{Backend, FinalityNotification}; use sc_client_api::{Backend, FinalityNotification, HeaderBackend};
use sc_network_gossip::GossipEngine; use sc_network_gossip::GossipEngine;
use sp_api::{BlockId, ProvideRuntimeApi}; use sp_api::{BlockId, ProvideRuntimeApi};
use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
use sp_blockchain::Backend as BlockchainBackend;
use sp_consensus::SyncOracle; use sp_consensus::SyncOracle;
use sp_mmr_primitives::MmrApi; use sp_mmr_primitives::MmrApi;
use sp_runtime::{ use sp_runtime::{
@@ -212,7 +213,7 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
/// Buffer holding votes for future processing. /// Buffer holding votes for future processing.
pending_votes: BTreeMap<NumberFor<B>, Vec<VoteMessage<NumberFor<B>, AuthorityId, Signature>>>, pending_votes: BTreeMap<NumberFor<B>, Vec<VoteMessage<NumberFor<B>, AuthorityId, Signature>>>,
/// Buffer holding justifications for future processing. /// Buffer holding justifications for future processing.
pending_justifications: BTreeMap<NumberFor<B>, Vec<BeefyVersionedFinalityProof<B>>>, pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
/// Chooses which incoming votes to accept and which votes to generate. /// Chooses which incoming votes to accept and which votes to generate.
voting_oracle: VoterOracle<B>, voting_oracle: VoterOracle<B>,
} }
@@ -246,8 +247,9 @@ where
min_block_delta, min_block_delta,
} = worker_params; } = worker_params;
let last_finalized_header = client let last_finalized_header = backend
.expect_header(BlockId::number(client.info().finalized_number)) .blockchain()
.expect_header(BlockId::number(backend.blockchain().info().finalized_number))
.expect("latest block always has header available; qed."); .expect("latest block always has header available; qed.");
BeefyWorker { BeefyWorker {
@@ -313,9 +315,8 @@ where
new_session_start: NumberFor<B>, new_session_start: NumberFor<B>,
) { ) {
debug!(target: "beefy", "🥩 New active validator set: {:?}", validator_set); debug!(target: "beefy", "🥩 New active validator set: {:?}", validator_set);
metric_set!(self, beefy_validator_set_id, validator_set.id());
// BEEFY should produce the mandatory block of each session. // BEEFY should finalize a mandatory block during each session.
if let Some(active_session) = self.voting_oracle.rounds_mut() { if let Some(active_session) = self.voting_oracle.rounds_mut() {
if !active_session.mandatory_done() { if !active_session.mandatory_done() {
debug!( debug!(
@@ -334,7 +335,12 @@ where
let id = validator_set.id(); let id = validator_set.id();
self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set)); self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set));
info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, new_session_start); metric_set!(self, beefy_validator_set_id, id);
info!(
target: "beefy",
"🥩 New Rounds for validator set id: {:?} with session_start {:?}",
id, new_session_start
);
} }
fn handle_finality_notification(&mut self, notification: &FinalityNotification<B>) { fn handle_finality_notification(&mut self, notification: &FinalityNotification<B>) {
@@ -345,11 +351,24 @@ where
// update best GRANDPA finalized block we have seen // update best GRANDPA finalized block we have seen
self.best_grandpa_block_header = header.clone(); self.best_grandpa_block_header = header.clone();
// Check for and enqueue potential new session. // Check all (newly) finalized blocks for new session(s).
if let Some(new_validator_set) = find_authorities_change::<B>(header) { let backend = self.backend.clone();
self.init_session_at(new_validator_set, *header.number()); for header in notification
// TODO: when adding SYNC protocol, fire up a request for justification for this .tree_route
// mandatory block here. .iter()
.map(|hash| {
backend
.blockchain()
.expect_header(BlockId::hash(*hash))
.expect("just finalized block should be available; qed.")
})
.chain(std::iter::once(header.clone()))
{
if let Some(new_validator_set) = find_authorities_change::<B>(&header) {
self.init_session_at(new_validator_set, *header.number());
// TODO (grandpa-bridge-gadget/issues/20): when adding SYNC protocol,
// fire up a request for justification for this mandatory block here.
}
} }
} }
} }
@@ -389,10 +408,10 @@ where
let block_num = signed_commitment.commitment.block_number; let block_num = signed_commitment.commitment.block_number;
let best_grandpa = *self.best_grandpa_block_header.number(); let best_grandpa = *self.best_grandpa_block_header.number();
match self.voting_oracle.triage_round(block_num, best_grandpa)? { match self.voting_oracle.triage_round(block_num, best_grandpa)? {
RoundAction::Process => self.finalize(justification), RoundAction::Process => self.finalize(justification)?,
RoundAction::Enqueue => { RoundAction::Enqueue => {
debug!(target: "beefy", "🥩 Buffer justification for round: {:?}.", block_num); debug!(target: "beefy", "🥩 Buffer justification for round: {:?}.", block_num);
self.pending_justifications.entry(block_num).or_default().push(justification) self.pending_justifications.entry(block_num).or_insert(justification);
}, },
RoundAction::Drop => (), RoundAction::Drop => (),
}; };
@@ -427,15 +446,8 @@ where
info!(target: "beefy", "🥩 Round #{} concluded, finality_proof: {:?}.", round.1, finality_proof); info!(target: "beefy", "🥩 Round #{} concluded, finality_proof: {:?}.", round.1, finality_proof);
if let Err(e) = self.backend.append_justification(
BlockId::Number(block_num),
(BEEFY_ENGINE_ID, finality_proof.clone().encode()),
) {
debug!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, finality_proof);
}
// We created the `finality_proof` and know to be valid. // We created the `finality_proof` and know to be valid.
self.finalize(finality_proof); self.finalize(finality_proof)?;
} }
} }
Ok(()) Ok(())
@@ -447,19 +459,29 @@ where
/// 3. Send best block hash and `finality_proof` to RPC worker. /// 3. Send best block hash and `finality_proof` to RPC worker.
/// ///
/// Expects `finality proof` to be valid. /// Expects `finality proof` to be valid.
fn finalize(&mut self, finality_proof: BeefyVersionedFinalityProof<B>) { fn finalize(&mut self, finality_proof: BeefyVersionedFinalityProof<B>) -> Result<(), Error> {
let block_num = match finality_proof {
VersionedFinalityProof::V1(ref sc) => sc.commitment.block_number,
};
// Conclude voting round for this block.
self.voting_oracle.rounds_mut().ok_or(Error::UninitSession)?.conclude(block_num);
// Prune any now "finalized" sessions from queue. // Prune any now "finalized" sessions from queue.
self.voting_oracle.try_prune(); self.voting_oracle.try_prune();
let signed_commitment = match finality_proof {
VersionedFinalityProof::V1(ref sc) => sc,
};
let block_num = signed_commitment.commitment.block_number;
if Some(block_num) > self.best_beefy_block { if Some(block_num) > self.best_beefy_block {
// Set new best BEEFY block number. // Set new best BEEFY block number.
self.best_beefy_block = Some(block_num); self.best_beefy_block = Some(block_num);
metric_set!(self, beefy_best_block, block_num); metric_set!(self, beefy_best_block, block_num);
self.client.hash(block_num).ok().flatten().map(|hash| { if let Err(e) = self.backend.append_justification(
BlockId::Number(block_num),
(BEEFY_ENGINE_ID, finality_proof.clone().encode()),
) {
error!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, finality_proof);
}
self.backend.blockchain().hash(block_num).ok().flatten().map(|hash| {
self.links self.links
.to_rpc_best_block_sender .to_rpc_best_block_sender
.notify(|| Ok::<_, ()>(hash)) .notify(|| Ok::<_, ()>(hash))
@@ -473,6 +495,7 @@ where
} else { } else {
debug!(target: "beefy", "🥩 Can't set best beefy to older: {}", block_num); debug!(target: "beefy", "🥩 Can't set best beefy to older: {}", block_num);
} }
Ok(())
} }
/// Handle previously buffered justifications and votes that now land in the voting interval. /// Handle previously buffered justifications and votes that now land in the voting interval.
@@ -481,10 +504,10 @@ where
let _ph = PhantomData::<B>::default(); let _ph = PhantomData::<B>::default();
fn to_process_for<B: Block, T>( fn to_process_for<B: Block, T>(
pending: &mut BTreeMap<NumberFor<B>, Vec<T>>, pending: &mut BTreeMap<NumberFor<B>, T>,
(start, end): (NumberFor<B>, NumberFor<B>), (start, end): (NumberFor<B>, NumberFor<B>),
_: PhantomData<B>, _: PhantomData<B>,
) -> BTreeMap<NumberFor<B>, Vec<T>> { ) -> BTreeMap<NumberFor<B>, T> {
// These are still pending. // These are still pending.
let still_pending = pending.split_off(&end.saturating_add(1u32.into())); let still_pending = pending.split_off(&end.saturating_add(1u32.into()));
// These can be processed. // These can be processed.
@@ -494,21 +517,23 @@ where
// Return ones to process. // Return ones to process.
to_handle to_handle
} }
// Interval of blocks for which we can process justifications and votes right now.
let mut interval = self.voting_oracle.accepted_interval(best_grandpa)?;
// Process pending justifications. // Process pending justifications.
let interval = self.voting_oracle.accepted_interval(best_grandpa)?;
if !self.pending_justifications.is_empty() { if !self.pending_justifications.is_empty() {
let justifs_to_handle = to_process_for(&mut self.pending_justifications, interval, _ph); let justifs_to_handle = to_process_for(&mut self.pending_justifications, interval, _ph);
for (num, justifications) in justifs_to_handle.into_iter() { for (num, justification) in justifs_to_handle.into_iter() {
debug!(target: "beefy", "🥩 Handle buffered justifications for: {:?}.", num); debug!(target: "beefy", "🥩 Handle buffered justification for: {:?}.", num);
for justif in justifications.into_iter() { if let Err(err) = self.finalize(justification) {
self.finalize(justif); error!(target: "beefy", "🥩 Error finalizing block: {}", err);
} }
} }
// Possibly new interval after processing justifications.
interval = self.voting_oracle.accepted_interval(best_grandpa)?;
} }
// Process pending votes. // Process pending votes.
let interval = self.voting_oracle.accepted_interval(best_grandpa)?;
if !self.pending_votes.is_empty() { if !self.pending_votes.is_empty() {
let votes_to_handle = to_process_for(&mut self.pending_votes, interval, _ph); let votes_to_handle = to_process_for(&mut self.pending_votes, interval, _ph);
for (num, votes) in votes_to_handle.into_iter() { for (num, votes) in votes_to_handle.into_iter() {
@@ -547,17 +572,20 @@ where
debug!(target: "beefy", "🥩 Try voting on {}", target_number); debug!(target: "beefy", "🥩 Try voting on {}", target_number);
// Most of the time we get here, `target` is actually `best_grandpa`, // Most of the time we get here, `target` is actually `best_grandpa`,
// avoid asking `client` for header in that case. // avoid getting header from backend in that case.
let target_header = if target_number == *self.best_grandpa_block_header.number() { let target_header = if target_number == *self.best_grandpa_block_header.number() {
self.best_grandpa_block_header.clone() self.best_grandpa_block_header.clone()
} else { } else {
self.client.expect_header(BlockId::Number(target_number)).map_err(|err| { self.backend
let err_msg = format!( .blockchain()
"Couldn't get header for block #{:?} (error: {:?}), skipping vote..", .expect_header(BlockId::Number(target_number))
target_number, err .map_err(|err| {
); let err_msg = format!(
Error::Backend(err_msg) "Couldn't get header for block #{:?} (error: {:?}), skipping vote..",
})? target_number, err
);
Error::Backend(err_msg)
})?
}; };
let target_hash = target_header.hash(); let target_hash = target_header.hash();
@@ -623,7 +651,78 @@ where
Ok(()) Ok(())
} }
/// Initialize BEEFY voter state.
///
/// Should be called only once during worker initialization with latest GRANDPA finalized
/// `header` and the validator set `active` at that point.
fn initialize_voter(&mut self, header: &B::Header, active: ValidatorSet<AuthorityId>) {
// just a sanity check.
if let Some(rounds) = self.voting_oracle.rounds_mut() {
error!(
target: "beefy",
"🥩 Voting session already initialized at: {:?}, validator set id {}.",
rounds.session_start(),
rounds.validator_set_id(),
);
return
}
self.best_grandpa_block_header = header.clone();
if active.id() == GENESIS_AUTHORITY_SET_ID {
// When starting from genesis, there is no session boundary digest.
// Just initialize `rounds` to Block #1 as BEEFY mandatory block.
info!(target: "beefy", "🥩 Initialize voting session at genesis, block 1.");
self.init_session_at(active, 1u32.into());
} else {
// TODO (issue #11837): persist local progress to avoid following look-up during init.
let blockchain = self.backend.blockchain();
let mut header = header.clone();
// Walk back the imported blocks and initialize voter either, at the last block with
// a BEEFY justification, or at this session's boundary; voter will resume from there.
loop {
if let Some(true) = blockchain
.justifications(BlockId::hash(header.hash()))
.ok()
.flatten()
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
{
info!(
target: "beefy",
"🥩 Initialize voting session at last BEEFY finalized block: {:?}.",
*header.number()
);
self.init_session_at(active, *header.number());
// Mark the round as already finalized.
if let Some(round) = self.voting_oracle.rounds_mut() {
round.conclude(*header.number());
}
self.best_beefy_block = Some(*header.number());
break
}
if let Some(validator_set) = find_authorities_change::<B>(&header) {
info!(
target: "beefy",
"🥩 Initialize voting session at current session boundary: {:?}.",
*header.number()
);
self.init_session_at(validator_set, *header.number());
break
}
// Move up the chain.
header = self
.client
.expect_header(BlockId::Hash(*header.parent_hash()))
// in case of db failure here we want to kill the worker
.expect("db failure, voter going down.");
}
}
}
/// Wait for BEEFY runtime pallet to be available. /// Wait for BEEFY runtime pallet to be available.
/// Should be called only once during worker initialization.
async fn wait_for_runtime_pallet(&mut self) { async fn wait_for_runtime_pallet(&mut self) {
let mut gossip_engine = &mut self.gossip_engine; let mut gossip_engine = &mut self.gossip_engine;
let mut finality_stream = self.client.finality_notification_stream().fuse(); let mut finality_stream = self.client.finality_notification_stream().fuse();
@@ -635,25 +734,19 @@ where
None => break None => break
}; };
let at = BlockId::hash(notif.header.hash()); let at = BlockId::hash(notif.header.hash());
if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() { if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() {
if active.id() == GENESIS_AUTHORITY_SET_ID { self.initialize_voter(&notif.header, active);
// When starting from genesis, there is no session boundary digest. if !self.sync_oracle.is_major_syncing() {
// Just initialize `rounds` to Block #1 as BEEFY mandatory block. if let Err(err) = self.try_to_vote() {
self.init_session_at(active, 1u32.into()); debug!(target: "beefy", "🥩 {}", err);
}
}
// Beefy pallet available and voter initialized.
break
} else {
trace!(target: "beefy", "🥩 Finality notification: {:?}", notif);
debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available...");
} }
// In all other cases, we just go without `rounds` initialized, meaning the
// worker won't vote until it witnesses a session change.
// Once we'll implement 'initial sync' (catch-up), the worker will be able to
// start voting right away.
self.handle_finality_notification(&notif);
if let Err(err) = self.try_to_vote() {
debug!(target: "beefy", "🥩 {}", err);
}
break
} else {
trace!(target: "beefy", "🥩 Finality notification: {:?}", notif);
debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available...");
}
}, },
_ = gossip_engine => { _ = gossip_engine => {
break break
@@ -668,7 +761,10 @@ where
/// which is driven by finality notifications and gossiped votes. /// which is driven by finality notifications and gossiped votes.
pub(crate) async fn run(mut self) { pub(crate) async fn run(mut self) {
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number()); info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number());
let mut block_import_justif = self.links.from_block_import_justif_stream.subscribe().fuse();
self.wait_for_runtime_pallet().await; self.wait_for_runtime_pallet().await;
trace!(target: "beefy", "🥩 BEEFY pallet available, starting voter.");
let mut finality_notifications = self.client.finality_notification_stream().fuse(); let mut finality_notifications = self.client.finality_notification_stream().fuse();
let mut votes = Box::pin( let mut votes = Box::pin(
@@ -684,7 +780,6 @@ where
}) })
.fuse(), .fuse(),
); );
let mut block_import_justif = self.links.from_block_import_justif_stream.subscribe().fuse();
loop { loop {
let mut gossip_engine = &mut self.gossip_engine; let mut gossip_engine = &mut self.gossip_engine;
@@ -728,17 +823,19 @@ where
} }
} }
// Don't bother acting on 'state' changes during major sync. // Handle pending justifications and/or votes for now GRANDPA finalized blocks.
if !self.sync_oracle.is_major_syncing() { if let Err(err) = self.try_pending_justif_and_votes() {
// Handle pending justifications and/or votes for now GRANDPA finalized blocks. debug!(target: "beefy", "🥩 {}", err);
if let Err(err) = self.try_pending_justif_and_votes() { }
debug!(target: "beefy", "🥩 {}", err);
}
// Don't bother voting during major sync.
if !self.sync_oracle.is_major_syncing() {
// There were external events, 'state' is changed, author a vote if needed/possible. // There were external events, 'state' is changed, author a vote if needed/possible.
if let Err(err) = self.try_to_vote() { if let Err(err) = self.try_to_vote() {
debug!(target: "beefy", "🥩 {}", err); debug!(target: "beefy", "🥩 {}", err);
} }
} else {
debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
} }
} }
} }
@@ -845,13 +942,14 @@ pub(crate) mod tests {
use futures::{executor::block_on, future::poll_fn, task::Poll}; use futures::{executor::block_on, future::poll_fn, task::Poll};
use sc_client_api::HeaderBackend; use sc_client_api::{Backend as BackendT, HeaderBackend};
use sc_network::NetworkService; use sc_network::NetworkService;
use sc_network_test::{PeersFullClient, TestNetFactory}; use sc_network_test::{PeersFullClient, TestNetFactory};
use sp_api::HeaderT; use sp_api::HeaderT;
use sp_blockchain::Backend as BlockchainBackendT;
use substrate_test_runtime_client::{ use substrate_test_runtime_client::{
runtime::{Block, Digest, DigestItem, Header, H256}, runtime::{Block, Digest, DigestItem, Header, H256},
Backend, Backend, ClientExt,
}; };
fn create_beefy_worker( fn create_beefy_worker(
@@ -1166,10 +1264,11 @@ pub(crate) mod tests {
} }
#[test] #[test]
fn test_finalize() { fn should_finalize_correctly() {
let keys = &[Keyring::Alice]; let keys = &[Keyring::Alice];
let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
let mut net = BeefyTestNet::new(1, 0); let mut net = BeefyTestNet::new(1, 0);
let backend = net.peer(0).client().as_backend();
let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1);
let (mut best_block_streams, mut finality_proofs) = get_beefy_streams(&mut net, keys); let (mut best_block_streams, mut finality_proofs) = get_beefy_streams(&mut net, keys);
@@ -1198,10 +1297,16 @@ pub(crate) mod tests {
let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); let mut best_block_stream = best_block_streams.drain(..).next().unwrap();
let mut finality_proof = finality_proofs.drain(..).next().unwrap(); let mut finality_proof = finality_proofs.drain(..).next().unwrap();
let justif = create_finality_proof(1); let justif = create_finality_proof(1);
worker.finalize(justif.clone()); // create new session at block #1
worker.voting_oracle.add_session(Rounds::new(1, validator_set.clone()));
// try to finalize block #1
worker.finalize(justif.clone()).unwrap();
// verify block finalized
assert_eq!(worker.best_beefy_block, Some(1)); assert_eq!(worker.best_beefy_block, Some(1));
block_on(poll_fn(move |cx| { block_on(poll_fn(move |cx| {
// unknown hash -> nothing streamed
assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending);
// commitment streamed
match finality_proof.poll_next_unpin(cx) { match finality_proof.poll_next_unpin(cx) {
// expect justification // expect justification
Poll::Ready(Some(received)) => assert_eq!(received, justif), Poll::Ready(Some(received)) => assert_eq!(received, justif),
@@ -1213,10 +1318,20 @@ pub(crate) mod tests {
// generate 2 blocks, try again expect success // generate 2 blocks, try again expect success
let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys);
let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); let mut best_block_stream = best_block_streams.drain(..).next().unwrap();
net.generate_blocks(2, 10, &validator_set, false); net.peer(0).push_blocks(2, false);
// finalize 1 and 2 without justifications
backend.finalize_block(BlockId::number(1), None).unwrap();
backend.finalize_block(BlockId::number(2), None).unwrap();
let justif = create_finality_proof(2); let justif = create_finality_proof(2);
worker.finalize(justif); // create new session at block #2
worker.voting_oracle.add_session(Rounds::new(2, validator_set));
worker.finalize(justif).unwrap();
// verify old session pruned
assert_eq!(worker.voting_oracle.sessions.len(), 1);
// new session starting at #2 is in front
assert_eq!(worker.voting_oracle.rounds_mut().unwrap().session_start(), 2);
// verify block finalized
assert_eq!(worker.best_beefy_block, Some(2)); assert_eq!(worker.best_beefy_block, Some(2));
block_on(poll_fn(move |cx| { block_on(poll_fn(move |cx| {
match best_block_stream.poll_next_unpin(cx) { match best_block_stream.poll_next_unpin(cx) {
@@ -1229,6 +1344,10 @@ pub(crate) mod tests {
} }
Poll::Ready(()) Poll::Ready(())
})); }));
// check BEEFY justifications are also appended to backend
let justifs = backend.blockchain().justifications(BlockId::number(2)).unwrap().unwrap();
assert!(justifs.get(BEEFY_ENGINE_ID).is_some())
} }
#[test] #[test]
@@ -1325,4 +1444,109 @@ pub(crate) mod tests {
assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21);
assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22);
} }
#[test]
fn should_initialize_correct_voter() {
let keys = &[Keyring::Alice];
let validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap();
let mut net = BeefyTestNet::new(1, 0);
let backend = net.peer(0).client().as_backend();
// push 15 blocks with `AuthorityChange` digests every 10 blocks
net.generate_blocks_and_sync(15, 10, &validator_set, false);
// finalize 13 without justifications
net.peer(0)
.client()
.as_client()
.finalize_block(BlockId::number(13), None)
.unwrap();
// Test initialization at session boundary.
{
let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1);
// initialize voter at block 13, expect rounds initialized at session_start = 10
let header = backend.blockchain().header(BlockId::number(13)).unwrap().unwrap();
worker.initialize_voter(&header, validator_set.clone());
// verify voter initialized with single session starting at block 10
assert_eq!(worker.voting_oracle.sessions.len(), 1);
let rounds = worker.voting_oracle.rounds_mut().unwrap();
assert_eq!(rounds.session_start(), 10);
assert_eq!(rounds.validator_set_id(), validator_set.id());
// verify next vote target is mandatory block 10
assert_eq!(worker.best_beefy_block, None);
assert_eq!(*worker.best_grandpa_block_header.number(), 13);
assert_eq!(worker.voting_oracle.voting_target(worker.best_beefy_block, 13), Some(10));
}
// Test corner-case where session boundary == last beefy finalized.
{
let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1);
// import/append BEEFY justification for session boundary block 10
let commitment = Commitment {
payload: Payload::new(known_payload_ids::MMR_ROOT_ID, vec![]),
block_number: 10,
validator_set_id: validator_set.id(),
};
let justif = VersionedFinalityProof::<_, Signature>::V1(SignedCommitment {
commitment,
signatures: vec![None],
});
backend
.append_justification(BlockId::Number(10), (BEEFY_ENGINE_ID, justif.encode()))
.unwrap();
// initialize voter at block 13, expect rounds initialized at last beefy finalized 10
let header = backend.blockchain().header(BlockId::number(13)).unwrap().unwrap();
worker.initialize_voter(&header, validator_set.clone());
// verify voter initialized with single session starting at block 10
assert_eq!(worker.voting_oracle.sessions.len(), 1);
let rounds = worker.voting_oracle.rounds_mut().unwrap();
assert_eq!(rounds.session_start(), 10);
assert_eq!(rounds.validator_set_id(), validator_set.id());
// verify next vote target is mandatory block 10
assert_eq!(worker.best_beefy_block, Some(10));
assert_eq!(*worker.best_grandpa_block_header.number(), 13);
assert_eq!(worker.voting_oracle.voting_target(worker.best_beefy_block, 13), Some(12));
}
// Test initialization at last BEEFY finalized.
{
let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1);
// import/append BEEFY justification for block 12
let commitment = Commitment {
payload: Payload::new(known_payload_ids::MMR_ROOT_ID, vec![]),
block_number: 12,
validator_set_id: validator_set.id(),
};
let justif = VersionedFinalityProof::<_, Signature>::V1(SignedCommitment {
commitment,
signatures: vec![None],
});
backend
.append_justification(BlockId::Number(12), (BEEFY_ENGINE_ID, justif.encode()))
.unwrap();
// initialize voter at block 13, expect rounds initialized at last beefy finalized 12
let header = backend.blockchain().header(BlockId::number(13)).unwrap().unwrap();
worker.initialize_voter(&header, validator_set.clone());
// verify voter initialized with single session starting at block 12
assert_eq!(worker.voting_oracle.sessions.len(), 1);
let rounds = worker.voting_oracle.rounds_mut().unwrap();
assert_eq!(rounds.session_start(), 12);
assert_eq!(rounds.validator_set_id(), validator_set.id());
// verify next vote target is 13
assert_eq!(worker.best_beefy_block, Some(12));
assert_eq!(*worker.best_grandpa_block_header.number(), 13);
assert_eq!(worker.voting_oracle.voting_target(worker.best_beefy_block, 13), Some(13));
}
}
} }