mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 12:11:09 +00:00
[BEEFY] Avoid missing voting sessions during node restart (#3074)
Related to https://github.com/paritytech/polkadot-sdk/issues/3003 and https://github.com/paritytech/polkadot-sdk/issues/2842 --------- Co-authored-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
@@ -18,11 +18,10 @@
|
||||
|
||||
//! Schema for BEEFY state persisted in the aux-db.
|
||||
|
||||
use crate::{worker::PersistedState, LOG_TARGET};
|
||||
use crate::{error::Error, worker::PersistedState, LOG_TARGET};
|
||||
use codec::{Decode, Encode};
|
||||
use log::{info, trace};
|
||||
use sc_client_api::{backend::AuxStore, Backend};
|
||||
use sp_blockchain::{Error as ClientError, Result as ClientResult};
|
||||
use sp_runtime::traits::Block as BlockT;
|
||||
|
||||
const VERSION_KEY: &[u8] = b"beefy_auxschema_version";
|
||||
@@ -30,31 +29,33 @@ const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state";
|
||||
|
||||
const CURRENT_VERSION: u32 = 4;
|
||||
|
||||
pub(crate) fn write_current_version<BE: AuxStore>(backend: &BE) -> ClientResult<()> {
|
||||
pub(crate) fn write_current_version<BE: AuxStore>(backend: &BE) -> Result<(), Error> {
|
||||
info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION);
|
||||
AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[])
|
||||
.map_err(|e| Error::Backend(e.to_string()))
|
||||
}
|
||||
|
||||
/// Write voter state.
|
||||
pub(crate) fn write_voter_state<B: BlockT, BE: AuxStore>(
|
||||
backend: &BE,
|
||||
state: &PersistedState<B>,
|
||||
) -> ClientResult<()> {
|
||||
) -> Result<(), Error> {
|
||||
trace!(target: LOG_TARGET, "🥩 persisting {:?}", state);
|
||||
AuxStore::insert_aux(backend, &[(WORKER_STATE_KEY, state.encode().as_slice())], &[])
|
||||
.map_err(|e| Error::Backend(e.to_string()))
|
||||
}
|
||||
|
||||
fn load_decode<BE: AuxStore, T: Decode>(backend: &BE, key: &[u8]) -> ClientResult<Option<T>> {
|
||||
match backend.get_aux(key)? {
|
||||
fn load_decode<BE: AuxStore, T: Decode>(backend: &BE, key: &[u8]) -> Result<Option<T>, Error> {
|
||||
match backend.get_aux(key).map_err(|e| Error::Backend(e.to_string()))? {
|
||||
None => Ok(None),
|
||||
Some(t) => T::decode(&mut &t[..])
|
||||
.map_err(|e| ClientError::Backend(format!("BEEFY DB is corrupted: {}", e)))
|
||||
.map_err(|e| Error::Backend(format!("BEEFY DB is corrupted: {}", e)))
|
||||
.map(Some),
|
||||
}
|
||||
}
|
||||
|
||||
/// Load or initialize persistent data from backend.
|
||||
pub(crate) fn load_persistent<B, BE>(backend: &BE) -> ClientResult<Option<PersistedState<B>>>
|
||||
pub(crate) fn load_persistent<B, BE>(backend: &BE) -> Result<Option<PersistedState<B>>, Error>
|
||||
where
|
||||
B: BlockT,
|
||||
BE: Backend<B>,
|
||||
@@ -65,8 +66,7 @@ where
|
||||
None => (),
|
||||
Some(1) | Some(2) | Some(3) => (), // versions 1, 2 & 3 are obsolete and should be ignored
|
||||
Some(4) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
|
||||
other =>
|
||||
return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
|
||||
other => return Err(Error::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
|
||||
}
|
||||
|
||||
// No persistent state found in DB.
|
||||
|
||||
+2
-2
@@ -201,7 +201,7 @@ where
|
||||
let peer = request.peer;
|
||||
match self.handle_request(request) {
|
||||
Ok(()) => {
|
||||
metric_inc!(self, beefy_successful_justification_responses);
|
||||
metric_inc!(self.metrics, beefy_successful_justification_responses);
|
||||
debug!(
|
||||
target: BEEFY_SYNC_LOG_TARGET,
|
||||
"🥩 Handled BEEFY justification request from {:?}.", peer
|
||||
@@ -209,7 +209,7 @@ where
|
||||
},
|
||||
Err(e) => {
|
||||
// peer reputation changes already applied in `self.handle_request()`
|
||||
metric_inc!(self, beefy_failed_justification_responses);
|
||||
metric_inc!(self.metrics, beefy_failed_justification_responses);
|
||||
debug!(
|
||||
target: BEEFY_SYNC_LOG_TARGET,
|
||||
"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
|
||||
|
||||
+5
-5
@@ -148,7 +148,7 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
|
||||
if let Some(peer) = self.try_next_peer() {
|
||||
self.request_from_peer(peer, RequestInfo { block, active_set });
|
||||
} else {
|
||||
metric_inc!(self, beefy_on_demand_justification_no_peer_to_request_from);
|
||||
metric_inc!(self.metrics, beefy_on_demand_justification_no_peer_to_request_from);
|
||||
debug!(
|
||||
target: BEEFY_SYNC_LOG_TARGET,
|
||||
"🥩 no good peers to request justif #{:?} from", block
|
||||
@@ -194,13 +194,13 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
|
||||
);
|
||||
match e {
|
||||
RequestFailure::Refused => {
|
||||
metric_inc!(self, beefy_on_demand_justification_peer_refused);
|
||||
metric_inc!(self.metrics, beefy_on_demand_justification_peer_refused);
|
||||
let peer_report =
|
||||
PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE };
|
||||
Error::InvalidResponse(peer_report)
|
||||
},
|
||||
_ => {
|
||||
metric_inc!(self, beefy_on_demand_justification_peer_error);
|
||||
metric_inc!(self.metrics, beefy_on_demand_justification_peer_error);
|
||||
Error::ResponseError
|
||||
},
|
||||
}
|
||||
@@ -212,7 +212,7 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
|
||||
&req_info.active_set,
|
||||
)
|
||||
.map_err(|(err, signatures_checked)| {
|
||||
metric_inc!(self, beefy_on_demand_justification_invalid_proof);
|
||||
metric_inc!(self.metrics, beefy_on_demand_justification_invalid_proof);
|
||||
debug!(
|
||||
target: BEEFY_SYNC_LOG_TARGET,
|
||||
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
|
||||
@@ -261,7 +261,7 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
|
||||
}
|
||||
},
|
||||
Ok(proof) => {
|
||||
metric_inc!(self, beefy_on_demand_justification_good_proof);
|
||||
metric_inc!(self.metrics, beefy_on_demand_justification_good_proof);
|
||||
debug!(
|
||||
target: BEEFY_SYNC_LOG_TARGET,
|
||||
"🥩 received valid on-demand justif #{:?} from {:?}", block, peer
|
||||
|
||||
@@ -165,7 +165,7 @@ where
|
||||
self.justification_sender
|
||||
.notify(|| Ok::<_, ()>(proof))
|
||||
.expect("the closure always returns Ok; qed.");
|
||||
metric_inc!(self, beefy_good_justification_imports);
|
||||
metric_inc!(self.metrics, beefy_good_justification_imports);
|
||||
},
|
||||
Err(err) => {
|
||||
debug!(
|
||||
@@ -174,7 +174,7 @@ where
|
||||
number,
|
||||
err,
|
||||
);
|
||||
metric_inc!(self, beefy_bad_justification_imports);
|
||||
metric_inc!(self.metrics, beefy_bad_justification_imports);
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
@@ -27,10 +27,9 @@ use crate::{
|
||||
outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler,
|
||||
},
|
||||
},
|
||||
error::Error,
|
||||
import::BeefyBlockImport,
|
||||
metrics::register_metrics,
|
||||
round::Rounds,
|
||||
worker::PersistedState,
|
||||
};
|
||||
use futures::{stream::Fuse, StreamExt};
|
||||
use log::{debug, error, info, warn};
|
||||
@@ -47,17 +46,11 @@ use sp_blockchain::{
|
||||
use sp_consensus::{Error as ConsensusError, SyncOracle};
|
||||
use sp_consensus_beefy::{
|
||||
ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet,
|
||||
BEEFY_ENGINE_ID,
|
||||
};
|
||||
use sp_keystore::KeystorePtr;
|
||||
use sp_mmr_primitives::MmrApi;
|
||||
use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero};
|
||||
use std::{
|
||||
collections::{BTreeMap, VecDeque},
|
||||
marker::PhantomData,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
|
||||
|
||||
mod aux_schema;
|
||||
mod error;
|
||||
@@ -309,14 +302,17 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
|
||||
},
|
||||
};
|
||||
|
||||
let persisted_state = match load_or_init_voter_state(
|
||||
&*backend,
|
||||
&*runtime,
|
||||
beefy_genesis,
|
||||
best_grandpa,
|
||||
min_block_delta,
|
||||
)
|
||||
.await
|
||||
let mut worker_base = worker::BeefyWorkerBase {
|
||||
backend: backend.clone(),
|
||||
runtime: runtime.clone(),
|
||||
key_store: key_store.clone().into(),
|
||||
metrics: metrics.clone(),
|
||||
_phantom: Default::default(),
|
||||
};
|
||||
|
||||
let persisted_state = match worker_base
|
||||
.load_or_init_state(beefy_genesis, best_grandpa, min_block_delta)
|
||||
.await
|
||||
{
|
||||
Ok(state) => state,
|
||||
Err(e) => {
|
||||
@@ -334,14 +330,11 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
|
||||
}
|
||||
|
||||
let worker = worker::BeefyWorker {
|
||||
backend: backend.clone(),
|
||||
base: worker_base,
|
||||
payload_provider: payload_provider.clone(),
|
||||
runtime: runtime.clone(),
|
||||
sync: sync.clone(),
|
||||
key_store: key_store.clone().into(),
|
||||
comms: beefy_comms,
|
||||
links: links.clone(),
|
||||
metrics: metrics.clone(),
|
||||
pending_justifications: BTreeMap::new(),
|
||||
persisted_state,
|
||||
};
|
||||
@@ -368,43 +361,6 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_or_init_voter_state<B, BE, R>(
|
||||
backend: &BE,
|
||||
runtime: &R,
|
||||
beefy_genesis: NumberFor<B>,
|
||||
best_grandpa: <B as Block>::Header,
|
||||
min_block_delta: u32,
|
||||
) -> ClientResult<PersistedState<B>>
|
||||
where
|
||||
B: Block,
|
||||
BE: Backend<B>,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B, AuthorityId>,
|
||||
{
|
||||
// Initialize voter state from AUX DB if compatible.
|
||||
if let Some(mut state) = crate::aux_schema::load_persistent(backend)?
|
||||
// Verify state pallet genesis matches runtime.
|
||||
.filter(|state| state.pallet_genesis() == beefy_genesis)
|
||||
{
|
||||
// Overwrite persisted state with current best GRANDPA block.
|
||||
state.set_best_grandpa(best_grandpa.clone());
|
||||
// Overwrite persisted data with newly provided `min_block_delta`.
|
||||
state.set_min_block_delta(min_block_delta);
|
||||
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
|
||||
|
||||
// Make sure that all the headers that we need have been synced.
|
||||
let mut header = best_grandpa.clone();
|
||||
while *header.number() > state.best_beefy() {
|
||||
header =
|
||||
wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?;
|
||||
}
|
||||
return Ok(state)
|
||||
}
|
||||
|
||||
// No valid voter-state persisted, re-initialize from pallet genesis.
|
||||
initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta).await
|
||||
}
|
||||
|
||||
/// Waits until the parent header of `current` is available and returns it.
|
||||
///
|
||||
/// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers.
|
||||
@@ -415,7 +371,7 @@ async fn wait_for_parent_header<B, BC>(
|
||||
blockchain: &BC,
|
||||
current: <B as Block>::Header,
|
||||
delay: Duration,
|
||||
) -> ClientResult<<B as Block>::Header>
|
||||
) -> Result<<B as Block>::Header, Error>
|
||||
where
|
||||
B: Block,
|
||||
BC: BlockchainBackend<B>,
|
||||
@@ -423,10 +379,13 @@ where
|
||||
if *current.number() == Zero::zero() {
|
||||
let msg = format!("header {} is Genesis, there is no parent for it", current.hash());
|
||||
warn!(target: LOG_TARGET, "{}", msg);
|
||||
return Err(ClientError::UnknownBlock(msg))
|
||||
return Err(Error::Backend(msg));
|
||||
}
|
||||
loop {
|
||||
match blockchain.header(*current.parent_hash())? {
|
||||
match blockchain
|
||||
.header(*current.parent_hash())
|
||||
.map_err(|e| Error::Backend(e.to_string()))?
|
||||
{
|
||||
Some(parent) => return Ok(parent),
|
||||
None => {
|
||||
info!(
|
||||
@@ -441,108 +400,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// If no persisted state present, walk back the chain from first GRANDPA notification to either:
|
||||
// - latest BEEFY finalized block, or if none found on the way,
|
||||
// - BEEFY pallet genesis;
|
||||
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
|
||||
async fn initialize_voter_state<B, BE, R>(
|
||||
backend: &BE,
|
||||
runtime: &R,
|
||||
beefy_genesis: NumberFor<B>,
|
||||
best_grandpa: <B as Block>::Header,
|
||||
min_block_delta: u32,
|
||||
) -> ClientResult<PersistedState<B>>
|
||||
where
|
||||
B: Block,
|
||||
BE: Backend<B>,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B, AuthorityId>,
|
||||
{
|
||||
let blockchain = backend.blockchain();
|
||||
|
||||
let beefy_genesis = runtime
|
||||
.runtime_api()
|
||||
.beefy_genesis(best_grandpa.hash())
|
||||
.ok()
|
||||
.flatten()
|
||||
.filter(|genesis| *genesis == beefy_genesis)
|
||||
.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?;
|
||||
// Walk back the imported blocks and initialize voter either, at the last block with
|
||||
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
|
||||
let mut sessions = VecDeque::new();
|
||||
let mut header = best_grandpa.clone();
|
||||
let state = loop {
|
||||
if let Some(true) = blockchain
|
||||
.justifications(header.hash())
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
|
||||
{
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
|
||||
*header.number()
|
||||
);
|
||||
let best_beefy = *header.number();
|
||||
// If no session boundaries detected so far, just initialize new rounds here.
|
||||
if sessions.is_empty() {
|
||||
let active_set = expect_validator_set(runtime, backend, &header).await?;
|
||||
let mut rounds = Rounds::new(best_beefy, active_set);
|
||||
// Mark the round as already finalized.
|
||||
rounds.conclude(best_beefy);
|
||||
sessions.push_front(rounds);
|
||||
}
|
||||
let state = PersistedState::checked_new(
|
||||
best_grandpa,
|
||||
best_beefy,
|
||||
sessions,
|
||||
min_block_delta,
|
||||
beefy_genesis,
|
||||
)
|
||||
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
|
||||
break state
|
||||
}
|
||||
|
||||
if *header.number() == beefy_genesis {
|
||||
// We've reached BEEFY genesis, initialize voter here.
|
||||
let genesis_set = expect_validator_set(runtime, backend, &header).await?;
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
|
||||
Starting voting rounds at block {:?}, genesis validator set {:?}.",
|
||||
beefy_genesis,
|
||||
genesis_set,
|
||||
);
|
||||
|
||||
sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
|
||||
break PersistedState::checked_new(
|
||||
best_grandpa,
|
||||
Zero::zero(),
|
||||
sessions,
|
||||
min_block_delta,
|
||||
beefy_genesis,
|
||||
)
|
||||
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
|
||||
}
|
||||
|
||||
if let Some(active) = worker::find_authorities_change::<B>(&header) {
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Marking block {:?} as BEEFY Mandatory.",
|
||||
*header.number()
|
||||
);
|
||||
sessions.push_front(Rounds::new(*header.number(), active));
|
||||
}
|
||||
|
||||
// Move up the chain.
|
||||
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
|
||||
};
|
||||
|
||||
aux_schema::write_current_version(backend)?;
|
||||
aux_schema::write_voter_state(backend, &state)?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
/// Wait for BEEFY runtime pallet to be available, return active validator set.
|
||||
/// Should be called only once during worker initialization.
|
||||
async fn wait_for_runtime_pallet<B, R>(
|
||||
@@ -595,7 +452,7 @@ async fn expect_validator_set<B, BE, R>(
|
||||
runtime: &R,
|
||||
backend: &BE,
|
||||
at_header: &B::Header,
|
||||
) -> ClientResult<ValidatorSet<AuthorityId>>
|
||||
) -> Result<ValidatorSet<AuthorityId>, Error>
|
||||
where
|
||||
B: Block,
|
||||
BE: Backend<B>,
|
||||
@@ -618,7 +475,9 @@ where
|
||||
// Move up the chain. Ultimately we'll get it from chain genesis state, or error out
|
||||
// there.
|
||||
None =>
|
||||
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?,
|
||||
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY)
|
||||
.await
|
||||
.map_err(|e| Error::Backend(e.to_string()))?,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -305,10 +305,10 @@ pub(crate) fn register_metrics<T: PrometheusRegister>(
|
||||
// if expr does not derive `Display`.
|
||||
#[macro_export]
|
||||
macro_rules! metric_set {
|
||||
($self:ident, $m:ident, $v:expr) => {{
|
||||
($metrics:expr, $m:ident, $v:expr) => {{
|
||||
let val: u64 = format!("{}", $v).parse().unwrap();
|
||||
|
||||
if let Some(metrics) = $self.metrics.as_ref() {
|
||||
if let Some(metrics) = $metrics.as_ref() {
|
||||
metrics.$m.set(val);
|
||||
}
|
||||
}};
|
||||
@@ -316,8 +316,8 @@ macro_rules! metric_set {
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! metric_inc {
|
||||
($self:ident, $m:ident) => {{
|
||||
if let Some(metrics) = $self.metrics.as_ref() {
|
||||
($metrics:expr, $m:ident) => {{
|
||||
if let Some(metrics) = $metrics.as_ref() {
|
||||
metrics.$m.inc();
|
||||
}
|
||||
}};
|
||||
@@ -325,8 +325,8 @@ macro_rules! metric_inc {
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! metric_get {
|
||||
($self:ident, $m:ident) => {{
|
||||
$self.metrics.as_ref().map(|metrics| metrics.$m.clone())
|
||||
($metrics:expr, $m:ident) => {{
|
||||
$metrics.as_ref().map(|metrics| metrics.$m.clone())
|
||||
}};
|
||||
}
|
||||
|
||||
|
||||
@@ -28,10 +28,12 @@ use crate::{
|
||||
},
|
||||
request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler},
|
||||
},
|
||||
error::Error,
|
||||
gossip_protocol_name,
|
||||
justification::*,
|
||||
load_or_init_voter_state, wait_for_runtime_pallet, BeefyRPCLinks, BeefyVoterLinks, KnownPeers,
|
||||
PersistedState,
|
||||
wait_for_runtime_pallet,
|
||||
worker::{BeefyWorkerBase, PersistedState},
|
||||
BeefyRPCLinks, BeefyVoterLinks, KnownPeers,
|
||||
};
|
||||
use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt};
|
||||
use parking_lot::Mutex;
|
||||
@@ -363,7 +365,7 @@ async fn voter_init_setup(
|
||||
net: &mut BeefyTestNet,
|
||||
finality: &mut futures::stream::Fuse<FinalityNotifications<Block>>,
|
||||
api: &TestApi,
|
||||
) -> sp_blockchain::Result<PersistedState<Block>> {
|
||||
) -> Result<PersistedState<Block>, Error> {
|
||||
let backend = net.peer(0).client().as_backend();
|
||||
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
|
||||
let (gossip_validator, _) = GossipValidator::new(known_peers);
|
||||
@@ -378,7 +380,14 @@ async fn voter_init_setup(
|
||||
);
|
||||
let (beefy_genesis, best_grandpa) =
|
||||
wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap();
|
||||
load_or_init_voter_state(&*backend, api, beefy_genesis, best_grandpa, 1).await
|
||||
let mut worker_base = BeefyWorkerBase {
|
||||
backend,
|
||||
runtime: Arc::new(api.clone()),
|
||||
key_store: None.into(),
|
||||
metrics: None,
|
||||
_phantom: Default::default(),
|
||||
};
|
||||
worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await
|
||||
}
|
||||
|
||||
// Spawns beefy voters. Returns a future to spawn on the runtime.
|
||||
@@ -1072,9 +1081,15 @@ async fn should_initialize_voter_at_custom_genesis() {
|
||||
);
|
||||
let (beefy_genesis, best_grandpa) =
|
||||
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
|
||||
let persisted_state = load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut worker_base = BeefyWorkerBase {
|
||||
backend: backend.clone(),
|
||||
runtime: Arc::new(api),
|
||||
key_store: None.into(),
|
||||
metrics: None,
|
||||
_phantom: Default::default(),
|
||||
};
|
||||
let persisted_state =
|
||||
worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
|
||||
|
||||
// Test initialization at session boundary.
|
||||
// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
|
||||
@@ -1107,10 +1122,15 @@ async fn should_initialize_voter_at_custom_genesis() {
|
||||
// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
|
||||
let (beefy_genesis, best_grandpa) =
|
||||
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
|
||||
let mut worker_base = BeefyWorkerBase {
|
||||
backend: backend.clone(),
|
||||
runtime: Arc::new(api),
|
||||
key_store: None.into(),
|
||||
metrics: None,
|
||||
_phantom: Default::default(),
|
||||
};
|
||||
let new_persisted_state =
|
||||
load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1)
|
||||
.await
|
||||
.unwrap();
|
||||
worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
|
||||
|
||||
// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
|
||||
let sessions = new_persisted_state.voting_oracle().sessions();
|
||||
@@ -1285,6 +1305,104 @@ async fn should_initialize_voter_at_custom_genesis_when_state_unavailable() {
|
||||
assert_eq!(state, persisted_state);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn should_catch_up_when_loading_saved_voter_state() {
|
||||
let keys = &[BeefyKeyring::Alice];
|
||||
let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
|
||||
let mut net = BeefyTestNet::new(1);
|
||||
let backend = net.peer(0).client().as_backend();
|
||||
|
||||
// push 30 blocks with `AuthorityChange` digests every 10 blocks
|
||||
let hashes = net.generate_blocks_and_sync(30, 10, &validator_set, false).await;
|
||||
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
|
||||
// finalize 13 without justifications
|
||||
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();
|
||||
|
||||
let api = TestApi::with_validator_set(&validator_set);
|
||||
|
||||
// load persistent state - nothing in DB, should init at genesis
|
||||
//
|
||||
// NOTE: code from `voter_init_setup()` is moved here because the new network event system
|
||||
// doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the
|
||||
// first `GossipEngine`
|
||||
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
|
||||
let (gossip_validator, _) = GossipValidator::new(known_peers);
|
||||
let gossip_validator = Arc::new(gossip_validator);
|
||||
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
|
||||
net.peer(0).network_service().clone(),
|
||||
net.peer(0).sync_service().clone(),
|
||||
net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
|
||||
"/beefy/whatever",
|
||||
gossip_validator,
|
||||
None,
|
||||
);
|
||||
let (beefy_genesis, best_grandpa) =
|
||||
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
|
||||
let mut worker_base = BeefyWorkerBase {
|
||||
backend: backend.clone(),
|
||||
runtime: Arc::new(api.clone()),
|
||||
key_store: None.into(),
|
||||
metrics: None,
|
||||
_phantom: Default::default(),
|
||||
};
|
||||
let persisted_state =
|
||||
worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
|
||||
|
||||
// Test initialization at session boundary.
|
||||
// verify voter initialized with two sessions starting at blocks 1 and 10
|
||||
let sessions = persisted_state.voting_oracle().sessions();
|
||||
assert_eq!(sessions.len(), 2);
|
||||
assert_eq!(sessions[0].session_start(), 1);
|
||||
assert_eq!(sessions[1].session_start(), 10);
|
||||
let rounds = persisted_state.active_round().unwrap();
|
||||
assert_eq!(rounds.session_start(), 1);
|
||||
assert_eq!(rounds.validator_set_id(), validator_set.id());
|
||||
|
||||
// verify next vote target is mandatory block 1
|
||||
assert_eq!(persisted_state.best_beefy(), 0);
|
||||
assert_eq!(persisted_state.best_grandpa_number(), 13);
|
||||
assert_eq!(persisted_state.voting_oracle().voting_target(), Some(1));
|
||||
|
||||
// verify state also saved to db
|
||||
assert!(verify_persisted_version(&*backend));
|
||||
let state = load_persistent(&*backend).unwrap().unwrap();
|
||||
assert_eq!(state, persisted_state);
|
||||
|
||||
// now let's consider that the node goes offline, and then it restarts after a while
|
||||
|
||||
// finalize 25 without justifications
|
||||
net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap();
|
||||
// load persistent state - state preset in DB
|
||||
// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
|
||||
let (beefy_genesis, best_grandpa) =
|
||||
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
|
||||
let mut worker_base = BeefyWorkerBase {
|
||||
backend: backend.clone(),
|
||||
runtime: Arc::new(api),
|
||||
key_store: None.into(),
|
||||
metrics: None,
|
||||
_phantom: Default::default(),
|
||||
};
|
||||
let persisted_state =
|
||||
worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
|
||||
|
||||
// Verify voter initialized with old sessions plus a new one starting at block 20.
|
||||
// There shouldn't be any duplicates.
|
||||
let sessions = persisted_state.voting_oracle().sessions();
|
||||
assert_eq!(sessions.len(), 3);
|
||||
assert_eq!(sessions[0].session_start(), 1);
|
||||
assert_eq!(sessions[1].session_start(), 10);
|
||||
assert_eq!(sessions[2].session_start(), 20);
|
||||
let rounds = persisted_state.active_round().unwrap();
|
||||
assert_eq!(rounds.session_start(), 1);
|
||||
assert_eq!(rounds.validator_set_id(), validator_set.id());
|
||||
|
||||
// verify next vote target is mandatory block 1
|
||||
assert_eq!(persisted_state.best_beefy(), 0);
|
||||
assert_eq!(persisted_state.best_grandpa_number(), 25);
|
||||
assert_eq!(persisted_state.voting_oracle().voting_target(), Some(1));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn beefy_finalizing_after_pallet_genesis() {
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
@@ -17,18 +17,20 @@
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{
|
||||
aux_schema,
|
||||
communication::{
|
||||
gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
|
||||
peers::PeerReport,
|
||||
request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo},
|
||||
},
|
||||
error::Error,
|
||||
expect_validator_set,
|
||||
justification::BeefyVersionedFinalityProof,
|
||||
keystore::{BeefyKeystore, BeefySignatureHasher},
|
||||
metric_inc, metric_set,
|
||||
metrics::VoterMetrics,
|
||||
round::{Rounds, VoteImportResult},
|
||||
BeefyVoterLinks, LOG_TARGET,
|
||||
wait_for_parent_header, BeefyVoterLinks, HEADER_SYNC_DELAY, LOG_TARGET,
|
||||
};
|
||||
use codec::{Codec, Decode, DecodeAll, Encode};
|
||||
use futures::{stream::Fuse, FutureExt, StreamExt};
|
||||
@@ -38,6 +40,7 @@ use sc_network_gossip::GossipEngine;
|
||||
use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
|
||||
use sp_blockchain::Backend as BlockchainBackend;
|
||||
use sp_consensus::SyncOracle;
|
||||
use sp_consensus_beefy::{
|
||||
check_equivocation_proof,
|
||||
@@ -53,6 +56,7 @@ use sp_runtime::{
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, VecDeque},
|
||||
fmt::Debug,
|
||||
marker::PhantomData,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
@@ -176,6 +180,13 @@ impl<B: Block> VoterOracle<B> {
|
||||
}
|
||||
}
|
||||
|
||||
// Check if an observed session can be added to the Oracle.
|
||||
fn can_add_session(&self, session_start: NumberFor<B>) -> bool {
|
||||
let latest_known_session_start =
|
||||
self.sessions.back().map(|session| session.session_start());
|
||||
Some(session_start) > latest_known_session_start
|
||||
}
|
||||
|
||||
/// Add new observed session to the Oracle.
|
||||
pub fn add_session(&mut self, rounds: Rounds<B>) {
|
||||
self.sessions.push_back(rounds);
|
||||
@@ -236,12 +247,10 @@ impl<B: Block> VoterOracle<B> {
|
||||
/// Return `Some(number)` if we should be voting on block `number`,
|
||||
/// return `None` if there is no block we should vote on.
|
||||
pub fn voting_target(&self) -> Option<NumberFor<B>> {
|
||||
let rounds = if let Some(r) = self.sessions.front() {
|
||||
r
|
||||
} else {
|
||||
let rounds = self.sessions.front().or_else(|| {
|
||||
debug!(target: LOG_TARGET, "🥩 No voting round started");
|
||||
return None
|
||||
};
|
||||
None
|
||||
})?;
|
||||
let best_grandpa = *self.best_grandpa_block_header.number();
|
||||
let best_beefy = self.best_beefy_block;
|
||||
|
||||
@@ -327,50 +336,171 @@ pub(crate) struct BeefyComms<B: Block> {
|
||||
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
|
||||
}
|
||||
|
||||
/// A BEEFY worker plays the BEEFY protocol
|
||||
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
|
||||
pub(crate) struct BeefyWorkerBase<B: Block, BE, RuntimeApi> {
|
||||
// utilities
|
||||
pub backend: Arc<BE>,
|
||||
pub payload_provider: P,
|
||||
pub runtime: Arc<RuntimeApi>,
|
||||
pub sync: Arc<S>,
|
||||
pub key_store: BeefyKeystore,
|
||||
|
||||
// communication (created once, but returned and reused if worker is restarted/reinitialized)
|
||||
pub comms: BeefyComms<B>,
|
||||
|
||||
// channels
|
||||
/// Links between the block importer, the background voter and the RPC layer.
|
||||
pub links: BeefyVoterLinks<B>,
|
||||
|
||||
// voter state
|
||||
/// BEEFY client metrics.
|
||||
pub metrics: Option<VoterMetrics>,
|
||||
/// Buffer holding justifications for future processing.
|
||||
pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
|
||||
/// Persisted voter state.
|
||||
pub persisted_state: PersistedState<B>,
|
||||
|
||||
pub _phantom: PhantomData<B>,
|
||||
}
|
||||
|
||||
impl<B, BE, P, R, S> BeefyWorker<B, BE, P, R, S>
|
||||
impl<B, BE, R> BeefyWorkerBase<B, BE, R>
|
||||
where
|
||||
B: Block + Codec,
|
||||
BE: Backend<B>,
|
||||
P: PayloadProvider<B>,
|
||||
S: SyncOracle,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B, AuthorityId>,
|
||||
{
|
||||
fn best_grandpa_block(&self) -> NumberFor<B> {
|
||||
*self.persisted_state.voting_oracle.best_grandpa_block_header.number()
|
||||
// If no persisted state present, walk back the chain from first GRANDPA notification to either:
|
||||
// - latest BEEFY finalized block, or if none found on the way,
|
||||
// - BEEFY pallet genesis;
|
||||
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to
|
||||
// finalize.
|
||||
async fn init_state(
|
||||
&self,
|
||||
beefy_genesis: NumberFor<B>,
|
||||
best_grandpa: <B as Block>::Header,
|
||||
min_block_delta: u32,
|
||||
) -> Result<PersistedState<B>, Error> {
|
||||
let blockchain = self.backend.blockchain();
|
||||
|
||||
let beefy_genesis = self
|
||||
.runtime
|
||||
.runtime_api()
|
||||
.beefy_genesis(best_grandpa.hash())
|
||||
.ok()
|
||||
.flatten()
|
||||
.filter(|genesis| *genesis == beefy_genesis)
|
||||
.ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?;
|
||||
// Walk back the imported blocks and initialize voter either, at the last block with
|
||||
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
|
||||
let mut sessions = VecDeque::new();
|
||||
let mut header = best_grandpa.clone();
|
||||
let state = loop {
|
||||
if let Some(true) = blockchain
|
||||
.justifications(header.hash())
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
|
||||
{
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
|
||||
*header.number()
|
||||
);
|
||||
let best_beefy = *header.number();
|
||||
// If no session boundaries detected so far, just initialize new rounds here.
|
||||
if sessions.is_empty() {
|
||||
let active_set =
|
||||
expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header)
|
||||
.await?;
|
||||
let mut rounds = Rounds::new(best_beefy, active_set);
|
||||
// Mark the round as already finalized.
|
||||
rounds.conclude(best_beefy);
|
||||
sessions.push_front(rounds);
|
||||
}
|
||||
let state = PersistedState::checked_new(
|
||||
best_grandpa,
|
||||
best_beefy,
|
||||
sessions,
|
||||
min_block_delta,
|
||||
beefy_genesis,
|
||||
)
|
||||
.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
|
||||
break state
|
||||
}
|
||||
|
||||
if *header.number() == beefy_genesis {
|
||||
// We've reached BEEFY genesis, initialize voter here.
|
||||
let genesis_set =
|
||||
expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header)
|
||||
.await?;
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
|
||||
Starting voting rounds at block {:?}, genesis validator set {:?}.",
|
||||
beefy_genesis,
|
||||
genesis_set,
|
||||
);
|
||||
|
||||
sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
|
||||
break PersistedState::checked_new(
|
||||
best_grandpa,
|
||||
Zero::zero(),
|
||||
sessions,
|
||||
min_block_delta,
|
||||
beefy_genesis,
|
||||
)
|
||||
.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?
|
||||
}
|
||||
|
||||
if let Some(active) = find_authorities_change::<B>(&header) {
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Marking block {:?} as BEEFY Mandatory.",
|
||||
*header.number()
|
||||
);
|
||||
sessions.push_front(Rounds::new(*header.number(), active));
|
||||
}
|
||||
|
||||
// Move up the chain.
|
||||
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
|
||||
};
|
||||
|
||||
aux_schema::write_current_version(self.backend.as_ref())?;
|
||||
aux_schema::write_voter_state(self.backend.as_ref(), &state)?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
fn voting_oracle(&self) -> &VoterOracle<B> {
|
||||
&self.persisted_state.voting_oracle
|
||||
}
|
||||
pub async fn load_or_init_state(
|
||||
&mut self,
|
||||
beefy_genesis: NumberFor<B>,
|
||||
best_grandpa: <B as Block>::Header,
|
||||
min_block_delta: u32,
|
||||
) -> Result<PersistedState<B>, Error> {
|
||||
// Initialize voter state from AUX DB if compatible.
|
||||
if let Some(mut state) = crate::aux_schema::load_persistent(self.backend.as_ref())?
|
||||
// Verify state pallet genesis matches runtime.
|
||||
.filter(|state| state.pallet_genesis() == beefy_genesis)
|
||||
{
|
||||
// Overwrite persisted state with current best GRANDPA block.
|
||||
state.set_best_grandpa(best_grandpa.clone());
|
||||
// Overwrite persisted data with newly provided `min_block_delta`.
|
||||
state.set_min_block_delta(min_block_delta);
|
||||
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
|
||||
|
||||
fn active_rounds(&mut self) -> Result<&Rounds<B>, Error> {
|
||||
self.persisted_state.voting_oracle.active_rounds()
|
||||
// Make sure that all the headers that we need have been synced.
|
||||
let mut new_sessions = vec![];
|
||||
let mut header = best_grandpa.clone();
|
||||
while *header.number() > state.best_beefy() {
|
||||
if state.voting_oracle.can_add_session(*header.number()) {
|
||||
if let Some(active) = find_authorities_change::<B>(&header) {
|
||||
new_sessions.push((active, *header.number()));
|
||||
}
|
||||
}
|
||||
header =
|
||||
wait_for_parent_header(self.backend.blockchain(), header, HEADER_SYNC_DELAY)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Make sure we didn't miss any sessions during node restart.
|
||||
for (validator_set, new_session_start) in new_sessions.drain(..).rev() {
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Handling missed BEEFY session after node restart: {:?}.",
|
||||
new_session_start
|
||||
);
|
||||
self.init_session_at(&mut state, validator_set, new_session_start);
|
||||
}
|
||||
return Ok(state)
|
||||
}
|
||||
|
||||
// No valid voter-state persisted, re-initialize from pallet genesis.
|
||||
self.init_state(beefy_genesis, best_grandpa, min_block_delta).await
|
||||
}
|
||||
|
||||
/// Verify `active` validator set for `block` against the key store
|
||||
@@ -394,7 +524,7 @@ where
|
||||
if store.intersection(&active).count() == 0 {
|
||||
let msg = "no authority public key found in store".to_string();
|
||||
debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg);
|
||||
metric_inc!(self, beefy_no_authority_found_in_store);
|
||||
metric_inc!(self.metrics, beefy_no_authority_found_in_store);
|
||||
Err(Error::Keystore(msg))
|
||||
} else {
|
||||
Ok(())
|
||||
@@ -404,13 +534,14 @@ where
|
||||
/// Handle session changes by starting new voting round for mandatory blocks.
|
||||
fn init_session_at(
|
||||
&mut self,
|
||||
persisted_state: &mut PersistedState<B>,
|
||||
validator_set: ValidatorSet<AuthorityId>,
|
||||
new_session_start: NumberFor<B>,
|
||||
) {
|
||||
debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set);
|
||||
|
||||
// BEEFY should finalize a mandatory block during each session.
|
||||
if let Ok(active_session) = self.active_rounds() {
|
||||
if let Ok(active_session) = persisted_state.voting_oracle.active_rounds() {
|
||||
if !active_session.mandatory_done() {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
@@ -418,7 +549,7 @@ where
|
||||
validator_set.id(),
|
||||
active_session.validator_set_id(),
|
||||
);
|
||||
metric_inc!(self, beefy_lagging_sessions);
|
||||
metric_inc!(self.metrics, beefy_lagging_sessions);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -428,10 +559,10 @@ where
|
||||
}
|
||||
|
||||
let id = validator_set.id();
|
||||
self.persisted_state
|
||||
persisted_state
|
||||
.voting_oracle
|
||||
.add_session(Rounds::new(new_session_start, validator_set));
|
||||
metric_set!(self, beefy_validator_set_id, id);
|
||||
metric_set!(self.metrics, beefy_validator_set_id, id);
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 New Rounds for validator set id: {:?} with session_start {:?}",
|
||||
@@ -439,6 +570,61 @@ where
|
||||
new_session_start
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// A BEEFY worker/voter that follows the BEEFY protocol
|
||||
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
|
||||
pub base: BeefyWorkerBase<B, BE, RuntimeApi>,
|
||||
|
||||
// utils
|
||||
pub payload_provider: P,
|
||||
pub sync: Arc<S>,
|
||||
|
||||
// communication (created once, but returned and reused if worker is restarted/reinitialized)
|
||||
pub comms: BeefyComms<B>,
|
||||
|
||||
// channels
|
||||
/// Links between the block importer, the background voter and the RPC layer.
|
||||
pub links: BeefyVoterLinks<B>,
|
||||
|
||||
// voter state
|
||||
/// Buffer holding justifications for future processing.
|
||||
pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
|
||||
/// Persisted voter state.
|
||||
pub persisted_state: PersistedState<B>,
|
||||
}
|
||||
|
||||
impl<B, BE, P, R, S> BeefyWorker<B, BE, P, R, S>
|
||||
where
|
||||
B: Block + Codec,
|
||||
BE: Backend<B>,
|
||||
P: PayloadProvider<B>,
|
||||
S: SyncOracle,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B, AuthorityId>,
|
||||
{
|
||||
fn best_grandpa_block(&self) -> NumberFor<B> {
|
||||
*self.persisted_state.voting_oracle.best_grandpa_block_header.number()
|
||||
}
|
||||
|
||||
fn voting_oracle(&self) -> &VoterOracle<B> {
|
||||
&self.persisted_state.voting_oracle
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn active_rounds(&mut self) -> Result<&Rounds<B>, Error> {
|
||||
self.persisted_state.voting_oracle.active_rounds()
|
||||
}
|
||||
|
||||
/// Handle session changes by starting new voting round for mandatory blocks.
|
||||
fn init_session_at(
|
||||
&mut self,
|
||||
validator_set: ValidatorSet<AuthorityId>,
|
||||
new_session_start: NumberFor<B>,
|
||||
) {
|
||||
self.base
|
||||
.init_session_at(&mut self.persisted_state, validator_set, new_session_start);
|
||||
}
|
||||
|
||||
fn handle_finality_notification(
|
||||
&mut self,
|
||||
@@ -452,7 +638,8 @@ where
|
||||
);
|
||||
let header = ¬ification.header;
|
||||
|
||||
self.runtime
|
||||
self.base
|
||||
.runtime
|
||||
.runtime_api()
|
||||
.beefy_genesis(header.hash())
|
||||
.ok()
|
||||
@@ -466,7 +653,7 @@ where
|
||||
self.persisted_state.set_best_grandpa(header.clone());
|
||||
|
||||
// Check all (newly) finalized blocks for new session(s).
|
||||
let backend = self.backend.clone();
|
||||
let backend = self.base.backend.clone();
|
||||
for header in notification
|
||||
.tree_route
|
||||
.iter()
|
||||
@@ -485,7 +672,7 @@ where
|
||||
}
|
||||
|
||||
if new_session_added {
|
||||
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
|
||||
crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
|
||||
.map_err(|e| Error::Backend(e.to_string()))?;
|
||||
}
|
||||
|
||||
@@ -519,7 +706,7 @@ where
|
||||
true,
|
||||
);
|
||||
},
|
||||
RoundAction::Drop => metric_inc!(self, beefy_stale_votes),
|
||||
RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_votes),
|
||||
RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
|
||||
};
|
||||
Ok(())
|
||||
@@ -539,23 +726,23 @@ where
|
||||
match self.voting_oracle().triage_round(block_num)? {
|
||||
RoundAction::Process => {
|
||||
debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num);
|
||||
metric_inc!(self, beefy_imported_justifications);
|
||||
metric_inc!(self.base.metrics, beefy_imported_justifications);
|
||||
self.finalize(justification)?
|
||||
},
|
||||
RoundAction::Enqueue => {
|
||||
debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num);
|
||||
if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS {
|
||||
self.pending_justifications.entry(block_num).or_insert(justification);
|
||||
metric_inc!(self, beefy_buffered_justifications);
|
||||
metric_inc!(self.base.metrics, beefy_buffered_justifications);
|
||||
} else {
|
||||
metric_inc!(self, beefy_buffered_justifications_dropped);
|
||||
metric_inc!(self.base.metrics, beefy_buffered_justifications_dropped);
|
||||
warn!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Buffer justification dropped for round: {:?}.", block_num
|
||||
);
|
||||
}
|
||||
},
|
||||
RoundAction::Drop => metric_inc!(self, beefy_stale_justifications),
|
||||
RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_justifications),
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
@@ -577,7 +764,7 @@ where
|
||||
// We created the `finality_proof` and know to be valid.
|
||||
// New state is persisted after finalization.
|
||||
self.finalize(finality_proof.clone())?;
|
||||
metric_inc!(self, beefy_good_votes_processed);
|
||||
metric_inc!(self.base.metrics, beefy_good_votes_processed);
|
||||
return Ok(Some(finality_proof))
|
||||
},
|
||||
VoteImportResult::Ok => {
|
||||
@@ -588,17 +775,20 @@ where
|
||||
.map(|(mandatory_num, _)| mandatory_num == block_number)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
|
||||
.map_err(|e| Error::Backend(e.to_string()))?;
|
||||
crate::aux_schema::write_voter_state(
|
||||
&*self.base.backend,
|
||||
&self.persisted_state,
|
||||
)
|
||||
.map_err(|e| Error::Backend(e.to_string()))?;
|
||||
}
|
||||
metric_inc!(self, beefy_good_votes_processed);
|
||||
metric_inc!(self.base.metrics, beefy_good_votes_processed);
|
||||
},
|
||||
VoteImportResult::Equivocation(proof) => {
|
||||
metric_inc!(self, beefy_equivocation_votes);
|
||||
metric_inc!(self.base.metrics, beefy_equivocation_votes);
|
||||
self.report_equivocation(proof)?;
|
||||
},
|
||||
VoteImportResult::Invalid => metric_inc!(self, beefy_invalid_votes),
|
||||
VoteImportResult::Stale => metric_inc!(self, beefy_stale_votes),
|
||||
VoteImportResult::Invalid => metric_inc!(self.base.metrics, beefy_invalid_votes),
|
||||
VoteImportResult::Stale => metric_inc!(self.base.metrics, beefy_stale_votes),
|
||||
};
|
||||
Ok(None)
|
||||
}
|
||||
@@ -625,14 +815,15 @@ where
|
||||
|
||||
// Set new best BEEFY block number.
|
||||
self.persisted_state.set_best_beefy(block_num);
|
||||
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
|
||||
crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
|
||||
.map_err(|e| Error::Backend(e.to_string()))?;
|
||||
|
||||
metric_set!(self, beefy_best_block, block_num);
|
||||
metric_set!(self.base.metrics, beefy_best_block, block_num);
|
||||
|
||||
self.comms.on_demand_justifications.cancel_requests_older_than(block_num);
|
||||
|
||||
if let Err(e) = self
|
||||
.base
|
||||
.backend
|
||||
.blockchain()
|
||||
.expect_block_hash_from_id(&BlockId::Number(block_num))
|
||||
@@ -642,7 +833,8 @@ where
|
||||
.notify(|| Ok::<_, ()>(hash))
|
||||
.expect("forwards closure result; the closure always returns Ok; qed.");
|
||||
|
||||
self.backend
|
||||
self.base
|
||||
.backend
|
||||
.append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode()))
|
||||
}) {
|
||||
debug!(
|
||||
@@ -679,12 +871,16 @@ where
|
||||
|
||||
for (num, justification) in justifs_to_process.into_iter() {
|
||||
debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num);
|
||||
metric_inc!(self, beefy_imported_justifications);
|
||||
metric_inc!(self.base.metrics, beefy_imported_justifications);
|
||||
if let Err(err) = self.finalize(justification) {
|
||||
error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err);
|
||||
}
|
||||
}
|
||||
metric_set!(self, beefy_buffered_justifications, self.pending_justifications.len());
|
||||
metric_set!(
|
||||
self.base.metrics,
|
||||
beefy_buffered_justifications,
|
||||
self.pending_justifications.len()
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -693,7 +889,7 @@ where
|
||||
fn try_to_vote(&mut self) -> Result<(), Error> {
|
||||
// Vote if there's now a new vote target.
|
||||
if let Some(target) = self.voting_oracle().voting_target() {
|
||||
metric_set!(self, beefy_should_vote_on, target);
|
||||
metric_set!(self.base.metrics, beefy_should_vote_on, target);
|
||||
if target > self.persisted_state.best_voted {
|
||||
self.do_vote(target)?;
|
||||
}
|
||||
@@ -713,6 +909,7 @@ where
|
||||
self.persisted_state.voting_oracle.best_grandpa_block_header.clone()
|
||||
} else {
|
||||
let hash = self
|
||||
.base
|
||||
.backend
|
||||
.blockchain()
|
||||
.expect_block_hash_from_id(&BlockId::Number(target_number))
|
||||
@@ -724,7 +921,7 @@ where
|
||||
Error::Backend(err_msg)
|
||||
})?;
|
||||
|
||||
self.backend.blockchain().expect_header(hash).map_err(|err| {
|
||||
self.base.backend.blockchain().expect_header(hash).map_err(|err| {
|
||||
let err_msg = format!(
|
||||
"Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..",
|
||||
target_number, hash, err
|
||||
@@ -744,7 +941,7 @@ where
|
||||
let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
|
||||
let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id());
|
||||
|
||||
let authority_id = if let Some(id) = self.key_store.authority_id(validators) {
|
||||
let authority_id = if let Some(id) = self.base.key_store.authority_id(validators) {
|
||||
debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id);
|
||||
id
|
||||
} else {
|
||||
@@ -758,7 +955,7 @@ where
|
||||
let commitment = Commitment { payload, block_number: target_number, validator_set_id };
|
||||
let encoded_commitment = commitment.encode();
|
||||
|
||||
let signature = match self.key_store.sign(&authority_id, &encoded_commitment) {
|
||||
let signature = match self.base.key_store.sign(&authority_id, &encoded_commitment) {
|
||||
Ok(sig) => sig,
|
||||
Err(err) => {
|
||||
warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err);
|
||||
@@ -783,7 +980,7 @@ where
|
||||
.gossip_engine
|
||||
.gossip_message(proofs_topic::<B>(), encoded_proof, true);
|
||||
} else {
|
||||
metric_inc!(self, beefy_votes_sent);
|
||||
metric_inc!(self.base.metrics, beefy_votes_sent);
|
||||
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
|
||||
let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
|
||||
self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
|
||||
@@ -791,8 +988,8 @@ where
|
||||
|
||||
// Persist state after vote to avoid double voting in case of voter restarts.
|
||||
self.persisted_state.best_voted = target_number;
|
||||
metric_set!(self, beefy_best_voted, target_number);
|
||||
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
|
||||
metric_set!(self.base.metrics, beefy_best_voted, target_number);
|
||||
crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
|
||||
.map_err(|e| Error::Backend(e.to_string()))
|
||||
}
|
||||
|
||||
@@ -966,7 +1163,7 @@ where
|
||||
if !check_equivocation_proof::<_, _, BeefySignatureHasher>(&proof) {
|
||||
debug!(target: LOG_TARGET, "🥩 Skip report for bad equivocation {:?}", proof);
|
||||
return Ok(())
|
||||
} else if let Some(local_id) = self.key_store.authority_id(validators) {
|
||||
} else if let Some(local_id) = self.base.key_store.authority_id(validators) {
|
||||
if offender_id == local_id {
|
||||
debug!(target: LOG_TARGET, "🥩 Skip equivocation report for own equivocation");
|
||||
return Ok(())
|
||||
@@ -975,6 +1172,7 @@ where
|
||||
|
||||
let number = *proof.round_number();
|
||||
let hash = self
|
||||
.base
|
||||
.backend
|
||||
.blockchain()
|
||||
.expect_block_hash_from_id(&BlockId::Number(number))
|
||||
@@ -985,7 +1183,7 @@ where
|
||||
);
|
||||
Error::Backend(err_msg)
|
||||
})?;
|
||||
let runtime_api = self.runtime.runtime_api();
|
||||
let runtime_api = self.base.runtime.runtime_api();
|
||||
// generate key ownership proof at that block
|
||||
let key_owner_proof = match runtime_api
|
||||
.generate_key_ownership_proof(hash, validator_set_id, offender_id)
|
||||
@@ -1002,7 +1200,7 @@ where
|
||||
};
|
||||
|
||||
// submit equivocation report at **best** block
|
||||
let best_block_hash = self.backend.blockchain().info().best_hash;
|
||||
let best_block_hash = self.base.backend.blockchain().info().best_hash;
|
||||
runtime_api
|
||||
.submit_report_equivocation_unsigned_extrinsic(best_block_hash, proof, key_owner_proof)
|
||||
.map_err(Error::RuntimeApi)?;
|
||||
@@ -1028,7 +1226,7 @@ where
|
||||
|
||||
/// Calculate next block number to vote on.
|
||||
///
|
||||
/// Return `None` if there is no voteable target yet.
|
||||
/// Return `None` if there is no votable target yet.
|
||||
fn vote_target<N>(best_grandpa: N, best_beefy: N, session_start: N, min_delta: u32) -> Option<N>
|
||||
where
|
||||
N: AtLeast32Bit + Copy + Debug,
|
||||
@@ -1189,14 +1387,17 @@ pub(crate) mod tests {
|
||||
on_demand_justifications,
|
||||
};
|
||||
BeefyWorker {
|
||||
backend,
|
||||
base: BeefyWorkerBase {
|
||||
backend,
|
||||
runtime: api,
|
||||
key_store: Some(keystore).into(),
|
||||
metrics,
|
||||
_phantom: Default::default(),
|
||||
},
|
||||
payload_provider,
|
||||
runtime: api,
|
||||
key_store: Some(keystore).into(),
|
||||
sync: Arc::new(sync),
|
||||
links,
|
||||
comms,
|
||||
metrics,
|
||||
sync: Arc::new(sync),
|
||||
pending_justifications: BTreeMap::new(),
|
||||
persisted_state,
|
||||
}
|
||||
@@ -1470,19 +1671,19 @@ pub(crate) mod tests {
|
||||
let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
|
||||
|
||||
// keystore doesn't contain other keys than validators'
|
||||
assert_eq!(worker.verify_validator_set(&1, &validator_set), Ok(()));
|
||||
assert_eq!(worker.base.verify_validator_set(&1, &validator_set), Ok(()));
|
||||
|
||||
// unknown `Bob` key
|
||||
let keys = &[Keyring::Bob];
|
||||
let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
|
||||
let err_msg = "no authority public key found in store".to_string();
|
||||
let expected = Err(Error::Keystore(err_msg));
|
||||
assert_eq!(worker.verify_validator_set(&1, &validator_set), expected);
|
||||
assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected);
|
||||
|
||||
// worker has no keystore
|
||||
worker.key_store = None.into();
|
||||
worker.base.key_store = None.into();
|
||||
let expected_err = Err(Error::Keystore("no Keystore".into()));
|
||||
assert_eq!(worker.verify_validator_set(&1, &validator_set), expected_err);
|
||||
assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected_err);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1634,7 +1835,7 @@ pub(crate) mod tests {
|
||||
|
||||
let mut net = BeefyTestNet::new(1);
|
||||
let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
|
||||
worker.runtime = api_alice.clone();
|
||||
worker.base.runtime = api_alice.clone();
|
||||
|
||||
// let there be a block with num = 1:
|
||||
let _ = net.peer(0).push_blocks(1, false);
|
||||
|
||||
Reference in New Issue
Block a user