mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 15:41:02 +00:00
sc-consensus-beefy: pump gossip engine while waiting for initialization conditions (#3435)
As part of BEEFY worker/voter initialization the task waits for certain chain and backend conditions to be fulfilled: - BEEFY consensus enabled on-chain & GRANDPA best finalized higher than on-chain BEEFY genesis block, - backend has synced headers for BEEFY mandatory blocks between best BEEFY and best GRANDPA. During this waiting time, any messages gossiped on the BEEFY topic for current chain get enqueued in the gossip engine, leading to RAM bloating and output warning/error messages when the wait time is non-negligible (like during a clean sync). This PR adds logic to pump the gossip engine while waiting for other things to make sure gossiped messages get consumed (practically discarded until worker is fully initialized). Also raises the warning threshold for enqueued messages from 10k to 100k. This is in line with the other gossip protocols on the node. Fixes https://github.com/paritytech/polkadot-sdk/issues/3390 --------- Signed-off-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
@@ -17,46 +17,42 @@
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{
|
||||
aux_schema,
|
||||
communication::{
|
||||
gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
|
||||
gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage},
|
||||
peers::PeerReport,
|
||||
request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo},
|
||||
request_response::outgoing_requests_engine::ResponseInfo,
|
||||
},
|
||||
error::Error,
|
||||
expect_validator_set,
|
||||
find_authorities_change,
|
||||
justification::BeefyVersionedFinalityProof,
|
||||
keystore::BeefyKeystore,
|
||||
metric_inc, metric_set,
|
||||
metrics::VoterMetrics,
|
||||
round::{Rounds, VoteImportResult},
|
||||
wait_for_parent_header, BeefyVoterLinks, HEADER_SYNC_DELAY, LOG_TARGET,
|
||||
BeefyComms, BeefyVoterLinks, LOG_TARGET,
|
||||
};
|
||||
use codec::{Codec, Decode, DecodeAll, Encode};
|
||||
use futures::{stream::Fuse, FutureExt, StreamExt};
|
||||
use log::{debug, error, info, log_enabled, trace, warn};
|
||||
use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
|
||||
use sc_network_gossip::GossipEngine;
|
||||
use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
|
||||
use sc_utils::notification::NotificationReceiver;
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
|
||||
use sp_blockchain::Backend as BlockchainBackend;
|
||||
use sp_consensus::SyncOracle;
|
||||
use sp_consensus_beefy::{
|
||||
check_equivocation_proof,
|
||||
ecdsa_crypto::{AuthorityId, Signature},
|
||||
BeefyApi, BeefySignatureHasher, Commitment, ConsensusLog, EquivocationProof, PayloadProvider,
|
||||
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
|
||||
BeefyApi, BeefySignatureHasher, Commitment, EquivocationProof, PayloadProvider, ValidatorSet,
|
||||
VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
|
||||
};
|
||||
use sp_runtime::{
|
||||
generic::{BlockId, OpaqueDigestItemId},
|
||||
generic::BlockId,
|
||||
traits::{Block, Header, NumberFor, Zero},
|
||||
SaturatedConversion,
|
||||
};
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, VecDeque},
|
||||
fmt::Debug,
|
||||
marker::PhantomData,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
@@ -180,8 +176,8 @@ impl<B: Block> VoterOracle<B> {
|
||||
}
|
||||
}
|
||||
|
||||
// Check if an observed session can be added to the Oracle.
|
||||
fn can_add_session(&self, session_start: NumberFor<B>) -> bool {
|
||||
/// Check if an observed session can be added to the Oracle.
|
||||
pub fn can_add_session(&self, session_start: NumberFor<B>) -> bool {
|
||||
let latest_known_session_start =
|
||||
self.sessions.back().map(|session| session.session_start());
|
||||
Some(session_start) > latest_known_session_start
|
||||
@@ -319,229 +315,28 @@ impl<B: Block> PersistedState<B> {
|
||||
self.voting_oracle.best_grandpa_block_header = best_grandpa;
|
||||
}
|
||||
|
||||
pub fn voting_oracle(&self) -> &VoterOracle<B> {
|
||||
&self.voting_oracle
|
||||
}
|
||||
|
||||
pub(crate) fn gossip_filter_config(&self) -> Result<GossipFilterCfg<B>, Error> {
|
||||
let (start, end) = self.voting_oracle.accepted_interval()?;
|
||||
let validator_set = self.voting_oracle.current_validator_set()?;
|
||||
Ok(GossipFilterCfg { start, end, validator_set })
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper object holding BEEFY worker communication/gossip components.
|
||||
///
|
||||
/// These are created once, but will be reused if worker is restarted/reinitialized.
|
||||
pub(crate) struct BeefyComms<B: Block> {
|
||||
pub gossip_engine: GossipEngine<B>,
|
||||
pub gossip_validator: Arc<GossipValidator<B>>,
|
||||
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
|
||||
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
|
||||
}
|
||||
|
||||
pub(crate) struct BeefyWorkerBase<B: Block, BE, RuntimeApi> {
|
||||
// utilities
|
||||
pub backend: Arc<BE>,
|
||||
pub runtime: Arc<RuntimeApi>,
|
||||
pub key_store: BeefyKeystore<AuthorityId>,
|
||||
|
||||
/// BEEFY client metrics.
|
||||
pub metrics: Option<VoterMetrics>,
|
||||
|
||||
pub _phantom: PhantomData<B>,
|
||||
}
|
||||
|
||||
impl<B, BE, R> BeefyWorkerBase<B, BE, R>
|
||||
where
|
||||
B: Block + Codec,
|
||||
BE: Backend<B>,
|
||||
R: ProvideRuntimeApi<B>,
|
||||
R::Api: BeefyApi<B, AuthorityId>,
|
||||
{
|
||||
// If no persisted state present, walk back the chain from first GRANDPA notification to either:
|
||||
// - latest BEEFY finalized block, or if none found on the way,
|
||||
// - BEEFY pallet genesis;
|
||||
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to
|
||||
// finalize.
|
||||
async fn init_state(
|
||||
&self,
|
||||
beefy_genesis: NumberFor<B>,
|
||||
best_grandpa: <B as Block>::Header,
|
||||
min_block_delta: u32,
|
||||
) -> Result<PersistedState<B>, Error> {
|
||||
let blockchain = self.backend.blockchain();
|
||||
|
||||
let beefy_genesis = self
|
||||
.runtime
|
||||
.runtime_api()
|
||||
.beefy_genesis(best_grandpa.hash())
|
||||
.ok()
|
||||
.flatten()
|
||||
.filter(|genesis| *genesis == beefy_genesis)
|
||||
.ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?;
|
||||
// Walk back the imported blocks and initialize voter either, at the last block with
|
||||
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
|
||||
let mut sessions = VecDeque::new();
|
||||
let mut header = best_grandpa.clone();
|
||||
let state = loop {
|
||||
if let Some(true) = blockchain
|
||||
.justifications(header.hash())
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
|
||||
{
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
|
||||
*header.number()
|
||||
);
|
||||
let best_beefy = *header.number();
|
||||
// If no session boundaries detected so far, just initialize new rounds here.
|
||||
if sessions.is_empty() {
|
||||
let active_set =
|
||||
expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header)
|
||||
.await?;
|
||||
let mut rounds = Rounds::new(best_beefy, active_set);
|
||||
// Mark the round as already finalized.
|
||||
rounds.conclude(best_beefy);
|
||||
sessions.push_front(rounds);
|
||||
}
|
||||
let state = PersistedState::checked_new(
|
||||
best_grandpa,
|
||||
best_beefy,
|
||||
sessions,
|
||||
min_block_delta,
|
||||
beefy_genesis,
|
||||
)
|
||||
.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
|
||||
break state
|
||||
}
|
||||
|
||||
if *header.number() == beefy_genesis {
|
||||
// We've reached BEEFY genesis, initialize voter here.
|
||||
let genesis_set =
|
||||
expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header)
|
||||
.await?;
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
|
||||
Starting voting rounds at block {:?}, genesis validator set {:?}.",
|
||||
beefy_genesis,
|
||||
genesis_set,
|
||||
);
|
||||
|
||||
sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
|
||||
break PersistedState::checked_new(
|
||||
best_grandpa,
|
||||
Zero::zero(),
|
||||
sessions,
|
||||
min_block_delta,
|
||||
beefy_genesis,
|
||||
)
|
||||
.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?
|
||||
}
|
||||
|
||||
if let Some(active) = find_authorities_change::<B>(&header) {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Marking block {:?} as BEEFY Mandatory.",
|
||||
*header.number()
|
||||
);
|
||||
sessions.push_front(Rounds::new(*header.number(), active));
|
||||
}
|
||||
|
||||
// Move up the chain.
|
||||
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
|
||||
};
|
||||
|
||||
aux_schema::write_current_version(self.backend.as_ref())?;
|
||||
aux_schema::write_voter_state(self.backend.as_ref(), &state)?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
pub async fn load_or_init_state(
|
||||
&mut self,
|
||||
beefy_genesis: NumberFor<B>,
|
||||
best_grandpa: <B as Block>::Header,
|
||||
min_block_delta: u32,
|
||||
) -> Result<PersistedState<B>, Error> {
|
||||
// Initialize voter state from AUX DB if compatible.
|
||||
if let Some(mut state) = crate::aux_schema::load_persistent(self.backend.as_ref())?
|
||||
// Verify state pallet genesis matches runtime.
|
||||
.filter(|state| state.pallet_genesis() == beefy_genesis)
|
||||
{
|
||||
// Overwrite persisted state with current best GRANDPA block.
|
||||
state.set_best_grandpa(best_grandpa.clone());
|
||||
// Overwrite persisted data with newly provided `min_block_delta`.
|
||||
state.set_min_block_delta(min_block_delta);
|
||||
debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
|
||||
|
||||
// Make sure that all the headers that we need have been synced.
|
||||
let mut new_sessions = vec![];
|
||||
let mut header = best_grandpa.clone();
|
||||
while *header.number() > state.best_beefy() {
|
||||
if state.voting_oracle.can_add_session(*header.number()) {
|
||||
if let Some(active) = find_authorities_change::<B>(&header) {
|
||||
new_sessions.push((active, *header.number()));
|
||||
}
|
||||
}
|
||||
header =
|
||||
wait_for_parent_header(self.backend.blockchain(), header, HEADER_SYNC_DELAY)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Make sure we didn't miss any sessions during node restart.
|
||||
for (validator_set, new_session_start) in new_sessions.drain(..).rev() {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Handling missed BEEFY session after node restart: {:?}.",
|
||||
new_session_start
|
||||
);
|
||||
self.init_session_at(&mut state, validator_set, new_session_start);
|
||||
}
|
||||
return Ok(state)
|
||||
}
|
||||
|
||||
// No valid voter-state persisted, re-initialize from pallet genesis.
|
||||
self.init_state(beefy_genesis, best_grandpa, min_block_delta).await
|
||||
}
|
||||
|
||||
/// Verify `active` validator set for `block` against the key store
|
||||
///
|
||||
/// We want to make sure that we have _at least one_ key in our keystore that
|
||||
/// is part of the validator set, that's because if there are no local keys
|
||||
/// then we can't perform our job as a validator.
|
||||
///
|
||||
/// Note that for a non-authority node there will be no keystore, and we will
|
||||
/// return an error and don't check. The error can usually be ignored.
|
||||
fn verify_validator_set(
|
||||
&self,
|
||||
block: &NumberFor<B>,
|
||||
active: &ValidatorSet<AuthorityId>,
|
||||
) -> Result<(), Error> {
|
||||
let active: BTreeSet<&AuthorityId> = active.validators().iter().collect();
|
||||
|
||||
let public_keys = self.key_store.public_keys()?;
|
||||
let store: BTreeSet<&AuthorityId> = public_keys.iter().collect();
|
||||
|
||||
if store.intersection(&active).count() == 0 {
|
||||
let msg = "no authority public key found in store".to_string();
|
||||
debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg);
|
||||
metric_inc!(self.metrics, beefy_no_authority_found_in_store);
|
||||
Err(Error::Keystore(msg))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle session changes by starting new voting round for mandatory blocks.
|
||||
fn init_session_at(
|
||||
pub fn init_session_at(
|
||||
&mut self,
|
||||
persisted_state: &mut PersistedState<B>,
|
||||
validator_set: ValidatorSet<AuthorityId>,
|
||||
new_session_start: NumberFor<B>,
|
||||
validator_set: ValidatorSet<AuthorityId>,
|
||||
key_store: &BeefyKeystore<AuthorityId>,
|
||||
metrics: &Option<VoterMetrics>,
|
||||
) {
|
||||
debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set);
|
||||
|
||||
// BEEFY should finalize a mandatory block during each session.
|
||||
if let Ok(active_session) = persisted_state.voting_oracle.active_rounds() {
|
||||
if let Ok(active_session) = self.voting_oracle.active_rounds() {
|
||||
if !active_session.mandatory_done() {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
@@ -549,20 +344,20 @@ where
|
||||
validator_set.id(),
|
||||
active_session.validator_set_id(),
|
||||
);
|
||||
metric_inc!(self.metrics, beefy_lagging_sessions);
|
||||
metric_inc!(metrics, beefy_lagging_sessions);
|
||||
}
|
||||
}
|
||||
|
||||
if log_enabled!(target: LOG_TARGET, log::Level::Debug) {
|
||||
// verify the new validator set - only do it if we're also logging the warning
|
||||
let _ = self.verify_validator_set(&new_session_start, &validator_set);
|
||||
if verify_validator_set::<B>(&new_session_start, &validator_set, key_store).is_err() {
|
||||
metric_inc!(metrics, beefy_no_authority_found_in_store);
|
||||
}
|
||||
}
|
||||
|
||||
let id = validator_set.id();
|
||||
persisted_state
|
||||
.voting_oracle
|
||||
.add_session(Rounds::new(new_session_start, validator_set));
|
||||
metric_set!(self.metrics, beefy_validator_set_id, id);
|
||||
self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set));
|
||||
metric_set!(metrics, beefy_validator_set_id, id);
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 New Rounds for validator set id: {:?} with session_start {:?}",
|
||||
@@ -574,9 +369,10 @@ where
|
||||
|
||||
/// A BEEFY worker/voter that follows the BEEFY protocol
|
||||
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
|
||||
pub base: BeefyWorkerBase<B, BE, RuntimeApi>,
|
||||
|
||||
// utils
|
||||
// utilities
|
||||
pub backend: Arc<BE>,
|
||||
pub runtime: Arc<RuntimeApi>,
|
||||
pub key_store: BeefyKeystore<AuthorityId>,
|
||||
pub payload_provider: P,
|
||||
pub sync: Arc<S>,
|
||||
|
||||
@@ -592,6 +388,8 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
|
||||
pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
|
||||
/// Persisted voter state.
|
||||
pub persisted_state: PersistedState<B>,
|
||||
/// BEEFY voter metrics
|
||||
pub metrics: Option<VoterMetrics>,
|
||||
}
|
||||
|
||||
impl<B, BE, P, R, S> BeefyWorker<B, BE, P, R, S>
|
||||
@@ -622,8 +420,12 @@ where
|
||||
validator_set: ValidatorSet<AuthorityId>,
|
||||
new_session_start: NumberFor<B>,
|
||||
) {
|
||||
self.base
|
||||
.init_session_at(&mut self.persisted_state, validator_set, new_session_start);
|
||||
self.persisted_state.init_session_at(
|
||||
new_session_start,
|
||||
validator_set,
|
||||
&self.key_store,
|
||||
&self.metrics,
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_finality_notification(
|
||||
@@ -639,8 +441,7 @@ where
|
||||
notification.tree_route,
|
||||
);
|
||||
|
||||
self.base
|
||||
.runtime
|
||||
self.runtime
|
||||
.runtime_api()
|
||||
.beefy_genesis(header.hash())
|
||||
.ok()
|
||||
@@ -654,7 +455,7 @@ where
|
||||
self.persisted_state.set_best_grandpa(header.clone());
|
||||
|
||||
// Check all (newly) finalized blocks for new session(s).
|
||||
let backend = self.base.backend.clone();
|
||||
let backend = self.backend.clone();
|
||||
for header in notification
|
||||
.tree_route
|
||||
.iter()
|
||||
@@ -673,7 +474,7 @@ where
|
||||
}
|
||||
|
||||
if new_session_added {
|
||||
crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
|
||||
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
|
||||
.map_err(|e| Error::Backend(e.to_string()))?;
|
||||
}
|
||||
|
||||
@@ -707,7 +508,7 @@ where
|
||||
true,
|
||||
);
|
||||
},
|
||||
RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_votes),
|
||||
RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_votes),
|
||||
RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
|
||||
};
|
||||
Ok(())
|
||||
@@ -727,23 +528,23 @@ where
|
||||
match self.voting_oracle().triage_round(block_num)? {
|
||||
RoundAction::Process => {
|
||||
debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num);
|
||||
metric_inc!(self.base.metrics, beefy_imported_justifications);
|
||||
metric_inc!(self.metrics, beefy_imported_justifications);
|
||||
self.finalize(justification)?
|
||||
},
|
||||
RoundAction::Enqueue => {
|
||||
debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num);
|
||||
if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS {
|
||||
self.pending_justifications.entry(block_num).or_insert(justification);
|
||||
metric_inc!(self.base.metrics, beefy_buffered_justifications);
|
||||
metric_inc!(self.metrics, beefy_buffered_justifications);
|
||||
} else {
|
||||
metric_inc!(self.base.metrics, beefy_buffered_justifications_dropped);
|
||||
metric_inc!(self.metrics, beefy_buffered_justifications_dropped);
|
||||
warn!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Buffer justification dropped for round: {:?}.", block_num
|
||||
);
|
||||
}
|
||||
},
|
||||
RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_justifications),
|
||||
RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_justifications),
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
@@ -765,7 +566,7 @@ where
|
||||
// We created the `finality_proof` and know to be valid.
|
||||
// New state is persisted after finalization.
|
||||
self.finalize(finality_proof.clone())?;
|
||||
metric_inc!(self.base.metrics, beefy_good_votes_processed);
|
||||
metric_inc!(self.metrics, beefy_good_votes_processed);
|
||||
return Ok(Some(finality_proof))
|
||||
},
|
||||
VoteImportResult::Ok => {
|
||||
@@ -776,20 +577,17 @@ where
|
||||
.map(|(mandatory_num, _)| mandatory_num == block_number)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
crate::aux_schema::write_voter_state(
|
||||
&*self.base.backend,
|
||||
&self.persisted_state,
|
||||
)
|
||||
.map_err(|e| Error::Backend(e.to_string()))?;
|
||||
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
|
||||
.map_err(|e| Error::Backend(e.to_string()))?;
|
||||
}
|
||||
metric_inc!(self.base.metrics, beefy_good_votes_processed);
|
||||
metric_inc!(self.metrics, beefy_good_votes_processed);
|
||||
},
|
||||
VoteImportResult::Equivocation(proof) => {
|
||||
metric_inc!(self.base.metrics, beefy_equivocation_votes);
|
||||
metric_inc!(self.metrics, beefy_equivocation_votes);
|
||||
self.report_equivocation(proof)?;
|
||||
},
|
||||
VoteImportResult::Invalid => metric_inc!(self.base.metrics, beefy_invalid_votes),
|
||||
VoteImportResult::Stale => metric_inc!(self.base.metrics, beefy_stale_votes),
|
||||
VoteImportResult::Invalid => metric_inc!(self.metrics, beefy_invalid_votes),
|
||||
VoteImportResult::Stale => metric_inc!(self.metrics, beefy_stale_votes),
|
||||
};
|
||||
Ok(None)
|
||||
}
|
||||
@@ -816,15 +614,14 @@ where
|
||||
|
||||
// Set new best BEEFY block number.
|
||||
self.persisted_state.set_best_beefy(block_num);
|
||||
crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
|
||||
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
|
||||
.map_err(|e| Error::Backend(e.to_string()))?;
|
||||
|
||||
metric_set!(self.base.metrics, beefy_best_block, block_num);
|
||||
metric_set!(self.metrics, beefy_best_block, block_num);
|
||||
|
||||
self.comms.on_demand_justifications.cancel_requests_older_than(block_num);
|
||||
|
||||
if let Err(e) = self
|
||||
.base
|
||||
.backend
|
||||
.blockchain()
|
||||
.expect_block_hash_from_id(&BlockId::Number(block_num))
|
||||
@@ -834,8 +631,7 @@ where
|
||||
.notify(|| Ok::<_, ()>(hash))
|
||||
.expect("forwards closure result; the closure always returns Ok; qed.");
|
||||
|
||||
self.base
|
||||
.backend
|
||||
self.backend
|
||||
.append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode()))
|
||||
}) {
|
||||
debug!(
|
||||
@@ -872,13 +668,13 @@ where
|
||||
|
||||
for (num, justification) in justifs_to_process.into_iter() {
|
||||
debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num);
|
||||
metric_inc!(self.base.metrics, beefy_imported_justifications);
|
||||
metric_inc!(self.metrics, beefy_imported_justifications);
|
||||
if let Err(err) = self.finalize(justification) {
|
||||
error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err);
|
||||
}
|
||||
}
|
||||
metric_set!(
|
||||
self.base.metrics,
|
||||
self.metrics,
|
||||
beefy_buffered_justifications,
|
||||
self.pending_justifications.len()
|
||||
);
|
||||
@@ -890,7 +686,7 @@ where
|
||||
fn try_to_vote(&mut self) -> Result<(), Error> {
|
||||
// Vote if there's now a new vote target.
|
||||
if let Some(target) = self.voting_oracle().voting_target() {
|
||||
metric_set!(self.base.metrics, beefy_should_vote_on, target);
|
||||
metric_set!(self.metrics, beefy_should_vote_on, target);
|
||||
if target > self.persisted_state.best_voted {
|
||||
self.do_vote(target)?;
|
||||
}
|
||||
@@ -910,7 +706,6 @@ where
|
||||
self.persisted_state.voting_oracle.best_grandpa_block_header.clone()
|
||||
} else {
|
||||
let hash = self
|
||||
.base
|
||||
.backend
|
||||
.blockchain()
|
||||
.expect_block_hash_from_id(&BlockId::Number(target_number))
|
||||
@@ -922,7 +717,7 @@ where
|
||||
Error::Backend(err_msg)
|
||||
})?;
|
||||
|
||||
self.base.backend.blockchain().expect_header(hash).map_err(|err| {
|
||||
self.backend.blockchain().expect_header(hash).map_err(|err| {
|
||||
let err_msg = format!(
|
||||
"Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..",
|
||||
target_number, hash, err
|
||||
@@ -942,7 +737,7 @@ where
|
||||
let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
|
||||
let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id());
|
||||
|
||||
let authority_id = if let Some(id) = self.base.key_store.authority_id(validators) {
|
||||
let authority_id = if let Some(id) = self.key_store.authority_id(validators) {
|
||||
debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id);
|
||||
id
|
||||
} else {
|
||||
@@ -956,7 +751,7 @@ where
|
||||
let commitment = Commitment { payload, block_number: target_number, validator_set_id };
|
||||
let encoded_commitment = commitment.encode();
|
||||
|
||||
let signature = match self.base.key_store.sign(&authority_id, &encoded_commitment) {
|
||||
let signature = match self.key_store.sign(&authority_id, &encoded_commitment) {
|
||||
Ok(sig) => sig,
|
||||
Err(err) => {
|
||||
warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err);
|
||||
@@ -981,7 +776,7 @@ where
|
||||
.gossip_engine
|
||||
.gossip_message(proofs_topic::<B>(), encoded_proof, true);
|
||||
} else {
|
||||
metric_inc!(self.base.metrics, beefy_votes_sent);
|
||||
metric_inc!(self.metrics, beefy_votes_sent);
|
||||
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
|
||||
let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
|
||||
self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
|
||||
@@ -989,8 +784,8 @@ where
|
||||
|
||||
// Persist state after vote to avoid double voting in case of voter restarts.
|
||||
self.persisted_state.best_voted = target_number;
|
||||
metric_set!(self.base.metrics, beefy_best_voted, target_number);
|
||||
crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
|
||||
metric_set!(self.metrics, beefy_best_voted, target_number);
|
||||
crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
|
||||
.map_err(|e| Error::Backend(e.to_string()))
|
||||
}
|
||||
|
||||
@@ -1164,7 +959,7 @@ where
|
||||
if !check_equivocation_proof::<_, _, BeefySignatureHasher>(&proof) {
|
||||
debug!(target: LOG_TARGET, "🥩 Skip report for bad equivocation {:?}", proof);
|
||||
return Ok(())
|
||||
} else if let Some(local_id) = self.base.key_store.authority_id(validators) {
|
||||
} else if let Some(local_id) = self.key_store.authority_id(validators) {
|
||||
if offender_id == local_id {
|
||||
warn!(target: LOG_TARGET, "🥩 Skip equivocation report for own equivocation");
|
||||
return Ok(())
|
||||
@@ -1173,7 +968,6 @@ where
|
||||
|
||||
let number = *proof.round_number();
|
||||
let hash = self
|
||||
.base
|
||||
.backend
|
||||
.blockchain()
|
||||
.expect_block_hash_from_id(&BlockId::Number(number))
|
||||
@@ -1184,7 +978,7 @@ where
|
||||
);
|
||||
Error::Backend(err_msg)
|
||||
})?;
|
||||
let runtime_api = self.base.runtime.runtime_api();
|
||||
let runtime_api = self.runtime.runtime_api();
|
||||
// generate key ownership proof at that block
|
||||
let key_owner_proof = match runtime_api
|
||||
.generate_key_ownership_proof(hash, validator_set_id, offender_id)
|
||||
@@ -1201,7 +995,7 @@ where
|
||||
};
|
||||
|
||||
// submit equivocation report at **best** block
|
||||
let best_block_hash = self.base.backend.blockchain().info().best_hash;
|
||||
let best_block_hash = self.backend.blockchain().info().best_hash;
|
||||
runtime_api
|
||||
.submit_report_equivocation_unsigned_extrinsic(best_block_hash, proof, key_owner_proof)
|
||||
.map_err(Error::RuntimeApi)?;
|
||||
@@ -1210,21 +1004,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Scan the `header` digest log for a BEEFY validator set change. Return either the new
|
||||
/// validator set or `None` in case no validator set change has been signaled.
|
||||
pub(crate) fn find_authorities_change<B>(header: &B::Header) -> Option<ValidatorSet<AuthorityId>>
|
||||
where
|
||||
B: Block,
|
||||
{
|
||||
let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID);
|
||||
|
||||
let filter = |log: ConsensusLog<AuthorityId>| match log {
|
||||
ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set),
|
||||
_ => None,
|
||||
};
|
||||
header.digest().convert_first(|l| l.try_to(id).and_then(filter))
|
||||
}
|
||||
|
||||
/// Calculate next block number to vote on.
|
||||
///
|
||||
/// Return `None` if there is no votable target yet.
|
||||
@@ -1261,11 +1040,42 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Verify `active` validator set for `block` against the key store
|
||||
///
|
||||
/// We want to make sure that we have _at least one_ key in our keystore that
|
||||
/// is part of the validator set, that's because if there are no local keys
|
||||
/// then we can't perform our job as a validator.
|
||||
///
|
||||
/// Note that for a non-authority node there will be no keystore, and we will
|
||||
/// return an error and don't check. The error can usually be ignored.
|
||||
fn verify_validator_set<B: Block>(
|
||||
block: &NumberFor<B>,
|
||||
active: &ValidatorSet<AuthorityId>,
|
||||
key_store: &BeefyKeystore<AuthorityId>,
|
||||
) -> Result<(), Error> {
|
||||
let active: BTreeSet<&AuthorityId> = active.validators().iter().collect();
|
||||
|
||||
let public_keys = key_store.public_keys()?;
|
||||
let store: BTreeSet<&AuthorityId> = public_keys.iter().collect();
|
||||
|
||||
if store.intersection(&active).count() == 0 {
|
||||
let msg = "no authority public key found in store".to_string();
|
||||
debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg);
|
||||
Err(Error::Keystore(msg))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
communication::notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream},
|
||||
communication::{
|
||||
gossip::GossipValidator,
|
||||
notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream},
|
||||
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
|
||||
},
|
||||
tests::{
|
||||
create_beefy_keystore, get_beefy_streams, make_beefy_ids, BeefyPeer, BeefyTestNet,
|
||||
TestApi,
|
||||
@@ -1275,6 +1085,7 @@ pub(crate) mod tests {
|
||||
use futures::{future::poll_fn, task::Poll};
|
||||
use parking_lot::Mutex;
|
||||
use sc_client_api::{Backend as BackendT, HeaderBackend};
|
||||
use sc_network_gossip::GossipEngine;
|
||||
use sc_network_sync::SyncingService;
|
||||
use sc_network_test::TestNetFactory;
|
||||
use sp_blockchain::Backend as BlockchainBackendT;
|
||||
@@ -1283,7 +1094,7 @@ pub(crate) mod tests {
|
||||
known_payloads::MMR_ROOT_ID,
|
||||
mmr::MmrRootProvider,
|
||||
test_utils::{generate_equivocation_proof, Keyring},
|
||||
Payload, SignedCommitment,
|
||||
ConsensusLog, Payload, SignedCommitment,
|
||||
};
|
||||
use sp_runtime::traits::{Header as HeaderT, One};
|
||||
use substrate_test_runtime_client::{
|
||||
@@ -1292,10 +1103,6 @@ pub(crate) mod tests {
|
||||
};
|
||||
|
||||
impl<B: super::Block> PersistedState<B> {
|
||||
pub fn voting_oracle(&self) -> &VoterOracle<B> {
|
||||
&self.voting_oracle
|
||||
}
|
||||
|
||||
pub fn active_round(&self) -> Result<&Rounds<B>, Error> {
|
||||
self.voting_oracle.active_rounds()
|
||||
}
|
||||
@@ -1391,13 +1198,10 @@ pub(crate) mod tests {
|
||||
on_demand_justifications,
|
||||
};
|
||||
BeefyWorker {
|
||||
base: BeefyWorkerBase {
|
||||
backend,
|
||||
runtime: api,
|
||||
key_store: Some(keystore).into(),
|
||||
metrics,
|
||||
_phantom: Default::default(),
|
||||
},
|
||||
backend,
|
||||
runtime: api,
|
||||
key_store: Some(keystore).into(),
|
||||
metrics,
|
||||
payload_provider,
|
||||
sync: Arc::new(sync),
|
||||
links,
|
||||
@@ -1675,19 +1479,22 @@ pub(crate) mod tests {
|
||||
let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
|
||||
|
||||
// keystore doesn't contain other keys than validators'
|
||||
assert_eq!(worker.base.verify_validator_set(&1, &validator_set), Ok(()));
|
||||
assert_eq!(verify_validator_set::<Block>(&1, &validator_set, &worker.key_store), Ok(()));
|
||||
|
||||
// unknown `Bob` key
|
||||
let keys = &[Keyring::Bob];
|
||||
let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
|
||||
let err_msg = "no authority public key found in store".to_string();
|
||||
let expected = Err(Error::Keystore(err_msg));
|
||||
assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected);
|
||||
assert_eq!(verify_validator_set::<Block>(&1, &validator_set, &worker.key_store), expected);
|
||||
|
||||
// worker has no keystore
|
||||
worker.base.key_store = None.into();
|
||||
worker.key_store = None.into();
|
||||
let expected_err = Err(Error::Keystore("no Keystore".into()));
|
||||
assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected_err);
|
||||
assert_eq!(
|
||||
verify_validator_set::<Block>(&1, &validator_set, &worker.key_store),
|
||||
expected_err
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1839,7 +1646,7 @@ pub(crate) mod tests {
|
||||
|
||||
let mut net = BeefyTestNet::new(1);
|
||||
let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
|
||||
worker.base.runtime = api_alice.clone();
|
||||
worker.runtime = api_alice.clone();
|
||||
|
||||
// let there be a block with num = 1:
|
||||
let _ = net.peer(0).push_blocks(1, false);
|
||||
|
||||
Reference in New Issue
Block a user