mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 18:01:03 +00:00
client/beefy: persist voter state (#12712)
* client/beefy: prepare worker for persisting state * client/beefy: persist voter state * client/beefy: initialize persistent state * client/beefy: try to vote from the very beginning Now that voter is initialized from persistent state, it makes sense that it can attempt voting right away. This also helps the genesis case when we consider block `One` as mandatory. * client/beefy: add tests for voter state db * client/beefy: persist voter state as soon as initialized * client/beefy: make sure min-block-delta is at least 1 * client/beefy: persist state after voting Persist state after handling self vote to avoid double voting in case of voter restarts. * client/beefy: persist state after handling mandatory block vote For mandatory blocks we want to make sure we're not losing votes in case of crashes or restarts, since voter will not make further progress without finalizing them. * frame/beefy: use GENESIS_AUTHORITY_SET_ID on pallet genesis * client/beefy: initialize voter at either genesis or last finalized To guarantee unbroken chain of mandatory blocks justifications, voter will always resume from either last BEEFY-justified block or `pallet-beefy` genesis, whichever is more recent. Initialization walks back the chain from latest GRANDPA finalized block looking for one of the above. Along the way, it also records and enqueues for processing any BEEFY mandatory blocks that have been already GRANDPA finalized but not BEEFY finalized. * client/beefy: decouple voter init from aux db state load * client/beefy: fix voter init tests * remove debug prints * gadget future must be type () * fix init from last justification Signed-off-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
@@ -16,35 +16,6 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use beefy_primitives::{BeefyApi, MmrRootHash, PayloadProvider};
|
||||
use parking_lot::Mutex;
|
||||
use prometheus::Registry;
|
||||
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, Finalizer};
|
||||
use sc_consensus::BlockImport;
|
||||
use sc_network::ProtocolName;
|
||||
use sc_network_common::service::NetworkRequest;
|
||||
use sc_network_gossip::Network as GossipNetwork;
|
||||
use sp_api::{NumberFor, ProvideRuntimeApi};
|
||||
use sp_blockchain::HeaderBackend;
|
||||
use sp_consensus::{Error as ConsensusError, SyncOracle};
|
||||
use sp_keystore::SyncCryptoStorePtr;
|
||||
use sp_mmr_primitives::MmrApi;
|
||||
use sp_runtime::traits::Block;
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
|
||||
mod error;
|
||||
mod keystore;
|
||||
mod metrics;
|
||||
mod round;
|
||||
mod worker;
|
||||
|
||||
pub mod communication;
|
||||
pub mod import;
|
||||
pub mod justification;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use crate::{
|
||||
communication::{
|
||||
notification::{
|
||||
@@ -57,12 +28,53 @@ use crate::{
|
||||
},
|
||||
},
|
||||
import::BeefyBlockImport,
|
||||
round::Rounds,
|
||||
worker::PersistedState,
|
||||
};
|
||||
use beefy_primitives::{
|
||||
crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet, BEEFY_ENGINE_ID,
|
||||
GENESIS_AUTHORITY_SET_ID,
|
||||
};
|
||||
use futures::{stream::Fuse, StreamExt};
|
||||
use log::{debug, error, info};
|
||||
use parking_lot::Mutex;
|
||||
use prometheus::Registry;
|
||||
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
|
||||
use sc_consensus::BlockImport;
|
||||
use sc_network::ProtocolName;
|
||||
use sc_network_common::service::NetworkRequest;
|
||||
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
|
||||
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
|
||||
use sp_blockchain::{
|
||||
Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult,
|
||||
};
|
||||
use sp_consensus::{Error as ConsensusError, SyncOracle};
|
||||
use sp_keystore::SyncCryptoStorePtr;
|
||||
use sp_mmr_primitives::MmrApi;
|
||||
use sp_runtime::{
|
||||
generic::BlockId,
|
||||
traits::{Block, One, Zero},
|
||||
};
|
||||
use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
|
||||
|
||||
mod aux_schema;
|
||||
mod error;
|
||||
mod keystore;
|
||||
mod metrics;
|
||||
mod round;
|
||||
mod worker;
|
||||
|
||||
pub mod communication;
|
||||
pub mod import;
|
||||
pub mod justification;
|
||||
|
||||
pub use communication::beefy_protocol_name::{
|
||||
gossip_protocol_name, justifications_protocol_name as justifs_protocol_name,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
|
||||
/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as
|
||||
/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking
|
||||
@@ -222,7 +234,7 @@ where
|
||||
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
|
||||
let gossip_validator =
|
||||
Arc::new(communication::gossip::GossipValidator::new(known_peers.clone()));
|
||||
let gossip_engine = sc_network_gossip::GossipEngine::new(
|
||||
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
|
||||
network.clone(),
|
||||
gossip_protocol_name,
|
||||
gossip_validator.clone(),
|
||||
@@ -240,21 +252,38 @@ where
|
||||
prometheus_registry.as_ref().map(metrics::Metrics::register).and_then(
|
||||
|result| match result {
|
||||
Ok(metrics) => {
|
||||
log::debug!(target: "beefy", "🥩 Registered metrics");
|
||||
debug!(target: "beefy", "🥩 Registered metrics");
|
||||
Some(metrics)
|
||||
},
|
||||
Err(err) => {
|
||||
log::debug!(target: "beefy", "🥩 Failed to register metrics: {:?}", err);
|
||||
debug!(target: "beefy", "🥩 Failed to register metrics: {:?}", err);
|
||||
None
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
// Subscribe to finality notifications and justifications before waiting for runtime pallet and
|
||||
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
|
||||
let mut finality_notifications = client.finality_notification_stream().fuse();
|
||||
let block_import_justif = links.from_block_import_justif_stream.subscribe().fuse();
|
||||
|
||||
// Wait for BEEFY pallet to be active before starting voter.
|
||||
let persisted_state =
|
||||
match wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications)
|
||||
.await
|
||||
.and_then(|best_grandpa| {
|
||||
load_or_init_voter_state(&*backend, &*runtime, best_grandpa, min_block_delta)
|
||||
}) {
|
||||
Ok(state) => state,
|
||||
Err(e) => {
|
||||
error!(target: "beefy", "Error: {:?}. Terminating.", e);
|
||||
return
|
||||
},
|
||||
};
|
||||
|
||||
let worker_params = worker::WorkerParams {
|
||||
client,
|
||||
backend,
|
||||
payload_provider,
|
||||
runtime,
|
||||
network,
|
||||
key_store: key_store.into(),
|
||||
known_peers,
|
||||
@@ -263,10 +292,195 @@ where
|
||||
on_demand_justifications,
|
||||
links,
|
||||
metrics,
|
||||
min_block_delta,
|
||||
persisted_state,
|
||||
};
|
||||
|
||||
let worker = worker::BeefyWorker::<_, _, _, _, _, _>::new(worker_params);
|
||||
let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params);
|
||||
|
||||
futures::future::join(worker.run(), on_demand_justifications_handler.run()).await;
|
||||
futures::future::join(
|
||||
worker.run(block_import_justif, finality_notifications),
|
||||
on_demand_justifications_handler.run(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn load_or_init_voter_state<B, BE, R>(
|
||||
backend: &BE,
|
||||
runtime: &R,
|
||||
best_grandpa: <B as Block>::Header,
|
||||
min_block_delta: u32,
|
||||
) -> ClientResult<PersistedState<B>>
|
||||
where
|
||||
B: Block,
|
||||
BE: Backend<B>,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B>,
|
||||
{
|
||||
// Initialize voter state from AUX DB or from pallet genesis.
|
||||
if let Some(mut state) = crate::aux_schema::load_persistent(backend)? {
|
||||
// Overwrite persisted state with current best GRANDPA block.
|
||||
state.set_best_grandpa(best_grandpa);
|
||||
// Overwrite persisted data with newly provided `min_block_delta`.
|
||||
state.set_min_block_delta(min_block_delta);
|
||||
info!(target: "beefy", "🥩 Loading BEEFY voter state from db: {:?}.", state);
|
||||
Ok(state)
|
||||
} else {
|
||||
initialize_voter_state(backend, runtime, best_grandpa, min_block_delta)
|
||||
}
|
||||
}
|
||||
|
||||
// If no persisted state present, walk back the chain from first GRANDPA notification to either:
|
||||
// - latest BEEFY finalized block, or if none found on the way,
|
||||
// - BEEFY pallet genesis;
|
||||
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
|
||||
fn initialize_voter_state<B, BE, R>(
|
||||
backend: &BE,
|
||||
runtime: &R,
|
||||
best_grandpa: <B as Block>::Header,
|
||||
min_block_delta: u32,
|
||||
) -> ClientResult<PersistedState<B>>
|
||||
where
|
||||
B: Block,
|
||||
BE: Backend<B>,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B>,
|
||||
{
|
||||
// Walk back the imported blocks and initialize voter either, at the last block with
|
||||
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
|
||||
let blockchain = backend.blockchain();
|
||||
let mut sessions = VecDeque::new();
|
||||
let mut header = best_grandpa.clone();
|
||||
let state = loop {
|
||||
if let Some(true) = blockchain
|
||||
.justifications(header.hash())
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
|
||||
{
|
||||
info!(
|
||||
target: "beefy",
|
||||
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
|
||||
*header.number()
|
||||
);
|
||||
let best_beefy = *header.number();
|
||||
// If no session boundaries detected so far, just initialize new rounds here.
|
||||
if sessions.is_empty() {
|
||||
let active_set = expect_validator_set(runtime, BlockId::hash(header.hash()))?;
|
||||
let mut rounds = Rounds::new(best_beefy, active_set);
|
||||
// Mark the round as already finalized.
|
||||
rounds.conclude(best_beefy);
|
||||
sessions.push_front(rounds);
|
||||
}
|
||||
let state =
|
||||
PersistedState::checked_new(best_grandpa, best_beefy, sessions, min_block_delta)
|
||||
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
|
||||
break state
|
||||
}
|
||||
|
||||
// Check if we should move up the chain.
|
||||
let parent_hash = *header.parent_hash();
|
||||
if *header.number() == One::one() ||
|
||||
runtime
|
||||
.runtime_api()
|
||||
.validator_set(&BlockId::hash(parent_hash))
|
||||
.ok()
|
||||
.flatten()
|
||||
.is_none()
|
||||
{
|
||||
// We've reached pallet genesis, initialize voter here.
|
||||
let genesis_num = *header.number();
|
||||
let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash()))
|
||||
.and_then(genesis_set_sanity_check)?;
|
||||
info!(
|
||||
target: "beefy",
|
||||
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
|
||||
Starting voting rounds at block {:?}, genesis validator set {:?}.",
|
||||
genesis_num, genesis_set,
|
||||
);
|
||||
|
||||
sessions.push_front(Rounds::new(genesis_num, genesis_set));
|
||||
break PersistedState::checked_new(best_grandpa, Zero::zero(), sessions, min_block_delta)
|
||||
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
|
||||
}
|
||||
|
||||
if let Some(active) = worker::find_authorities_change::<B>(&header) {
|
||||
info!(target: "beefy", "🥩 Marking block {:?} as BEEFY Mandatory.", *header.number());
|
||||
sessions.push_front(Rounds::new(*header.number(), active));
|
||||
}
|
||||
|
||||
// Move up the chain.
|
||||
header = blockchain.expect_header(BlockId::Hash(parent_hash))?;
|
||||
};
|
||||
|
||||
aux_schema::write_current_version(backend)?;
|
||||
aux_schema::write_voter_state(backend, &state)?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
/// Wait for BEEFY runtime pallet to be available, return active validator set.
|
||||
/// Should be called only once during worker initialization.
|
||||
async fn wait_for_runtime_pallet<B, R>(
|
||||
runtime: &R,
|
||||
mut gossip_engine: &mut GossipEngine<B>,
|
||||
finality: &mut Fuse<FinalityNotifications<B>>,
|
||||
) -> ClientResult<<B as Block>::Header>
|
||||
where
|
||||
B: Block,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B>,
|
||||
{
|
||||
info!(target: "beefy", "🥩 BEEFY gadget waiting for BEEFY pallet to become available...");
|
||||
loop {
|
||||
futures::select! {
|
||||
notif = finality.next() => {
|
||||
let notif = match notif {
|
||||
Some(notif) => notif,
|
||||
None => break
|
||||
};
|
||||
let at = BlockId::hash(notif.header.hash());
|
||||
if let Some(active) = runtime.runtime_api().validator_set(&at).ok().flatten() {
|
||||
// Beefy pallet available, return best grandpa at the time.
|
||||
info!(
|
||||
target: "beefy", "🥩 BEEFY pallet available: block {:?} validator set {:?}",
|
||||
notif.header.number(), active
|
||||
);
|
||||
return Ok(notif.header)
|
||||
}
|
||||
},
|
||||
_ = gossip_engine => {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
let err_msg = "🥩 Gossip engine has unexpectedly terminated.".into();
|
||||
error!(target: "beefy", "{}", err_msg);
|
||||
Err(ClientError::Backend(err_msg))
|
||||
}
|
||||
|
||||
fn genesis_set_sanity_check(
|
||||
active: ValidatorSet<AuthorityId>,
|
||||
) -> ClientResult<ValidatorSet<AuthorityId>> {
|
||||
if active.id() == GENESIS_AUTHORITY_SET_ID {
|
||||
Ok(active)
|
||||
} else {
|
||||
error!(target: "beefy", "🥩 Unexpected ID for genesis validator set {:?}.", active);
|
||||
Err(ClientError::Backend("BEEFY Genesis sanity check failed.".into()))
|
||||
}
|
||||
}
|
||||
|
||||
fn expect_validator_set<B, R>(
|
||||
runtime: &R,
|
||||
at: BlockId<B>,
|
||||
) -> ClientResult<ValidatorSet<AuthorityId>>
|
||||
where
|
||||
B: Block,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B>,
|
||||
{
|
||||
runtime
|
||||
.runtime_api()
|
||||
.validator_set(&at)
|
||||
.ok()
|
||||
.flatten()
|
||||
.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user