BEEFY: Support compatibility with Warp Sync - Allow Warp Sync for Validators (#2689)

Resolves https://github.com/paritytech/polkadot-sdk/issues/2627

Initializes voter _after_ headers sync finishes in the background.

This enables the BEEFY gadget to work with `--sync warp` (GRANDPA warp
sync).

Co-authored-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
Serban Iorga
2023-12-27 18:18:33 +01:00
committed by GitHub
parent dcbc36a1c4
commit 5c0b8e0bb5
9 changed files with 152 additions and 72 deletions
+94 -34
View File
@@ -33,7 +33,7 @@ use crate::{
worker::PersistedState,
};
use futures::{stream::Fuse, StreamExt};
use log::{debug, error, info};
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
@@ -56,6 +56,7 @@ use std::{
collections::{BTreeMap, VecDeque},
marker::PhantomData,
sync::Arc,
time::Duration,
};
mod aux_schema;
@@ -78,6 +79,8 @@ mod tests;
const LOG_TARGET: &str = "beefy";
const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60);
/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as
/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking
@@ -292,21 +295,29 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
// select recoverable errors.
loop {
// Wait for BEEFY pallet to be active before starting voter.
let persisted_state = match wait_for_runtime_pallet(
let (beefy_genesis, best_grandpa) = match wait_for_runtime_pallet(
&*runtime,
&mut beefy_comms.gossip_engine,
&mut finality_notifications,
)
.await
.and_then(|(beefy_genesis, best_grandpa)| {
load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
}) {
{
Ok(res) => res,
Err(e) => {
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
},
};
let persisted_state = match load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
.await
{
Ok(state) => state,
Err(e) => {
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
@@ -357,7 +368,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
}
}
fn load_or_init_voter_state<B, BE, R>(
async fn load_or_init_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
@@ -371,28 +382,70 @@ where
R::Api: BeefyApi<B, AuthorityId>,
{
// Initialize voter state from AUX DB if compatible.
crate::aux_schema::load_persistent(backend)?
if let Some(mut state) = crate::aux_schema::load_persistent(backend)?
// Verify state pallet genesis matches runtime.
.filter(|state| state.pallet_genesis() == beefy_genesis)
.and_then(|mut state| {
// Overwrite persisted state with current best GRANDPA block.
state.set_best_grandpa(best_grandpa.clone());
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
Some(Ok(state))
})
// No valid voter-state persisted, re-initialize from pallet genesis.
.unwrap_or_else(|| {
initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta)
})
{
// Overwrite persisted state with current best GRANDPA block.
state.set_best_grandpa(best_grandpa.clone());
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
// Make sure that all the headers that we need have been synced.
let mut header = best_grandpa.clone();
while *header.number() > state.best_beefy() {
header =
wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?;
}
return Ok(state);
}
// No valid voter-state persisted, re-initialize from pallet genesis.
initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta).await
}
/// Waits until the parent header of `current` is available and returns it.
///
/// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers.
/// The rest of the headers (gap sync) are lazily downloaded later. But the BEEFY voter also needs
/// the headers in range `[beefy_genesis..=best_grandpa]` to be available. This helper method
/// enables us to wait until these headers have been synced.
async fn wait_for_parent_header<B, BC>(
blockchain: &BC,
current: <B as Block>::Header,
delay: Duration,
) -> ClientResult<<B as Block>::Header>
where
B: Block,
BC: BlockchainBackend<B>,
{
if *current.number() == Zero::zero() {
let msg = format!("header {} is Genesis, there is no parent for it", current.hash());
warn!(target: LOG_TARGET, "{}", msg);
return Err(ClientError::UnknownBlock(msg))
}
loop {
match blockchain.header(*current.parent_hash())? {
Some(parent) => return Ok(parent),
None => {
info!(
target: LOG_TARGET,
"🥩 Parent of header number {} not found. \
BEEFY gadget waiting for header sync to finish ...",
current.number()
);
tokio::time::sleep(delay).await;
},
}
}
}
// If no persisted state present, walk back the chain from first GRANDPA notification to either:
// - latest BEEFY finalized block, or if none found on the way,
// - BEEFY pallet genesis;
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
fn initialize_voter_state<B, BE, R>(
async fn initialize_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
@@ -405,6 +458,8 @@ where
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId>,
{
let blockchain = backend.blockchain();
let beefy_genesis = runtime
.runtime_api()
.beefy_genesis(best_grandpa.hash())
@@ -414,7 +469,6 @@ where
.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?;
// Walk back the imported blocks and initialize voter either, at the last block with
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
let blockchain = backend.blockchain();
let mut sessions = VecDeque::new();
let mut header = best_grandpa.clone();
let state = loop {
@@ -432,7 +486,7 @@ where
let best_beefy = *header.number();
// If no session boundaries detected so far, just initialize new rounds here.
if sessions.is_empty() {
let active_set = expect_validator_set(runtime, backend, &header)?;
let active_set = expect_validator_set(runtime, backend, &header).await?;
let mut rounds = Rounds::new(best_beefy, active_set);
// Mark the round as already finalized.
rounds.conclude(best_beefy);
@@ -451,7 +505,7 @@ where
if *header.number() == beefy_genesis {
// We've reached BEEFY genesis, initialize voter here.
let genesis_set = expect_validator_set(runtime, backend, &header)?;
let genesis_set = expect_validator_set(runtime, backend, &header).await?;
info!(
target: LOG_TARGET,
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
@@ -481,7 +535,7 @@ where
}
// Move up the chain.
header = blockchain.expect_header(*header.parent_hash())?;
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
};
aux_schema::write_current_version(backend)?;
@@ -532,7 +586,12 @@ where
Err(ClientError::Backend(err_msg))
}
fn expect_validator_set<B, BE, R>(
/// Provides validator set active `at_header`. It tries to get it from state, otherwise falls
/// back to walk up the chain looking the validator set enactment in header digests.
///
/// Note: function will `async::sleep()` when walking back the chain if some needed header hasn't
/// been synced yet (as it happens when warp syncing when headers are synced in the background).
async fn expect_validator_set<B, BE, R>(
runtime: &R,
backend: &BE,
at_header: &B::Header,
@@ -550,15 +609,16 @@ where
debug!(target: LOG_TARGET, "🥩 Trying to find validator set active at header: {:?}", at_header);
let mut header = at_header.clone();
loop {
debug!(target: LOG_TARGET, "🥩 Looking for auth set change at block number: {:?}", *header.number());
if let Ok(Some(active)) = runtime.runtime_api().validator_set(header.hash()) {
return Ok(active)
} else {
debug!(target: LOG_TARGET, "🥩 Looking for auth set change at block number: {:?}", *header.number());
match worker::find_authorities_change::<B>(&header) {
Some(active) => return Ok(active),
// Move up the chain. Ultimately we'll get it from chain genesis state, or error out
// here.
None => header = blockchain.expect_header(*header.parent_hash())?,
// there.
None =>
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?,
}
}
}