// This file is part of Substrate.
// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! # BABE (Blind Assignment for Blockchain Extension)
//!
//! BABE is a slot-based block production mechanism which uses a VRF PRNG to
//! randomly perform the slot allocation. On every slot, all the authorities
//! generate a new random number with the VRF function and if it is lower than a
//! given threshold (which is proportional to their weight/stake) they have a
//! right to produce a block. The proof of the VRF function execution will be
//! used by other peers to validate the legitimacy of the slot claim.
//!
//! The engine is also responsible for collecting entropy on-chain which will be
//! used to seed the given VRF PRNG. An epoch is a contiguous number of slots
//! under which we will be using the same authority set. During an epoch all VRF
//! outputs produced as a result of block production will be collected on an
//! on-chain randomness pool. Epoch changes are announced one epoch in advance,
//! i.e. when ending epoch N, we announce the parameters (randomness,
//! authorities, etc.) for epoch N+2.
//!
//! Since the slot assignment is randomized, it is possible that a slot is
//! assigned to multiple validators in which case we will have a temporary fork,
//! or that a slot is assigned to no validator in which case no block is
//! produced. Which means that block times are not deterministic.
//!
//! The protocol has a parameter `c` [0, 1] for which `1 - c` is the probability
//! of a slot being empty. The choice of this parameter affects the security of
//! the protocol relating to maximum tolerable network delays.
//!
//! In addition to the VRF-based slot assignment described above, which we will
//! call primary slots, the engine also supports a deterministic secondary slot
//! assignment. Primary slots take precedence over secondary slots, when
//! authoring the node starts by trying to claim a primary slot and falls back
//! to a secondary slot claim attempt. The secondary slot assignment is done
//! by picking the authority at index:
//!
//! `blake2_256(epoch_randomness ++ slot_number) % authorities_len`.
//!
//! The secondary slots supports either a `SecondaryPlain` or `SecondaryVRF`
//! variant. Comparing with `SecondaryPlain` variant, the `SecondaryVRF` variant
//! generates an additional VRF output. The output is not included in beacon
//! randomness, but can be consumed by parachains.
//!
//! The fork choice rule is weight-based, where weight equals the number of
//! primary blocks in the chain. We will pick the heaviest chain (more primary
//! blocks) and will go with the longest one in case of a tie.
//!
//! An in-depth description and analysis of the protocol can be found here:
//! <https://research.web3.foundation/en/latest/polkadot/block-production/Babe.html>
//!
#![forbid(unsafe_code)]
#![warn(missing_docs)]
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use codec::{Decode, Encode};
use futures::{
channel::{
mpsc::{channel, Receiver, Sender},
oneshot,
},
prelude::*,
};
use log::{debug, info, log, trace, warn};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use retain_mut::RetainMut;
use schnorrkel::SignatureError;
use sc_client_api::{
backend::AuxStore, AuxDataOperations, Backend as BackendT, BlockchainEvents,
FinalityNotification, PreCommitActions, ProvideUncles, UsageProvider,
};
use sc_consensus::{
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
StateAction,
},
import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
};
use sc_consensus_epochs::{
descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpochDescriptor,
};
use sc_consensus_slots::{
check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
SlotInfo, StorageChanges,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppKey;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{
Backend as _, Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult,
};
use sp_consensus::{
BlockOrigin, CacheKeyId, CanAuthorWith, Environment, Error as ConsensusError, Proposer,
SelectChain,
};
use sp_consensus_babe::inherents::BabeInherentData;
use sp_consensus_slots::{Slot, SlotDuration};
use sp_core::{crypto::ByteArray, ExecutionContext};
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
generic::{BlockId, OpaqueDigestItemId},
traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Saturating, Zero},
DigestItem,
};
pub use sc_consensus_slots::SlotProportion;
pub use sp_consensus::SyncOracle;
pub use sp_consensus_babe::{
digests::{
CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
PrimaryPreDigest, SecondaryPlainPreDigest,
},
AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
BabeEpochConfiguration, BabeGenesisConfiguration, ConsensusLog, BABE_ENGINE_ID,
VRF_OUTPUT_LENGTH,
};
pub use aux_schema::load_block_weight as block_weight;
mod migration;
mod verification;
pub mod authorship;
pub mod aux_schema;
#[cfg(test)]
mod tests;
/// BABE epoch information.
///
/// Client-side representation of one epoch: the slot range it covers, the
/// authority set (with weights) allowed to author in it, and the randomness
/// that seeds slot assignment for it.
#[derive(Decode, Encode, PartialEq, Eq, Clone, Debug)]
pub struct Epoch {
/// The epoch index.
pub epoch_index: u64,
/// The starting slot of the epoch.
pub start_slot: Slot,
/// The duration of this epoch, in slots (end slot is `start_slot + duration`).
pub duration: u64,
/// The authorities and their weights.
pub authorities: Vec<(AuthorityId, BabeAuthorityWeight)>,
/// Randomness for this epoch.
pub randomness: [u8; VRF_OUTPUT_LENGTH],
/// Configuration of the epoch (`c` parameter and allowed slot-claim types).
pub config: BabeEpochConfiguration,
}
impl EpochT for Epoch {
    type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
    type Slot = Slot;

    /// Build the epoch that immediately follows this one from the announced
    /// descriptor and configuration.
    fn increment(
        &self,
        (descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
    ) -> Epoch {
        // The successor starts exactly where this epoch ends and keeps the
        // same length; the authority set and randomness are taken from the
        // descriptor announced on-chain, the config from the (possibly
        // updated) epoch configuration.
        Self {
            epoch_index: self.epoch_index + 1,
            start_slot: self.end_slot(),
            duration: self.duration,
            authorities: descriptor.authorities,
            randomness: descriptor.randomness,
            config,
        }
    }

    fn start_slot(&self) -> Slot {
        self.start_slot
    }

    fn end_slot(&self) -> Slot {
        self.start_slot + self.duration
    }
}
impl From for Epoch {
fn from(epoch: sp_consensus_babe::Epoch) -> Self {
Epoch {
epoch_index: epoch.epoch_index,
start_slot: epoch.start_slot,
duration: epoch.duration,
authorities: epoch.authorities,
randomness: epoch.randomness,
config: epoch.config,
}
}
}
impl Epoch {
    /// Create the genesis epoch (epoch #0).
    ///
    /// The genesis epoch is defined to start at the slot of the first block,
    /// so that slot has to be provided by the caller.
    pub fn genesis(genesis_config: &BabeGenesisConfiguration, slot: Slot) -> Epoch {
        // Epoch #0 is seeded entirely from the chain's genesis configuration:
        // initial authorities, initial randomness, and the epoch parameters.
        let config = BabeEpochConfiguration {
            c: genesis_config.c,
            allowed_slots: genesis_config.allowed_slots,
        };
        Self {
            epoch_index: 0,
            start_slot: slot,
            duration: genesis_config.epoch_length,
            authorities: genesis_config.genesis_authorities.clone(),
            randomness: genesis_config.randomness,
            config,
        }
    }
}
/// Errors encountered by the babe authorship task.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Multiple BABE pre-runtime digests
#[error("Multiple BABE pre-runtime digests, rejecting!")]
MultiplePreRuntimeDigests,
/// No BABE pre-runtime digest found
#[error("No BABE pre-runtime digest found")]
NoPreRuntimeDigest,
/// Multiple BABE epoch change digests
#[error("Multiple BABE epoch change digests, rejecting!")]
MultipleEpochChangeDigests,
/// Multiple BABE config change digests
#[error("Multiple BABE config change digests, rejecting!")]
MultipleConfigChangeDigests,
/// Could not extract timestamp and slot
#[error("Could not extract timestamp and slot: {0}")]
Extraction(sp_consensus::Error),
/// Could not fetch epoch
#[error("Could not fetch epoch at {0:?}")]
FetchEpoch(B::Hash),
/// Header rejected: too far in the future
#[error("Header {0:?} rejected: too far in the future")]
TooFarInFuture(B::Hash),
/// Parent unavailable. Cannot import
#[error("Parent ({0}) of {1} unavailable. Cannot import")]
ParentUnavailable(B::Hash, B::Hash),
/// Slot number must increase
#[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
SlotMustIncrease(Slot, Slot),
/// Header has a bad seal
#[error("Header {0:?} has a bad seal")]
HeaderBadSeal(B::Hash),
/// Header is unsealed
#[error("Header {0:?} is unsealed")]
HeaderUnsealed(B::Hash),
/// Slot author not found
#[error("Slot author not found")]
SlotAuthorNotFound,
/// Secondary slot assignments are disabled for the current epoch.
#[error("Secondary slot assignments are disabled for the current epoch.")]
SecondarySlotAssignmentsDisabled,
/// Bad signature
#[error("Bad signature on {0:?}")]
BadSignature(B::Hash),
/// Invalid author: Expected secondary author
#[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
InvalidAuthor(AuthorityId, AuthorityId),
/// No secondary author expected.
#[error("No secondary author expected.")]
NoSecondaryAuthorExpected,
/// VRF verification of block by author failed
#[error("VRF verification of block by author {0:?} failed: threshold {1} exceeded")]
VRFVerificationOfBlockFailed(AuthorityId, u128),
/// VRF verification failed
#[error("VRF verification failed: {0:?}")]
VRFVerificationFailed(SignatureError),
/// Could not fetch parent header
#[error("Could not fetch parent header: {0}")]
FetchParentHeader(sp_blockchain::Error),
/// Expected epoch change to happen.
#[error("Expected epoch change to happen at {0:?}, s{1}")]
ExpectedEpochChange(B::Hash, Slot),
/// Unexpected config change.
#[error("Unexpected config change")]
UnexpectedConfigChange,
/// Unexpected epoch change
#[error("Unexpected epoch change")]
UnexpectedEpochChange,
/// Parent block has no associated weight
#[error("Parent block of {0} has no associated weight")]
ParentBlockNoAssociatedWeight(B::Hash),
/// Check inherents error
#[error("Checking inherents failed: {0}")]
CheckInherents(sp_inherents::Error),
/// Unhandled check inherents error
#[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
CheckInherentsUnhandled(sp_inherents::InherentIdentifier),
/// Create inherents error.
#[error("Creating inherents failed: {0}")]
CreateInherents(sp_inherents::Error),
/// Client error
#[error(transparent)]
Client(sp_blockchain::Error),
/// Runtime Api error.
#[error(transparent)]
RuntimeApi(sp_api::ApiError),
/// Fork tree error
#[error(transparent)]
ForkTree(Box>),
}
impl From> for String {
fn from(error: Error) -> String {
error.to_string()
}
}
fn babe_err(error: Error) -> Error {
debug!(target: "babe", "{}", error);
error
}
/// Intermediate value passed to block importer.
pub struct BabeIntermediate {
/// The epoch descriptor.
pub epoch_descriptor: ViableEpochDescriptor, Epoch>,
}
/// Intermediate key for Babe engine.
///
/// NOTE(review): presumably keys the [`BabeIntermediate`] entry in the block
/// import parameters' intermediates map — confirm at the usage sites.
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
/// Configuration for BABE used for defining block verification parameters as
/// well as authoring (e.g. the slot duration).
///
/// Thin wrapper around the genesis configuration read from the runtime; see
/// the accessor methods for the derived values.
#[derive(Clone)]
pub struct Config {
genesis_config: BabeGenesisConfiguration,
}
impl Config {
/// Create a new config by reading the genesis configuration from the runtime.
pub fn get(client: &C) -> ClientResult
where
C: AuxStore + ProvideRuntimeApi + UsageProvider,
C::Api: BabeApi,
{
trace!(target: "babe", "Getting slot duration");
let mut best_block_id = BlockId::Hash(client.usage_info().chain.best_hash);
if client.usage_info().chain.finalized_state.is_none() {
debug!(target: "babe", "No finalized state is available. Reading config from genesis");
best_block_id = BlockId::Hash(client.usage_info().chain.genesis_hash);
}
let runtime_api = client.runtime_api();
let version = runtime_api.api_version::>(&best_block_id)?;
let genesis_config = if version == Some(1) {
#[allow(deprecated)]
{
runtime_api.configuration_before_version_2(&best_block_id)?.into()
}
} else if version == Some(2) {
runtime_api.configuration(&best_block_id)?
} else {
return Err(sp_blockchain::Error::VersionInvalid(
"Unsupported or invalid BabeApi version".to_string(),
))
};
Ok(Config { genesis_config })
}
/// Get the genesis configuration.
pub fn genesis_config(&self) -> &BabeGenesisConfiguration {
&self.genesis_config
}
/// Get the slot duration defined in the genesis configuration.
pub fn slot_duration(&self) -> SlotDuration {
SlotDuration::from_millis(self.genesis_config.slot_duration)
}
}
/// Parameters for BABE.
pub struct BabeParams {
/// The keystore that manages the keys of the node.
pub keystore: SyncCryptoStorePtr,
/// The client to use
pub client: Arc,
/// The SelectChain Strategy
pub select_chain: SC,
/// The environment we are producing blocks for.
pub env: E,
/// The underlying block-import object to supply our produced blocks to.
/// This must be a `BabeBlockImport` or a wrapper of it, otherwise
/// critical consensus logic will be omitted.
pub block_import: I,
/// A sync oracle
pub sync_oracle: SO,
/// Hook into the sync module to control the justification sync process.
pub justification_sync_link: L,
/// Something that can create the inherent data providers.
pub create_inherent_data_providers: CIDP,
/// Force authoring of blocks even if we are offline
pub force_authoring: bool,
/// Strategy and parameters for backing off block production.
pub backoff_authoring_blocks: Option,
/// The source of timestamps for relative slots
pub babe_link: BabeLink,
/// Checks if the current native implementation can author with a runtime at a given block.
pub can_author_with: CAW,
/// The proportion of the slot dedicated to proposing.
///
/// The block proposing will be limited to this proportion of the slot from the starting of the
/// slot. However, the proposing can still take longer when there is some lenience factor
/// applied, because there were no blocks produced for some slots.
pub block_proposal_slot_portion: SlotProportion,
/// The maximum proportion of the slot dedicated to proposing with any lenience factor applied
/// due to no blocks being produced.
pub max_block_proposal_slot_portion: Option,
/// Handle use to report telemetries.
pub telemetry: Option,
}
/// Start the babe worker.
pub fn start_babe(
BabeParams {
keystore,
client,
select_chain,
env,
block_import,
sync_oracle,
justification_sync_link,
create_inherent_data_providers,
force_authoring,
backoff_authoring_blocks,
babe_link,
can_author_with,
block_proposal_slot_portion,
max_block_proposal_slot_portion,
telemetry,
}: BabeParams,
) -> Result, sp_consensus::Error>
where
B: BlockT,
C: ProvideRuntimeApi
+ ProvideUncles
+ BlockchainEvents
+ PreCommitActions
+ HeaderBackend
+ HeaderMetadata
+ Send
+ Sync
+ 'static,
C::Api: BabeApi,
SC: SelectChain + 'static,
E: Environment + Send + Sync + 'static,
E::Proposer: Proposer>,
I: BlockImport>
+ Send
+ Sync
+ 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
L: sc_consensus::JustificationSyncLink + 'static,
CIDP: CreateInherentDataProviders + Send + Sync + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
BS: BackoffAuthoringBlocksStrategy> + Send + Sync + 'static,
CAW: CanAuthorWith + Send + Sync + 'static,
Error: std::error::Error + Send + From + From + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
let worker = BabeSlotWorker {
client: client.clone(),
block_import,
env,
sync_oracle: sync_oracle.clone(),
justification_sync_link,
force_authoring,
backoff_authoring_blocks,
keystore,
epoch_changes: babe_link.epoch_changes.clone(),
slot_notification_sinks: slot_notification_sinks.clone(),
config: babe_link.config.clone(),
block_proposal_slot_portion,
max_block_proposal_slot_portion,
telemetry,
};
info!(target: "babe", "👶 Starting BABE Authorship worker");
let slot_worker = sc_consensus_slots::start_slot_worker(
babe_link.config.slot_duration(),
select_chain,
sc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
sync_oracle,
create_inherent_data_providers,
can_author_with,
);
let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
let answer_requests =
answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
let inner = future::select(Box::pin(slot_worker), Box::pin(answer_requests));
Ok(BabeWorker {
inner: Box::pin(inner.map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
}
// Remove obsolete block's weight data by leveraging finality notifications.
// This includes data for all finalized blocks (excluding the most recent one)
// and all stale branches.
fn aux_storage_cleanup + HeaderBackend, Block: BlockT>(
client: &C,
notification: &FinalityNotification,
) -> AuxDataOperations {
let mut aux_keys = HashSet::new();
let first = notification.tree_route.first().unwrap_or(¬ification.hash);
match client.header_metadata(*first) {
Ok(meta) => {
aux_keys.insert(aux_schema::block_weight_key(meta.parent));
},
Err(err) => warn!(
target: "babe",
"Failed to lookup metadata for block `{:?}`: {}",
first,
err,
),
}
// Cleans data for finalized block's ancestors
aux_keys.extend(
notification
.tree_route
.iter()
// Ensure we don't prune latest finalized block.
// This should not happen, but better be safe than sorry!
.filter(|h| **h != notification.hash)
.map(aux_schema::block_weight_key),
);
// Cleans data for stale branches.
for head in notification.stale_heads.iter() {
let mut hash = *head;
// Insert stale blocks hashes until canonical chain is reached.
// If we reach a block that is already part of the `aux_keys` we can stop the processing the
// head.
while aux_keys.insert(aux_schema::block_weight_key(hash)) {
match client.header_metadata(hash) {
Ok(meta) => {
hash = meta.parent;
// If the parent is part of the canonical chain or there doesn't exist a block
// hash for the parent number (bug?!), we can abort adding blocks.
if client
.hash(meta.number.saturating_sub(1u32.into()))
.ok()
.flatten()
.map_or(true, |h| h == hash)
{
break
}
},
Err(err) => {
warn!(
target: "babe",
"Header lookup fail while cleaning data for block {:?}: {}",
hash,
err,
);
break
},
}
}
}
aux_keys.into_iter().map(|val| (val, None)).collect()
}
async fn answer_requests(
mut request_rx: Receiver>,
config: Config,
client: Arc,
epoch_changes: SharedEpochChanges,
) where
C: ProvideRuntimeApi
+ ProvideUncles
+ BlockchainEvents
+ HeaderBackend
+ HeaderMetadata
+ Send
+ Sync
+ 'static,
{
while let Some(request) = request_rx.next().await {
match request {
BabeRequest::EpochForChild(parent_hash, parent_number, slot_number, response) => {
let lookup = || {
let epoch_changes = epoch_changes.shared_data();
let epoch_descriptor = epoch_changes
.epoch_descriptor_for_child_of(
descendent_query(&*client),
&parent_hash,
parent_number,
slot_number,
)
.map_err(|e| Error::::ForkTree(Box::new(e)))?
.ok_or(Error::::FetchEpoch(parent_hash))?;
let viable_epoch = epoch_changes
.viable_epoch(&epoch_descriptor, |slot| {
Epoch::genesis(&config.genesis_config, slot)
})
.ok_or(Error::::FetchEpoch(parent_hash))?;
Ok(sp_consensus_babe::Epoch {
epoch_index: viable_epoch.as_ref().epoch_index,
start_slot: viable_epoch.as_ref().start_slot,
duration: viable_epoch.as_ref().duration,
authorities: viable_epoch.as_ref().authorities.clone(),
randomness: viable_epoch.as_ref().randomness,
config: viable_epoch.as_ref().config.clone(),
})
};
let _ = response.send(lookup());
},
}
}
}
/// Requests to the BABE service.
#[non_exhaustive]
pub enum BabeRequest {
/// Request the epoch that a child of the given block, with the given slot number would have.
///
/// The parent block is identified by its hash and number.
EpochForChild(
B::Hash,
NumberFor,
Slot,
oneshot::Sender>>,
),
}
/// A handle to the BABE worker for issuing requests.
#[derive(Clone)]
pub struct BabeWorkerHandle(Sender>);
impl BabeWorkerHandle {
/// Send a request to the BABE service.
pub async fn send(&mut self, request: BabeRequest) {
// Failure to send means that the service is down.
// This will manifest as the receiver of the request being dropped.
let _ = self.0.send(request).await;
}
}
/// Worker for Babe which implements `Future