// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! # BABE consensus //! //! BABE (Blind Assignment for Blockchain Extension) consensus in Substrate. #![forbid(unsafe_code, missing_docs)] pub use babe_primitives::*; pub use consensus_common::SyncOracle; use std::{collections::HashMap, sync::Arc, u64, fmt::{Debug, Display}, pin::Pin, time::{Instant, Duration}}; use babe_primitives; use consensus_common::ImportResult; use consensus_common::import_queue::{ BoxJustificationImport, BoxFinalityProofImport, }; use consensus_common::well_known_cache_keys::Id as CacheKeyId; use sr_primitives::{generic, generic::{BlockId, OpaqueDigestItemId}, Justification}; use sr_primitives::traits::{ Block as BlockT, Header, DigestItemFor, NumberFor, ProvideRuntimeApi, SimpleBitOps, Zero, }; use keystore::KeyStorePtr; use runtime_support::serde::{Serialize, Deserialize}; use codec::{Decode, Encode}; use parking_lot::{Mutex, MutexGuard}; use primitives::{Blake2Hasher, H256, Pair, Public}; use merlin::Transcript; use inherents::{InherentDataProviders, InherentData}; use substrate_telemetry::{ telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO, }; use schnorrkel::{ keys::Keypair, vrf::{ VRFProof, VRFProofBatchable, VRFInOut, }, }; use consensus_common::{ self, BlockImport, Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, Error as ConsensusError, }; use srml_babe::{ BabeInherentData, timestamp::{TimestampInherentData, InherentType as TimestampInherent} }; use consensus_common::{SelectChain, well_known_cache_keys}; use consensus_common::import_queue::{Verifier, BasicQueue}; use client::{ block_builder::api::BlockBuilder as BlockBuilderApi, blockchain::{self, HeaderBackend, ProvideCache}, BlockchainEvents, CallExecutor, Client, runtime_api::ApiExt, error::Result as ClientResult, backend::{AuxStore, Backend}, utils::is_descendent_of, }; use fork_tree::ForkTree; use slots::{CheckedHeader, check_equivocation}; use futures::{prelude::*, future}; use futures01::Stream as _; use futures_timer::Delay; use log::{error, warn, debug, info, trace}; use slots::{SlotWorker, SlotData, SlotInfo, SlotCompatible, SignedDuration}; mod aux_schema; #[cfg(test)] mod tests; pub use babe_primitives::AuthorityId; /// A slot duration. Create with `get_or_compute`. // FIXME: Once Rust has higher-kinded types, the duplication between this // and `super::babe::Config` can be eliminated. // https://github.com/paritytech/substrate/issues/2434 pub struct Config(slots::SlotDuration); impl Config { /// Either fetch the slot duration from disk or compute it from the genesis /// state. pub fn get_or_compute(client: &C) -> ClientResult where C: AuxStore + ProvideRuntimeApi, C::Api: BabeApi, { trace!(target: "babe", "Getting slot duration"); match slots::SlotDuration::get_or_compute(client, |a, b| a.startup_data(b)).map(Self) { Ok(s) => Ok(s), Err(s) => { warn!(target: "babe", "Failed to get slot duration"); Err(s) } } } /// Get the slot duration in milliseconds. pub fn get(&self) -> u64 { self.0.slot_duration } /// Retrieve the threshold calculation constant `c`. pub fn c(&self) -> (u64, u64) { self.0.c } } impl SlotCompatible for BabeLink { fn extract_timestamp_and_slot( &self, data: &InherentData, ) -> Result<(TimestampInherent, u64, std::time::Duration), consensus_common::Error> { trace!(target: "babe", "extract timestamp"); data.timestamp_inherent_data() .and_then(|t| data.babe_inherent_data().map(|a| (t, a))) .map_err(Into::into) .map_err(consensus_common::Error::InherentData) .map(|(x, y)| (x, y, self.0.lock().0.take().unwrap_or_default())) } } /// Parameters for BABE. pub struct BabeParams { /// The configuration for BABE. Includes the slot duration, threshold, and /// other parameters. pub config: Config, /// The keystore that manages the keys of the node. pub keystore: KeyStorePtr, /// The client to use pub client: Arc, /// The SelectChain Strategy pub select_chain: SC, /// A block importer pub block_import: I, /// The environment pub env: E, /// A sync oracle pub sync_oracle: SO, /// Providers for inherent data. pub inherent_data_providers: InherentDataProviders, /// Force authoring of blocks even if we are offline pub force_authoring: bool, /// The source of timestamps for relative slots pub time_source: BabeLink, } /// Start the babe worker. The returned future should be run in a tokio runtime. pub fn start_babe(BabeParams { config, client, keystore, select_chain, block_import, env, sync_oracle, inherent_data_providers, force_authoring, time_source, }: BabeParams) -> Result< impl futures01::Future, consensus_common::Error, > where B: BlockT, C: ProvideRuntimeApi + ProvideCache, C::Api: BabeApi, SC: SelectChain, E::Proposer: Proposer, >::Create: Unpin + Send + 'static, H: Header, E: Environment, I: BlockImport + Send + Sync + 'static, Error: std::error::Error + Send + From<::consensus_common::Error> + From + 'static, SO: SyncOracle + Send + Sync + Clone, { let worker = BabeWorker { client: client.clone(), block_import: Arc::new(Mutex::new(block_import)), env, sync_oracle: sync_oracle.clone(), force_authoring, c: config.c(), keystore, }; register_babe_inherent_data_provider(&inherent_data_providers, config.0.slot_duration())?; Ok(slots::start_slot_worker( config.0, select_chain, worker, sync_oracle, inherent_data_providers, time_source, ).map(|()| Ok::<(), ()>(())).compat()) } struct BabeWorker { client: Arc, block_import: Arc>, env: E, sync_oracle: SO, force_authoring: bool, c: (u64, u64), keystore: KeyStorePtr, } impl SlotWorker for BabeWorker where B: BlockT, C: ProvideRuntimeApi + ProvideCache, C::Api: BabeApi, E: Environment, E::Proposer: Proposer, >::Create: Unpin + Send + 'static, Hash: Debug + Eq + Copy + SimpleBitOps + Encode + Decode + Serialize + for<'de> Deserialize<'de> + Debug + Default + AsRef<[u8]> + AsMut<[u8]> + std::hash::Hash + Display + Send + Sync + 'static, H: Header, I: BlockImport + Send + Sync + 'static, SO: SyncOracle + Send + Clone, Error: std::error::Error + Send + From<::consensus_common::Error> + From + 'static, { type OnSlot = Pin> + Send>>; fn on_slot( &mut self, chain_head: B::Header, slot_info: SlotInfo, ) -> Self::OnSlot { let ref client = self.client; let block_import = self.block_import.clone(); let (timestamp, slot_number, slot_duration) = (slot_info.timestamp, slot_info.number, slot_info.duration); let epoch = match epoch(client.as_ref(), &BlockId::Hash(chain_head.hash())) { Ok(authorities) => authorities, Err(e) => { error!( target: "babe", "Unable to fetch authorities at block {:?}: {:?}", chain_head.hash(), e ); telemetry!(CONSENSUS_WARN; "babe.unable_fetching_authorities"; "slot" => ?chain_head.hash(), "err" => ?e ); return Box::pin(future::ready(Ok(()))); } }; let Epoch { ref authorities, .. } = epoch; if authorities.is_empty() { error!(target: "babe", "No authorities at block {:?}", chain_head.hash()); } if !self.force_authoring && self.sync_oracle.is_offline() && authorities.len() > 1 { debug!(target: "babe", "Skipping proposal slot. Waiting for the network."); telemetry!(CONSENSUS_DEBUG; "babe.skipping_proposal_slot"; "authorities_len" => authorities.len() ); return Box::pin(future::ready(Ok(()))); } let proposal_work = if let Some(claim) = claim_slot( slot_info.number, epoch, self.c, &self.keystore, ) { let ((inout, vrf_proof, _batchable_proof), authority_index, key) = claim; debug!( target: "babe", "Starting authorship at slot {}; timestamp = {}", slot_number, timestamp, ); telemetry!(CONSENSUS_DEBUG; "babe.starting_authorship"; "slot_number" => slot_number, "timestamp" => timestamp ); // we are the slot author. make a block and sign it. let mut proposer = match self.env.init(&chain_head) { Ok(p) => p, Err(e) => { warn!(target: "babe", "Unable to author block in slot {:?}: {:?}", slot_number, e, ); telemetry!(CONSENSUS_WARN; "babe.unable_authoring_block"; "slot" => slot_number, "err" => ?e ); return Box::pin(future::ready(Ok(()))) } }; let inherent_digest = BabePreDigest { vrf_proof, vrf_output: inout.to_output(), authority_index: authority_index as u32, slot_number, }; // deadline our production to approx. the end of the slot let remaining_duration = slot_info.remaining_duration(); futures::future::select( proposer.propose( slot_info.inherent_data, generic::Digest { logs: vec![ generic::DigestItem::babe_pre_digest(inherent_digest.clone()), ], }, remaining_duration, ).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()), Delay::new(remaining_duration) .map_err(|err| consensus_common::Error::FaultyTimer(err).into()) ).map(|v| match v { futures::future::Either::Left((v, _)) => v.map(|v| (v, key)), futures::future::Either::Right((Ok(_), _)) => Err(consensus_common::Error::ClientImport("Timeout in the BaBe proposer".into())), futures::future::Either::Right((Err(err), _)) => Err(err), }) } else { return Box::pin(future::ready(Ok(()))); }; Box::pin(proposal_work.map_ok(move |(b, key)| { // minor hack since we don't have access to the timestamp // that is actually set by the proposer. let slot_after_building = SignedDuration::default().slot_now(slot_duration); if slot_after_building != slot_number { info!( target: "babe", "Discarding proposal for slot {}; block production took too long", slot_number ); telemetry!(CONSENSUS_INFO; "babe.discarding_proposal_took_too_long"; "slot" => slot_number ); return; } let (header, body) = b.deconstruct(); let header_num = header.number().clone(); let parent_hash = header.parent_hash().clone(); // sign the pre-sealed hash of the block and then // add it to a digest item. let header_hash = header.hash(); let signature = key.sign(header_hash.as_ref()); let signature_digest_item = DigestItemFor::::babe_seal(signature); let import_block = BlockImportParams:: { origin: BlockOrigin::Own, header, justification: None, post_digests: vec![signature_digest_item], body: Some(body), finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, }; info!(target: "babe", "Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", header_num, import_block.post_header().hash(), header_hash, ); telemetry!(CONSENSUS_INFO; "babe.pre_sealed_block"; "header_num" => ?header_num, "hash_now" => ?import_block.post_header().hash(), "hash_previously" => ?header_hash, ); if let Err(e) = block_import.lock().import_block(import_block, Default::default()) { warn!(target: "babe", "Error with block built on {:?}: {:?}", parent_hash, e); telemetry!(CONSENSUS_WARN; "babe.err_with_block_built_on"; "hash" => ?parent_hash, "err" => ?e ); } })) } } macro_rules! babe_err { ($($i: expr),+) => { { debug!(target: "babe", $($i),+) ; format!($($i),+) } }; } /// Extract the BABE pre digest from the given header. Pre-runtime digests are /// mandatory, the function will return `Err` if none is found. fn find_pre_digest(header: &B::Header) -> Result where DigestItemFor: CompatibleDigestItem, { let mut pre_digest: Option<_> = None; for log in header.digest().logs() { trace!(target: "babe", "Checking log {:?}, looking for pre runtime digest", log); match (log.as_babe_pre_digest(), pre_digest.is_some()) { (Some(_), true) => Err(babe_err!("Multiple BABE pre-runtime digests, rejecting!"))?, (None, _) => trace!(target: "babe", "Ignoring digest not meant for us"), (s, false) => pre_digest = s, } } pre_digest.ok_or_else(|| babe_err!("No BABE pre-runtime digest found")) } /// Extract the BABE epoch change digest from the given header, if it exists. fn find_next_epoch_digest(header: &B::Header) -> Result, String> where DigestItemFor: CompatibleDigestItem, { let mut epoch_digest: Option<_> = None; for log in header.digest().logs() { trace!(target: "babe", "Checking log {:?}, looking for epoch change digest.", log); let log = log.try_to::(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID)); match (log, epoch_digest.is_some()) { (Some(ConsensusLog::NextEpochData(_)), true) => Err(babe_err!("Multiple BABE epoch change digests, rejecting!"))?, (Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch), _ => trace!(target: "babe", "Ignoring digest not meant for us"), } } Ok(epoch_digest) } /// Check a header has been signed by the right key. If the slot is too far in /// the future, an error will be returned. If successful, returns the pre-header /// and the digest item containing the seal. /// /// The seal must be the last digest. Otherwise, the whole header is considered /// unsigned. This is required for security and must not be changed. /// /// This digest item will always return `Some` when used with `as_babe_pre_digest`. // FIXME #1018 needs misbehavior types fn check_header( client: &C, slot_now: u64, mut header: B::Header, hash: B::Hash, authorities: &[(AuthorityId, BabeWeight)], randomness: [u8; 32], epoch_index: u64, c: (u64, u64), ) -> Result, DigestItemFor)>, String> where DigestItemFor: CompatibleDigestItem, { trace!(target: "babe", "Checking header"); let seal = match header.digest_mut().pop() { Some(x) => x, None => return Err(babe_err!("Header {:?} is unsealed", hash)), }; let sig = seal.as_babe_seal().ok_or_else(|| { babe_err!("Header {:?} has a bad seal", hash) })?; let pre_digest = find_pre_digest::(&header)?; let BabePreDigest { slot_number, authority_index, ref vrf_proof, ref vrf_output } = pre_digest; if slot_number > slot_now { header.digest_mut().push(seal); Ok(CheckedHeader::Deferred(header, slot_number)) } else if authority_index > authorities.len() as u32 { Err(babe_err!("Slot author not found")) } else { let (pre_hash, author) = (header.hash(), &authorities[authority_index as usize].0); if AuthorityPair::verify(&sig, pre_hash, &author) { let (inout, _batchable_proof) = { let transcript = make_transcript( &randomness, slot_number, epoch_index, ); schnorrkel::PublicKey::from_bytes(author.as_slice()).and_then(|p| { p.vrf_verify(transcript, vrf_output, vrf_proof) }).map_err(|s| { babe_err!("VRF verification failed: {:?}", s) })? }; let threshold = calculate_threshold(c, authorities, authority_index as usize); if !check(&inout, threshold) { return Err(babe_err!("VRF verification of block by author {:?} failed: \ threshold {} exceeded", author, threshold)); } if let Some(equivocation_proof) = check_equivocation( client, slot_now, slot_number, &header, author, ).map_err(|e| e.to_string())? { info!( "Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}", author, slot_number, equivocation_proof.fst_header().hash(), equivocation_proof.snd_header().hash(), ); } let pre_digest = CompatibleDigestItem::babe_pre_digest(pre_digest); Ok(CheckedHeader::Checked(header, (pre_digest, seal))) } else { Err(babe_err!("Bad signature on {:?}", hash)) } } } /// State that must be shared between the import queue and the authoring logic. #[derive(Default, Clone, Debug)] pub struct BabeLink(Arc, Vec<(Instant, u64)>)>>); /// A verifier for Babe blocks. pub struct BabeVerifier { api: Arc, inherent_data_providers: inherents::InherentDataProviders, config: Config, time_source: BabeLink, } impl BabeVerifier { fn check_inherents( &self, block: B, block_id: BlockId, inherent_data: InherentData, ) -> Result<(), String> where C: ProvideRuntimeApi, C::Api: BlockBuilderApi { let inherent_res = self.api.runtime_api().check_inherents( &block_id, block, inherent_data, ).map_err(|e| format!("{:?}", e))?; if !inherent_res.ok() { inherent_res .into_errors() .try_for_each(|(i, e)| { Err(self.inherent_data_providers.error_to_string(&i, &e)) }) } else { Ok(()) } } } #[allow(dead_code)] fn median_algorithm( median_required_blocks: u64, slot_duration: u64, slot_number: u64, slot_now: u64, time_source: &mut (Option, Vec<(Instant, u64)>), ) { let num_timestamps = time_source.1.len(); if num_timestamps as u64 >= median_required_blocks && median_required_blocks > 0 { let mut new_list: Vec<_> = time_source.1.iter().map(|&(t, sl)| { let offset: u128 = u128::from(slot_duration) .checked_mul(1_000_000u128) // self.config.get() returns *milliseconds* .and_then(|x| { x.checked_mul(u128::from(slot_number).saturating_sub(u128::from(sl))) }) .expect("we cannot have timespans long enough for this to overflow; qed"); const NANOS_PER_SEC: u32 = 1_000_000_000; let nanos = (offset % u128::from(NANOS_PER_SEC)) as u32; let secs = (offset / u128::from(NANOS_PER_SEC)) as u64; t + Duration::new(secs, nanos) }).collect(); // FIXME #2926: use a selection algorithm instead of a full sorting algorithm. new_list.sort_unstable(); let &median = new_list .get(num_timestamps / 2) .expect("we have at least one timestamp, so this is a valid index; qed"); let now = Instant::now(); if now >= median { time_source.0.replace(now - median); } time_source.1.clear(); } else { time_source.1.push((Instant::now(), slot_now)) } } impl Verifier for BabeVerifier where C: ProvideRuntimeApi + Send + Sync + AuxStore + ProvideCache, C::Api: BlockBuilderApi + BabeApi, { fn verify( &mut self, origin: BlockOrigin, header: B::Header, justification: Option, mut body: Option>, ) -> Result<(BlockImportParams, Option)>>), String> { trace!( target: "babe", "Verifying origin: {:?} header: {:?} justification: {:?} body: {:?}", origin, header, justification, body, ); debug!(target: "babe", "We have {:?} logs in this header", header.digest().logs().len()); let mut inherent_data = self .inherent_data_providers .create_inherent_data() .map_err(String::from)?; let (_, slot_now, _) = self.time_source.extract_timestamp_and_slot(&inherent_data) .map_err(|e| format!("Could not extract timestamp and slot: {:?}", e))?; let hash = header.hash(); let parent_hash = *header.parent_hash(); let Epoch { authorities, randomness, epoch_index, .. } = epoch(self.api.as_ref(), &BlockId::Hash(parent_hash)) .map_err(|e| format!("Could not fetch epoch at {:?}: {:?}", parent_hash, e))?; // We add one to allow for some small drift. // FIXME #1019 in the future, alter this queue to allow deferring of headers let checked_header = check_header::( &self.api, slot_now + 1, header, hash, &authorities, randomness, epoch_index, self.config.c(), )?; match checked_header { CheckedHeader::Checked(pre_header, (pre_digest, seal)) => { let BabePreDigest { slot_number, .. } = pre_digest.as_babe_pre_digest() .expect("check_header always returns a pre-digest digest item; qed"); // if the body is passed through, we need to use the runtime // to check that the internally-set timestamp in the inherents // actually matches the slot set in the seal. if let Some(inner_body) = body.take() { inherent_data.babe_replace_inherent_data(slot_number); let block = B::new(pre_header.clone(), inner_body); self.check_inherents( block.clone(), BlockId::Hash(parent_hash), inherent_data, )?; let (_, inner_body) = block.deconstruct(); body = Some(inner_body); } trace!(target: "babe", "Checked {:?}; importing.", pre_header); telemetry!( CONSENSUS_TRACE; "babe.checked_and_importing"; "pre_header" => ?pre_header); let import_block = BlockImportParams { origin, header: pre_header, post_digests: vec![seal], body, finalized: false, justification, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, }; Ok((import_block, Default::default())) } CheckedHeader::Deferred(a, b) => { debug!(target: "babe", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); telemetry!(CONSENSUS_DEBUG; "babe.header_too_far_in_future"; "hash" => ?hash, "a" => ?a, "b" => ?b ); Err(format!("Header {:?} rejected: too far in the future", hash)) } } } } /// Extract current epoch data from cache and fallback to querying the runtime /// if the cache isn't populated. fn epoch(client: &C, at: &BlockId) -> Result where B: BlockT, C: ProvideRuntimeApi + ProvideCache, C::Api: BabeApi, { client .cache() .and_then(|cache| cache.get_at(&well_known_cache_keys::EPOCH, at) .and_then(|v| Decode::decode(&mut &v[..]).ok())) .or_else(|| { if client.runtime_api().has_api::>(at).unwrap_or(false) { let s = BabeApi::epoch(&*client.runtime_api(), at).ok()?; if s.authorities.is_empty() { error!("No authorities!"); None } else { Some(s) } } else { error!("bad api!"); None } }).ok_or(consensus_common::Error::InvalidAuthoritiesSet) } /// The BABE import queue type. pub type BabeImportQueue = BasicQueue; /// Register the babe inherent data provider, if not registered already. fn register_babe_inherent_data_provider( inherent_data_providers: &InherentDataProviders, slot_duration: u64, ) -> Result<(), consensus_common::Error> { debug!(target: "babe", "Registering"); if !inherent_data_providers.has_provider(&srml_babe::INHERENT_IDENTIFIER) { inherent_data_providers .register_provider(srml_babe::InherentDataProvider::new(slot_duration)) .map_err(Into::into) .map_err(consensus_common::Error::InherentData) } else { Ok(()) } } fn get_keypair(q: &AuthorityPair) -> &Keypair { use primitives::crypto::IsWrappedBy; primitives::sr25519::Pair::from_ref(q).as_ref() } #[allow(deprecated)] fn make_transcript( randomness: &[u8], slot_number: u64, epoch: u64, ) -> Transcript { let mut transcript = Transcript::new(&BABE_ENGINE_ID); transcript.commit_bytes(b"slot number", &slot_number.to_le_bytes()); transcript.commit_bytes(b"current epoch", &epoch.to_le_bytes()); transcript.commit_bytes(b"chain randomness", randomness); transcript } fn check(inout: &VRFInOut, threshold: u128) -> bool { u128::from_le_bytes(inout.make_bytes::<[u8; 16]>(BABE_VRF_PREFIX)) < threshold } fn calculate_threshold( c: (u64, u64), authorities: &[(AuthorityId, BabeWeight)], authority_index: usize, ) -> u128 { use num_bigint::BigUint; use num_rational::BigRational; use num_traits::{cast::ToPrimitive, identities::One}; let c = c.0 as f64 / c.1 as f64; let theta = authorities[authority_index].1 as f64 / authorities.iter().map(|(_, weight)| weight).sum::() as f64; let calc = || { let p = BigRational::from_float(1f64 - (1f64 - c).powf(theta))?; let numer = p.numer().to_biguint()?; let denom = p.denom().to_biguint()?; ((BigUint::one() << 128) * numer / denom).to_u128() }; calc().unwrap_or(u128::max_value()) } /// Claim a slot if it is our turn. Returns `None` if it is not our turn. /// /// This hashes the slot number, epoch, genesis hash, and chain randomness into /// the VRF. If the VRF produces a value less than `threshold`, it is our turn, /// so it returns `Some(_)`. Otherwise, it returns `None`. fn claim_slot( slot_number: u64, Epoch { ref authorities, ref randomness, epoch_index, .. }: Epoch, c: (u64, u64), keystore: &KeyStorePtr, ) -> Option<((VRFInOut, VRFProof, VRFProofBatchable), usize, AuthorityPair)> { let keystore = keystore.read(); let (key_pair, authority_index) = authorities.iter() .enumerate() .find_map(|(i, a)| { keystore.key_pair::(&a.0).ok().map(|kp| (kp, i)) })?; let transcript = make_transcript(randomness, slot_number, epoch_index); // Compute the threshold we will use. // // We already checked that authorities contains `key.public()`, so it can't // be empty. Therefore, this division in `calculate_threshold` is safe. let threshold = calculate_threshold(c, authorities, authority_index); get_keypair(&key_pair) .vrf_sign_after_check(transcript, |inout| check(inout, threshold)) .map(|s|(s, authority_index, key_pair)) } fn initialize_authorities_cache(client: &C) -> Result<(), ConsensusError> where B: BlockT, C: ProvideRuntimeApi + ProvideCache, C::Api: BabeApi, { // no cache => no initialization let cache = match client.cache() { Some(cache) => cache, None => return Ok(()), }; // check if we already have initialized the cache let genesis_id = BlockId::Number(Zero::zero()); let genesis_epoch: Option = cache .get_at(&well_known_cache_keys::EPOCH, &genesis_id) .and_then(|v| Decode::decode(&mut &v[..]).ok()); if genesis_epoch.is_some() { return Ok(()); } let map_err = |error| consensus_common::Error::from(consensus_common::Error::ClientImport( format!( "Error initializing authorities cache: {}", error, ))); let genesis_epoch = epoch(client, &genesis_id)?; cache.initialize(&well_known_cache_keys::EPOCH, genesis_epoch.encode()) .map_err(map_err) } /// Tree of all epoch changes across all *seen* forks. Data stored in tree is /// the hash and block number of the block signaling the epoch change, and the /// epoch that was signalled at that block. type EpochChanges = ForkTree< ::Hash, NumberFor, Epoch, >; /// A shared epoch changes tree. #[derive(Clone)] struct SharedEpochChanges { inner: Arc>>, } impl SharedEpochChanges { fn new() -> Self { SharedEpochChanges { inner: Arc::new(Mutex::new(EpochChanges::::new())) } } fn lock(&self) -> MutexGuard> { self.inner.lock() } } impl From> for SharedEpochChanges { fn from(epoch_changes: EpochChanges) -> Self { SharedEpochChanges { inner: Arc::new(Mutex::new(epoch_changes)) } } } /// A block-import handler for BABE. /// /// This scans each imported block for epoch change signals. The signals are /// tracked in a tree (of all forks), and the import logic validates all epoch /// change transitions, i.e. whether a given epoch change is expected or whether /// it is missing. /// /// The epoch change tree should be pruned as blocks are finalized. pub struct BabeBlockImport { inner: I, client: Arc>, api: Arc, epoch_changes: SharedEpochChanges, } impl Clone for BabeBlockImport { fn clone(&self) -> Self { BabeBlockImport { inner: self.inner.clone(), client: self.client.clone(), api: self.api.clone(), epoch_changes: self.epoch_changes.clone(), } } } impl BabeBlockImport { fn new( client: Arc>, api: Arc, epoch_changes: SharedEpochChanges, block_import: I, ) -> Self { BabeBlockImport { client, api, inner: block_import, epoch_changes, } } } impl BlockImport for BabeBlockImport where Block: BlockT, I: BlockImport + Send + Sync, I::Error: Into, B: Backend + 'static, E: CallExecutor + 'static + Clone + Send + Sync, RA: Send + Sync, PRA: ProvideRuntimeApi + ProvideCache, PRA::Api: BabeApi, { type Error = ConsensusError; fn import_block( &mut self, mut block: BlockImportParams, mut new_cache: HashMap>, ) -> Result { let hash = block.post_header().hash(); let number = block.header.number().clone(); // early exit if block already in chain, otherwise the check for // epoch changes will error when trying to re-import an epoch change #[allow(deprecated)] match self.client.backend().blockchain().status(BlockId::Hash(hash)) { Ok(blockchain::BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain), Ok(blockchain::BlockStatus::Unknown) => {}, Err(e) => return Err(ConsensusError::ClientImport(e.to_string()).into()), } let slot_number = { let pre_digest = find_pre_digest::(&block.header) .expect("valid babe headers must contain a predigest; \ header has been already verified; qed"); let BabePreDigest { slot_number, .. } = pre_digest; slot_number }; // returns a function for checking whether a block is a descendent of another // consistent with querying client directly after importing the block. let parent_hash = *block.header.parent_hash(); let is_descendent_of = is_descendent_of(&self.client, Some((&hash, &parent_hash))); // check if there's any epoch change expected to happen at this slot let mut epoch_changes = self.epoch_changes.lock(); let enacted_epoch = epoch_changes.find_node_where( &hash, &number, &is_descendent_of, &|epoch| epoch.start_slot <= slot_number, ).map_err(|e| ConsensusError::from(ConsensusError::ClientImport(e.to_string())))?; let check_roots = || -> Result { // this can only happen when the chain starts, since there's no // epoch change at genesis. afterwards every time we expect an epoch // change it means we will import another one. for (root, _, _) in epoch_changes.roots() { let is_descendent_of = is_descendent_of(root, &hash) .map_err(|e| { ConsensusError::from(ConsensusError::ClientImport(e.to_string())) })?; if is_descendent_of { return Ok(false); } } Ok(true) }; let expected_epoch_change = enacted_epoch.is_some(); let next_epoch_digest = find_next_epoch_digest::(&block.header) .map_err(|e| ConsensusError::from(ConsensusError::ClientImport(e.to_string())))?; match (expected_epoch_change, next_epoch_digest.is_some()) { (true, true) => {}, (false, false) => {}, (true, false) => { return Err( ConsensusError::ClientImport( "Expected epoch change to happen by this block".into(), ) ); }, (false, true) => { if !check_roots()? { return Err(ConsensusError::ClientImport("Unexpected epoch change".into())); } }, } // if there's a pending epoch we'll save the previous epoch changes here // this way we can revert it if there's any error let mut old_epoch_changes = None; if let Some(next_epoch) = next_epoch_digest { if let Some(enacted_epoch) = enacted_epoch { let enacted_epoch = &enacted_epoch.data; if next_epoch.epoch_index.checked_sub(enacted_epoch.epoch_index) != Some(1) { return Err(ConsensusError::ClientImport(format!( "Invalid BABE epoch change: expected next epoch to be {:?}, got {:?}", enacted_epoch.epoch_index.saturating_add(1), next_epoch.epoch_index, ))); } // update the current epoch in the client cache new_cache.insert( well_known_cache_keys::EPOCH, enacted_epoch.encode(), ); let current_epoch = epoch(&*self.api, &BlockId::Hash(parent_hash))?; // if the authorities have changed then we populate the // `AUTHORITIES` key with the enacted epoch, so that the inner // `ImportBlock` can process it (`EPOCH` is specific to BABE). // e.g. in the case of GRANDPA it would require a justification // for the block, expecting that the authorities actually // changed. if current_epoch.authorities != enacted_epoch.authorities { new_cache.insert( well_known_cache_keys::AUTHORITIES, enacted_epoch.encode(), ); } } old_epoch_changes = Some(epoch_changes.clone()); // track the epoch change in the fork tree epoch_changes.import( hash, number, next_epoch, &is_descendent_of, ).map_err(|e| ConsensusError::from(ConsensusError::ClientImport(e.to_string())))?; crate::aux_schema::write_epoch_changes::( &*epoch_changes, |insert| block.auxiliary.extend( insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))) ) ); } let import_result = self.inner.import_block(block, new_cache); // revert to the original epoch changes in case there's an error // importing the block if let Err(_) = import_result { if let Some(old_epoch_changes) = old_epoch_changes { *epoch_changes = old_epoch_changes; } } import_result.map_err(Into::into) } fn check_block( &mut self, hash: Block::Hash, parent_hash: Block::Hash, ) -> Result { self.inner.check_block(hash, parent_hash).map_err(Into::into) } } /// Start an import queue for the BABE consensus algorithm. This method returns /// the import queue, some data that needs to be passed to the block authoring /// logic (`BabeLink`), a `BabeBlockImport` which should be used by the /// authoring when importing its own blocks, and a future that must be run to /// completion and is responsible for listening to finality notifications and /// pruning the epoch changes tree. pub fn import_queue, I, RA, PRA>( config: Config, block_import: I, justification_import: Option>, finality_proof_import: Option>, client: Arc>, api: Arc, inherent_data_providers: InherentDataProviders, ) -> ClientResult<( BabeImportQueue, BabeLink, BabeBlockImport, impl futures01::Future, )> where B: Backend + 'static, I: BlockImport + Clone + Send + Sync + 'static, I::Error: Into, E: CallExecutor + Clone + Send + Sync + 'static, RA: Send + Sync + 'static, PRA: ProvideRuntimeApi + ProvideCache + Send + Sync + AuxStore + 'static, PRA::Api: BlockBuilderApi + BabeApi, { register_babe_inherent_data_provider(&inherent_data_providers, config.get())?; initialize_authorities_cache(&*api)?; let verifier = BabeVerifier { api: api.clone(), inherent_data_providers, time_source: Default::default(), config, }; #[allow(deprecated)] let epoch_changes = aux_schema::load_epoch_changes(&**client.backend())?; let block_import = BabeBlockImport::new( client.clone(), api, epoch_changes.clone(), block_import, ); let pruning_task = client.finality_notification_stream() .map(|v| Ok::<_, ()>(v)).compat() .for_each(move |notification| { let is_descendent_of = is_descendent_of(&client, None); epoch_changes.lock().prune( ¬ification.hash, *notification.header.number(), &is_descendent_of, ).map_err(|e| { debug!(target: "babe", "Error pruning epoch changes fork tree: {:?}", e) })?; Ok(()) }); let timestamp_core = verifier.time_source.clone(); let queue = BasicQueue::new( verifier, Box::new(block_import.clone()), justification_import, finality_proof_import, ); Ok((queue, timestamp_core, block_import, pruning_task)) } /// BABE test helpers. Utility methods for manually authoring blocks. #[cfg(feature = "test-helpers")] pub mod test_helpers { use super::*; /// Try to claim the given slot and return a `BabePreDigest` if /// successful. pub fn claim_slot( client: &C, at: &BlockId, slot_number: u64, c: (u64, u64), keystore: &KeyStorePtr, ) -> Option where B: BlockT, C: ProvideRuntimeApi + ProvideCache, C::Api: BabeApi, { let epoch = epoch(client, at).unwrap(); super::claim_slot( slot_number, epoch, c, keystore, ).map(|((inout, vrf_proof, _), authority_index, _)| { BabePreDigest { vrf_proof, vrf_output: inout.to_output(), authority_index: authority_index as u32, slot_number, } }) } }