diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index 92518ef22d..a5030f1b35 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -8,7 +8,8 @@ use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; use sp_inherents::InherentDataProviders; use sc_executor::native_executor_instance; pub use sc_executor::NativeExecutor; -use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair}; +use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; +use sc_consensus_aura::{ImportQueueParams, StartAuraParams}; use sc_finality_grandpa::SharedVoterState; use sc_keystore::LocalKeystore; use sc_telemetry::TelemetrySpan; @@ -43,7 +44,7 @@ pub fn new_partial(config: &Configuration) -> Result(&config)?; @@ -67,15 +68,18 @@ pub fn new_partial(config: &Configuration) -> Result( - sc_consensus_aura::slot_duration(&*client)?, - aura_block_import.clone(), - Some(Box::new(grandpa_block_import.clone())), - client.clone(), - inherent_data_providers.clone(), - &task_manager.spawn_essential_handle(), - config.prometheus_registry(), - sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()), + let import_queue = sc_consensus_aura::import_queue::( + ImportQueueParams { + block_import: aura_block_import.clone(), + justification_import: Some(Box::new(grandpa_block_import.clone())), + client: client.clone(), + inherent_data_providers: inherent_data_providers.clone(), + spawner: &task_manager.spawn_essential_handle(), + can_author_with: sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()), + slot_duration: sc_consensus_aura::slot_duration(&*client)?, + registry: config.prometheus_registry(), + check_for_equivocation: Default::default(), + }, )?; Ok(sc_service::PartialComponents { @@ -185,7 +189,7 @@ pub fn new_full(mut config: Configuration) -> Result )?; if role.is_authority() { - let proposer = sc_basic_authorship::ProposerFactory::new( + let proposer_factory = sc_basic_authorship::ProposerFactory::new( task_manager.spawn_handle(), client.clone(), transaction_pool, @@ -195,18 +199,20 @@ pub fn new_full(mut config: Configuration) -> Result let can_author_with = sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()); - let aura = sc_consensus_aura::start_aura::<_, _, _, _, _, AuraPair, _, _, _,_>( - sc_consensus_aura::slot_duration(&*client)?, - client.clone(), - select_chain, - block_import, - proposer, - network.clone(), - inherent_data_providers.clone(), - force_authoring, - backoff_authoring_blocks, - keystore_container.sync_keystore(), - can_author_with, + let aura = sc_consensus_aura::start_aura::( + StartAuraParams { + slot_duration: sc_consensus_aura::slot_duration(&*client)?, + client: client.clone(), + select_chain, + block_import, + proposer_factory, + inherent_data_providers: inherent_data_providers.clone(), + force_authoring, + backoff_authoring_blocks, + keystore: keystore_container.sync_keystore(), + can_author_with, + sync_oracle: network.clone(), + }, )?; // the AURA authoring task is considered essential, i.e. if it @@ -289,15 +295,18 @@ pub fn new_light(mut config: Configuration) -> Result client.clone(), ); - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _, _>( - sc_consensus_aura::slot_duration(&*client)?, - aura_block_import, - Some(Box::new(grandpa_block_import)), - client.clone(), - InherentDataProviders::new(), - &task_manager.spawn_essential_handle(), - config.prometheus_registry(), - sp_consensus::NeverCanAuthor, + let import_queue = sc_consensus_aura::import_queue::( + ImportQueueParams { + block_import: aura_block_import.clone(), + justification_import: Some(Box::new(grandpa_block_import.clone())), + client: client.clone(), + inherent_data_providers: InherentDataProviders::new(), + spawner: &task_manager.spawn_essential_handle(), + can_author_with: sp_consensus::NeverCanAuthor, + slot_duration: sc_consensus_aura::slot_duration(&*client)?, + registry: config.prometheus_registry(), + check_for_equivocation: Default::default(), + }, )?; let (network, network_status_sinks, system_rpc_tx, network_starter) = diff --git a/substrate/client/consensus/aura/src/import_queue.rs b/substrate/client/consensus/aura/src/import_queue.rs new file mode 100644 index 0000000000..638931477a --- /dev/null +++ b/substrate/client/consensus/aura/src/import_queue.rs @@ -0,0 +1,542 @@ +// This file is part of Substrate. + +// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Module implementing the logic for verifying and importing AuRa blocks. + +use crate::{ + AuthorityId, find_pre_digest, slot_author, aura_err, Error, AuraSlotCompatible, SlotDuration, + register_aura_inherent_data_provider, authorities, +}; +use std::{ + sync::Arc, time::Duration, thread, marker::PhantomData, hash::Hash, fmt::Debug, + collections::HashMap, +}; +use log::{debug, info, trace}; +use prometheus_endpoint::Registry; +use codec::{Encode, Decode, Codec}; +use sp_consensus::{ + BlockImport, CanAuthorWith, ForkChoiceStrategy, BlockImportParams, + BlockOrigin, Error as ConsensusError, BlockCheckParams, ImportResult, + import_queue::{ + Verifier, BasicQueue, DefaultImportQueue, BoxJustificationImport, + }, +}; +use sc_client_api::{backend::AuxStore, BlockOf}; +use sp_blockchain::{well_known_cache_keys::{self, Id as CacheKeyId}, ProvideCache, HeaderBackend}; +use sp_block_builder::BlockBuilder as BlockBuilderApi; +use sp_runtime::{generic::{BlockId, OpaqueDigestItemId}, Justification}; +use sp_runtime::traits::{Block as BlockT, Header, DigestItemFor, Zero}; +use sp_api::ProvideRuntimeApi; +use sp_core::crypto::Pair; +use sp_inherents::{InherentDataProviders, InherentData}; +use sp_timestamp::InherentError as TIError; +use sc_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO}; +use sc_consensus_slots::{CheckedHeader, SlotCompatible, check_equivocation}; +use sp_consensus_slots::Slot; +use sp_api::ApiExt; +use sp_consensus_aura::{ + digests::CompatibleDigestItem, AuraApi, inherents::AuraInherentData, + ConsensusLog, AURA_ENGINE_ID, +}; + +/// check a header has been signed by the right key. If the slot is too far in the future, an error +/// will be returned. If it's successful, returns the pre-header and the digest item +/// containing the seal. +/// +/// This digest item will always return `Some` when used with `as_aura_seal`. +fn check_header( + client: &C, + slot_now: Slot, + mut header: B::Header, + hash: B::Hash, + authorities: &[AuthorityId

], + check_for_equivocation: CheckForEquivocation, +) -> Result)>, Error> where + DigestItemFor: CompatibleDigestItem, + P::Signature: Codec, + C: sc_client_api::backend::AuxStore, + P::Public: Encode + Decode + PartialEq + Clone, +{ + let seal = match header.digest_mut().pop() { + Some(x) => x, + None => return Err(Error::HeaderUnsealed(hash)), + }; + + let sig = seal.as_aura_seal().ok_or_else(|| { + aura_err(Error::HeaderBadSeal(hash)) + })?; + + let slot = find_pre_digest::(&header)?; + + if slot > slot_now { + header.digest_mut().push(seal); + Ok(CheckedHeader::Deferred(header, slot)) + } else { + // check the signature is valid under the expected authority and + // chain state. + let expected_author = match slot_author::

(slot, &authorities) { + None => return Err(Error::SlotAuthorNotFound), + Some(author) => author, + }; + + let pre_hash = header.hash(); + + if P::verify(&sig, pre_hash.as_ref(), expected_author) { + if check_for_equivocation.check_for_equivocation() { + if let Some(equivocation_proof) = check_equivocation( + client, + slot_now, + slot, + &header, + expected_author, + ).map_err(Error::Client)? { + info!( + target: "aura", + "Slot author is equivocating at slot {} with headers {:?} and {:?}", + slot, + equivocation_proof.first_header.hash(), + equivocation_proof.second_header.hash(), + ); + } + } + + Ok(CheckedHeader::Checked(header, (slot, seal))) + } else { + Err(Error::BadSignature(hash)) + } + } +} + +/// A verifier for Aura blocks. +pub struct AuraVerifier { + client: Arc, + phantom: PhantomData

, + inherent_data_providers: InherentDataProviders, + can_author_with: CAW, + check_for_equivocation: CheckForEquivocation, +} + +impl AuraVerifier { + pub(crate) fn new( + client: Arc, + inherent_data_providers: InherentDataProviders, + can_author_with: CAW, + check_for_equivocation: CheckForEquivocation, + ) -> Self { + Self { + client, + inherent_data_providers, + can_author_with, + check_for_equivocation, + phantom: PhantomData, + } + } +} + +impl AuraVerifier where + P: Send + Sync + 'static, + CAW: Send + Sync + 'static, +{ + fn check_inherents( + &self, + block: B, + block_id: BlockId, + inherent_data: InherentData, + timestamp_now: u64, + ) -> Result<(), Error> where + C: ProvideRuntimeApi, C::Api: BlockBuilderApi, + CAW: CanAuthorWith, + { + const MAX_TIMESTAMP_DRIFT_SECS: u64 = 60; + + if let Err(e) = self.can_author_with.can_author_with(&block_id) { + debug!( + target: "aura", + "Skipping `check_inherents` as authoring version is not compatible: {}", + e, + ); + + return Ok(()) + } + + let inherent_res = self.client.runtime_api().check_inherents( + &block_id, + block, + inherent_data, + ).map_err(|e| Error::Client(e.into()))?; + + if !inherent_res.ok() { + inherent_res + .into_errors() + .try_for_each(|(i, e)| match TIError::try_from(&i, &e) { + Some(TIError::ValidAtTimestamp(timestamp)) => { + // halt import until timestamp is valid. + // reject when too far ahead. + if timestamp > timestamp_now + MAX_TIMESTAMP_DRIFT_SECS { + return Err(Error::TooFarInFuture); + } + + let diff = timestamp.saturating_sub(timestamp_now); + info!( + target: "aura", + "halting for block {} seconds in the future", + diff + ); + telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; + "diff" => ?diff + ); + thread::sleep(Duration::from_secs(diff)); + Ok(()) + }, + Some(TIError::Other(e)) => Err(Error::Runtime(e.into())), + None => Err(Error::DataProvider( + self.inherent_data_providers.error_to_string(&i, &e) + )), + }) + } else { + Ok(()) + } + } +} + +impl Verifier for AuraVerifier where + C: ProvideRuntimeApi + + Send + + Sync + + sc_client_api::backend::AuxStore + + ProvideCache + + BlockOf, + C::Api: BlockBuilderApi + AuraApi> + ApiExt, + DigestItemFor: CompatibleDigestItem, + P: Pair + Send + Sync + 'static, + P::Public: Send + Sync + Hash + Eq + Clone + Decode + Encode + Debug + 'static, + P::Signature: Encode + Decode, + CAW: CanAuthorWith + Send + Sync + 'static, +{ + fn verify( + &mut self, + origin: BlockOrigin, + header: B::Header, + justification: Option, + mut body: Option>, + ) -> Result<(BlockImportParams, Option)>>), String> { + let mut inherent_data = self.inherent_data_providers + .create_inherent_data() + .map_err(|e| e.into_string())?; + let (timestamp_now, slot_now, _) = AuraSlotCompatible.extract_timestamp_and_slot(&inherent_data) + .map_err(|e| format!("Could not extract timestamp and slot: {:?}", e))?; + let hash = header.hash(); + let parent_hash = *header.parent_hash(); + let authorities = authorities(self.client.as_ref(), &BlockId::Hash(parent_hash)) + .map_err(|e| format!("Could not fetch authorities at {:?}: {:?}", parent_hash, e))?; + + // we add one to allow for some small drift. + // FIXME #1019 in the future, alter this queue to allow deferring of + // headers + let checked_header = check_header::( + &self.client, + slot_now + 1, + header, + hash, + &authorities[..], + self.check_for_equivocation, + ).map_err(|e| e.to_string())?; + match checked_header { + CheckedHeader::Checked(pre_header, (slot, seal)) => { + // if the body is passed through, we need to use the runtime + // to check that the internally-set timestamp in the inherents + // actually matches the slot set in the seal. + if let Some(inner_body) = body.take() { + inherent_data.aura_replace_inherent_data(slot); + let block = B::new(pre_header.clone(), inner_body); + + // skip the inherents verification if the runtime API is old. + if self.client + .runtime_api() + .has_api_with::, _>( + &BlockId::Hash(parent_hash), + |v| v >= 2, + ) + .map_err(|e| format!("{:?}", e))? + { + self.check_inherents( + block.clone(), + BlockId::Hash(parent_hash), + inherent_data, + timestamp_now, + ).map_err(|e| e.to_string())?; + } + + let (_, inner_body) = block.deconstruct(); + body = Some(inner_body); + } + + trace!(target: "aura", "Checked {:?}; importing.", pre_header); + telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header); + + // Look for an authorities-change log. + let maybe_keys = pre_header.digest() + .logs() + .iter() + .filter_map(|l| l.try_to::>>( + OpaqueDigestItemId::Consensus(&AURA_ENGINE_ID) + )) + .find_map(|l| match l { + ConsensusLog::AuthoritiesChange(a) => Some( + vec![(well_known_cache_keys::AUTHORITIES, a.encode())] + ), + _ => None, + }); + + let mut import_block = BlockImportParams::new(origin, pre_header); + import_block.post_digests.push(seal); + import_block.body = body; + import_block.justification = justification; + import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain); + import_block.post_hash = Some(hash); + + Ok((import_block, maybe_keys)) + } + CheckedHeader::Deferred(a, b) => { + debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); + telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future"; + "hash" => ?hash, "a" => ?a, "b" => ?b + ); + Err(format!("Header {:?} rejected: too far in the future", hash)) + } + } + } +} + +fn initialize_authorities_cache(client: &C) -> Result<(), ConsensusError> where + A: Codec + Debug, + B: BlockT, + C: ProvideRuntimeApi + BlockOf + ProvideCache, + C::Api: AuraApi, +{ + // no cache => no initialization + let cache = match client.cache() { + Some(cache) => cache, + None => return Ok(()), + }; + + // check if we already have initialized the cache + let map_err = |error| sp_consensus::Error::from(sp_consensus::Error::ClientImport( + format!( + "Error initializing authorities cache: {}", + error, + ))); + + let genesis_id = BlockId::Number(Zero::zero()); + let genesis_authorities: Option> = cache + .get_at(&well_known_cache_keys::AUTHORITIES, &genesis_id) + .unwrap_or(None) + .and_then(|(_, _, v)| Decode::decode(&mut &v[..]).ok()); + if genesis_authorities.is_some() { + return Ok(()); + } + + let genesis_authorities = authorities(client, &genesis_id)?; + cache.initialize(&well_known_cache_keys::AUTHORITIES, genesis_authorities.encode()) + .map_err(map_err)?; + + Ok(()) +} + +/// A block-import handler for Aura. +pub struct AuraBlockImport, P> { + inner: I, + client: Arc, + _phantom: PhantomData<(Block, P)>, +} + +impl, P> Clone for AuraBlockImport { + fn clone(&self) -> Self { + AuraBlockImport { + inner: self.inner.clone(), + client: self.client.clone(), + _phantom: PhantomData, + } + } +} + +impl, P> AuraBlockImport { + /// New aura block import. + pub fn new( + inner: I, + client: Arc, + ) -> Self { + Self { + inner, + client, + _phantom: PhantomData, + } + } +} + +impl BlockImport for AuraBlockImport where + I: BlockImport> + Send + Sync, + I::Error: Into, + C: HeaderBackend + ProvideRuntimeApi, + P: Pair + Send + Sync + 'static, + P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, + P::Signature: Encode + Decode, +{ + type Error = ConsensusError; + type Transaction = sp_api::TransactionFor; + + fn check_block( + &mut self, + block: BlockCheckParams, + ) -> Result { + self.inner.check_block(block).map_err(Into::into) + } + + fn import_block( + &mut self, + block: BlockImportParams, + new_cache: HashMap>, + ) -> Result { + let hash = block.post_hash(); + let slot = find_pre_digest::(&block.header) + .expect("valid Aura headers must contain a predigest; \ + header has been already verified; qed"); + + let parent_hash = *block.header.parent_hash(); + let parent_header = self.client.header(BlockId::Hash(parent_hash)) + .map_err(|e| ConsensusError::ChainLookup(e.to_string()))? + .ok_or_else(|| ConsensusError::ChainLookup(aura_err( + Error::::ParentUnavailable(parent_hash, hash) + ).into()))?; + + let parent_slot = find_pre_digest::(&parent_header) + .expect("valid Aura headers contain a pre-digest; \ + parent header has already been verified; qed"); + + // make sure that slot number is strictly increasing + if slot <= parent_slot { + return Err( + ConsensusError::ClientImport(aura_err( + Error::::SlotMustIncrease(parent_slot, slot) + ).into()) + ); + } + + self.inner.import_block(block, new_cache).map_err(Into::into) + } +} + +/// Should we check for equivocation of a block author? +#[derive(Debug, Clone, Copy)] +pub enum CheckForEquivocation { + /// Yes, check for equivocation. + /// + /// This is the default setting for this. + Yes, + /// No, don't check for equivocation. + No, +} + +impl CheckForEquivocation { + /// Should we check for equivocation? + fn check_for_equivocation(self) -> bool { + matches!(self, Self::Yes) + } +} + +impl Default for CheckForEquivocation { + fn default() -> Self { + Self::Yes + } +} + +/// Parameters of [`import_queue`]. +pub struct ImportQueueParams<'a, Block, I, C, S, CAW> { + /// The block import to use. + pub block_import: I, + /// The justification import. + pub justification_import: Option>, + /// The client to interact with the chain. + pub client: Arc, + /// The inherent data provider, to create the inherent data. + pub inherent_data_providers: InherentDataProviders, + /// The spawner to spawn background tasks. + pub spawner: &'a S, + /// The prometheus registry. + pub registry: Option<&'a Registry>, + /// Can we author with the current node? + pub can_author_with: CAW, + /// Should we check for equivocation? + pub check_for_equivocation: CheckForEquivocation, + /// The duration of one slot. + pub slot_duration: SlotDuration, +} + +/// Start an import queue for the Aura consensus algorithm. +pub fn import_queue<'a, P, Block, I, C, S, CAW>( + ImportQueueParams { + block_import, + justification_import, + client, + inherent_data_providers, + spawner, + registry, + can_author_with, + check_for_equivocation, + slot_duration, + }: ImportQueueParams<'a, Block, I, C, S, CAW> +) -> Result, sp_consensus::Error> where + Block: BlockT, + C::Api: BlockBuilderApi + AuraApi> + ApiExt, + C: 'static + + ProvideRuntimeApi + + BlockOf + + ProvideCache + + Send + + Sync + + AuxStore + + HeaderBackend, + I: BlockImport> + + Send + + Sync + + 'static, + DigestItemFor: CompatibleDigestItem, + P: Pair + Send + Sync + 'static, + P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, + P::Signature: Encode + Decode, + S: sp_core::traits::SpawnEssentialNamed, + CAW: CanAuthorWith + Send + Sync + 'static, +{ + register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; + initialize_authorities_cache(&*client)?; + + let verifier = AuraVerifier::<_, P, _>::new( + client, + inherent_data_providers, + can_author_with, + check_for_equivocation, + ); + + Ok(BasicQueue::new( + verifier, + Box::new(block_import), + justification_import, + spawner, + registry, + )) +} diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs index 71aa7bdb7c..1c30f136ea 100644 --- a/substrate/client/consensus/aura/src/lib.rs +++ b/substrate/client/consensus/aura/src/lib.rs @@ -31,50 +31,34 @@ //! NOTE: Aura itself is designed to be generic over the crypto used. #![forbid(missing_docs, unsafe_code)] use std::{ - sync::Arc, time::Duration, thread, marker::PhantomData, hash::Hash, fmt::Debug, pin::Pin, - collections::HashMap, convert::{TryFrom, TryInto}, + sync::Arc, marker::PhantomData, hash::Hash, fmt::Debug, pin::Pin, convert::{TryFrom, TryInto}, }; use futures::prelude::*; use parking_lot::Mutex; -use log::{debug, info, trace}; -use prometheus_endpoint::Registry; +use log::{debug, trace}; use codec::{Encode, Decode, Codec}; use sp_consensus::{ BlockImport, Environment, Proposer, CanAuthorWith, ForkChoiceStrategy, BlockImportParams, - BlockOrigin, Error as ConsensusError, SelectChain, SlotData, BlockCheckParams, ImportResult, - import_queue::{ - Verifier, BasicQueue, DefaultImportQueue, BoxJustificationImport, - }, + BlockOrigin, Error as ConsensusError, SelectChain, SlotData, }; use sc_client_api::{backend::AuxStore, BlockOf}; -use sp_blockchain::{ - self, Result as CResult, well_known_cache_keys::{self, Id as CacheKeyId}, - ProvideCache, HeaderBackend, -}; -use sp_block_builder::BlockBuilder as BlockBuilderApi; +use sp_blockchain::{Result as CResult, well_known_cache_keys, ProvideCache, HeaderBackend}; use sp_core::crypto::Public; use sp_application_crypto::{AppKey, AppPublic}; -use sp_runtime::{generic::{BlockId, OpaqueDigestItemId}, traits::NumberFor, Justification}; +use sp_runtime::{generic::BlockId, traits::NumberFor}; use sp_runtime::traits::{Block as BlockT, Header, DigestItemFor, Zero, Member}; use sp_api::ProvideRuntimeApi; use sp_core::crypto::Pair; use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore}; use sp_inherents::{InherentDataProviders, InherentData}; -use sp_timestamp::{ - TimestampInherentData, InherentType as TimestampInherent, InherentError as TIError -}; -use sc_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO}; - -use sc_consensus_slots::{ - CheckedHeader, SlotInfo, SlotCompatible, StorageChanges, check_equivocation, - BackoffAuthoringBlocksStrategy, -}; +use sp_timestamp::{TimestampInherentData, InherentType as TimestampInherent}; +use sc_consensus_slots::{SlotInfo, SlotCompatible, StorageChanges, BackoffAuthoringBlocksStrategy}; use sp_consensus_slots::Slot; -use sp_api::ApiExt; +mod import_queue; pub use sp_consensus_aura::{ ConsensusLog, AuraApi, AURA_ENGINE_ID, digests::CompatibleDigestItem, @@ -84,6 +68,7 @@ pub use sp_consensus_aura::{ }, }; pub use sp_consensus::SyncOracle; +pub use import_queue::{ImportQueueParams, import_queue, AuraBlockImport, CheckForEquivocation}; type AuthorityId

=

::Public; @@ -133,26 +118,54 @@ impl SlotCompatible for AuraSlotCompatible { } } +/// Parameters of [`start_aura`]. +pub struct StartAuraParams { + /// The duration of a slot. + pub slot_duration: SlotDuration, + /// The client to interact with the chain. + pub client: Arc, + /// A select chain implementation to select the best block. + pub select_chain: SC, + /// The block import. + pub block_import: I, + /// The proposer factory to build proposer instances. + pub proposer_factory: PF, + /// The sync oracle that can give us the current sync status. + pub sync_oracle: SO, + /// The inherent data providers to create the inherent data. + pub inherent_data_providers: InherentDataProviders, + /// Should we force the authoring of blocks? + pub force_authoring: bool, + /// The backoff strategy when we miss slots. + pub backoff_authoring_blocks: Option, + /// The keystore used by the node. + pub keystore: SyncCryptoStorePtr, + /// Can we author a block with this node? + pub can_author_with: CAW, +} + /// Start the aura worker. The returned future should be run in a futures executor. -pub fn start_aura( - slot_duration: SlotDuration, - client: Arc, - select_chain: SC, - block_import: I, - env: E, - sync_oracle: SO, - inherent_data_providers: InherentDataProviders, - force_authoring: bool, - backoff_authoring_blocks: Option, - keystore: SyncCryptoStorePtr, - can_author_with: CAW, +pub fn start_aura( + StartAuraParams { + slot_duration, + client, + select_chain, + block_import, + proposer_factory: env, + sync_oracle, + inherent_data_providers, + force_authoring, + backoff_authoring_blocks, + keystore, + can_author_with, + }: StartAuraParams, ) -> Result, sp_consensus::Error> where B: BlockT, C: ProvideRuntimeApi + BlockOf + ProvideCache + AuxStore + HeaderBackend + Send + Sync, C::Api: AuraApi>, SC: SelectChain, - E: Environment + Send + Sync + 'static, - E::Proposer: Proposer>, + PF: Environment + Send + Sync + 'static, + PF::Proposer: Proposer>, P: Pair + Send + Sync, P::Public: AppPublic + Hash + Member + Encode + Decode, P::Signature: TryFrom> + Hash + Member + Encode + Decode, @@ -430,290 +443,21 @@ fn find_pre_digest(header: &B::Header) -> Result( - client: &C, - slot_now: Slot, - mut header: B::Header, - hash: B::Hash, - authorities: &[AuthorityId

], -) -> Result)>, Error> where - DigestItemFor: CompatibleDigestItem, - P::Signature: Codec, - C: sc_client_api::backend::AuxStore, - P::Public: Encode + Decode + PartialEq + Clone, -{ - let seal = match header.digest_mut().pop() { - Some(x) => x, - None => return Err(Error::HeaderUnsealed(hash)), - }; - - let sig = seal.as_aura_seal().ok_or_else(|| { - aura_err(Error::HeaderBadSeal(hash)) - })?; - - let slot = find_pre_digest::(&header)?; - - if slot > slot_now { - header.digest_mut().push(seal); - Ok(CheckedHeader::Deferred(header, slot)) +/// Register the aura inherent data provider, if not registered already. +fn register_aura_inherent_data_provider( + inherent_data_providers: &InherentDataProviders, + slot_duration: u64, +) -> Result<(), sp_consensus::Error> { + if !inherent_data_providers.has_provider(&INHERENT_IDENTIFIER) { + inherent_data_providers + .register_provider(InherentDataProvider::new(slot_duration)) + .map_err(Into::into) + .map_err(sp_consensus::Error::InherentData) } else { - // check the signature is valid under the expected authority and - // chain state. - let expected_author = match slot_author::

(slot, &authorities) { - None => return Err(Error::SlotAuthorNotFound), - Some(author) => author, - }; - - let pre_hash = header.hash(); - - if P::verify(&sig, pre_hash.as_ref(), expected_author) { - if let Some(equivocation_proof) = check_equivocation( - client, - slot_now, - slot, - &header, - expected_author, - ).map_err(Error::Client)? { - info!( - "Slot author is equivocating at slot {} with headers {:?} and {:?}", - slot, - equivocation_proof.first_header.hash(), - equivocation_proof.second_header.hash(), - ); - } - - Ok(CheckedHeader::Checked(header, (slot, seal))) - } else { - Err(Error::BadSignature(hash)) - } + Ok(()) } } -/// A verifier for Aura blocks. -pub struct AuraVerifier { - client: Arc, - phantom: PhantomData

, - inherent_data_providers: sp_inherents::InherentDataProviders, - can_author_with: CAW, -} - -impl AuraVerifier where - P: Send + Sync + 'static, - CAW: Send + Sync + 'static, -{ - fn check_inherents( - &self, - block: B, - block_id: BlockId, - inherent_data: InherentData, - timestamp_now: u64, - ) -> Result<(), Error> where - C: ProvideRuntimeApi, C::Api: BlockBuilderApi, - CAW: CanAuthorWith, - { - const MAX_TIMESTAMP_DRIFT_SECS: u64 = 60; - - if let Err(e) = self.can_author_with.can_author_with(&block_id) { - debug!( - target: "aura", - "Skipping `check_inherents` as authoring version is not compatible: {}", - e, - ); - - return Ok(()) - } - - let inherent_res = self.client.runtime_api().check_inherents( - &block_id, - block, - inherent_data, - ).map_err(|e| Error::Client(e.into()))?; - - if !inherent_res.ok() { - inherent_res - .into_errors() - .try_for_each(|(i, e)| match TIError::try_from(&i, &e) { - Some(TIError::ValidAtTimestamp(timestamp)) => { - // halt import until timestamp is valid. - // reject when too far ahead. - if timestamp > timestamp_now + MAX_TIMESTAMP_DRIFT_SECS { - return Err(Error::TooFarInFuture); - } - - let diff = timestamp.saturating_sub(timestamp_now); - info!( - target: "aura", - "halting for block {} seconds in the future", - diff - ); - telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; - "diff" => ?diff - ); - thread::sleep(Duration::from_secs(diff)); - Ok(()) - }, - Some(TIError::Other(e)) => Err(Error::Runtime(e.into())), - None => Err(Error::DataProvider( - self.inherent_data_providers.error_to_string(&i, &e) - )), - }) - } else { - Ok(()) - } - } -} - -#[forbid(deprecated)] -impl Verifier for AuraVerifier where - C: ProvideRuntimeApi + - Send + - Sync + - sc_client_api::backend::AuxStore + - ProvideCache + - BlockOf, - C::Api: BlockBuilderApi + AuraApi> + ApiExt, - DigestItemFor: CompatibleDigestItem, - P: Pair + Send + Sync + 'static, - P::Public: Send + Sync + Hash + Eq + Clone + Decode + Encode + Debug + 'static, - P::Signature: Encode + Decode, - CAW: CanAuthorWith + Send + Sync + 'static, -{ - fn verify( - &mut self, - origin: BlockOrigin, - header: B::Header, - justification: Option, - mut body: Option>, - ) -> Result<(BlockImportParams, Option)>>), String> { - let mut inherent_data = self.inherent_data_providers - .create_inherent_data() - .map_err(|e| e.into_string())?; - let (timestamp_now, slot_now, _) = AuraSlotCompatible.extract_timestamp_and_slot(&inherent_data) - .map_err(|e| format!("Could not extract timestamp and slot: {:?}", e))?; - let hash = header.hash(); - let parent_hash = *header.parent_hash(); - let authorities = authorities(self.client.as_ref(), &BlockId::Hash(parent_hash)) - .map_err(|e| format!("Could not fetch authorities at {:?}: {:?}", parent_hash, e))?; - - // we add one to allow for some small drift. - // FIXME #1019 in the future, alter this queue to allow deferring of - // headers - let checked_header = check_header::( - &self.client, - slot_now + 1, - header, - hash, - &authorities[..], - ).map_err(|e| e.to_string())?; - match checked_header { - CheckedHeader::Checked(pre_header, (slot, seal)) => { - // if the body is passed through, we need to use the runtime - // to check that the internally-set timestamp in the inherents - // actually matches the slot set in the seal. - if let Some(inner_body) = body.take() { - inherent_data.aura_replace_inherent_data(slot); - let block = B::new(pre_header.clone(), inner_body); - - // skip the inherents verification if the runtime API is old. - if self.client - .runtime_api() - .has_api_with::, _>( - &BlockId::Hash(parent_hash), - |v| v >= 2, - ) - .map_err(|e| format!("{:?}", e))? - { - self.check_inherents( - block.clone(), - BlockId::Hash(parent_hash), - inherent_data, - timestamp_now, - ).map_err(|e| e.to_string())?; - } - - let (_, inner_body) = block.deconstruct(); - body = Some(inner_body); - } - - trace!(target: "aura", "Checked {:?}; importing.", pre_header); - telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header); - - // Look for an authorities-change log. - let maybe_keys = pre_header.digest() - .logs() - .iter() - .filter_map(|l| l.try_to::>>( - OpaqueDigestItemId::Consensus(&AURA_ENGINE_ID) - )) - .find_map(|l| match l { - ConsensusLog::AuthoritiesChange(a) => Some( - vec![(well_known_cache_keys::AUTHORITIES, a.encode())] - ), - _ => None, - }); - - let mut import_block = BlockImportParams::new(origin, pre_header); - import_block.post_digests.push(seal); - import_block.body = body; - import_block.justification = justification; - import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain); - import_block.post_hash = Some(hash); - - Ok((import_block, maybe_keys)) - } - CheckedHeader::Deferred(a, b) => { - debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); - telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future"; - "hash" => ?hash, "a" => ?a, "b" => ?b - ); - Err(format!("Header {:?} rejected: too far in the future", hash)) - } - } - } -} - -fn initialize_authorities_cache(client: &C) -> Result<(), ConsensusError> where - A: Codec + Debug, - B: BlockT, - C: ProvideRuntimeApi + BlockOf + ProvideCache, - C::Api: AuraApi, -{ - // no cache => no initialization - let cache = match client.cache() { - Some(cache) => cache, - None => return Ok(()), - }; - - // check if we already have initialized the cache - let map_err = |error| sp_consensus::Error::from(sp_consensus::Error::ClientImport( - format!( - "Error initializing authorities cache: {}", - error, - ))); - - let genesis_id = BlockId::Number(Zero::zero()); - let genesis_authorities: Option> = cache - .get_at(&well_known_cache_keys::AUTHORITIES, &genesis_id) - .unwrap_or(None) - .and_then(|(_, _, v)| Decode::decode(&mut &v[..]).ok()); - if genesis_authorities.is_some() { - return Ok(()); - } - - let genesis_authorities = authorities(client, &genesis_id)?; - cache.initialize(&well_known_cache_keys::AUTHORITIES, genesis_authorities.encode()) - .map_err(map_err)?; - - Ok(()) -} - -#[allow(deprecated)] fn authorities(client: &C, at: &BlockId) -> Result, ConsensusError> where A: Codec + Debug, B: BlockT, @@ -731,145 +475,6 @@ fn authorities(client: &C, at: &BlockId) -> Result, Consensus .ok_or_else(|| sp_consensus::Error::InvalidAuthoritiesSet.into()) } -/// Register the aura inherent data provider, if not registered already. -fn register_aura_inherent_data_provider( - inherent_data_providers: &InherentDataProviders, - slot_duration: u64, -) -> Result<(), sp_consensus::Error> { - if !inherent_data_providers.has_provider(&INHERENT_IDENTIFIER) { - inherent_data_providers - .register_provider(InherentDataProvider::new(slot_duration)) - .map_err(Into::into) - .map_err(sp_consensus::Error::InherentData) - } else { - Ok(()) - } -} - -/// A block-import handler for Aura. -pub struct AuraBlockImport, P> { - inner: I, - client: Arc, - _phantom: PhantomData<(Block, P)>, -} - -impl, P> Clone for AuraBlockImport { - fn clone(&self) -> Self { - AuraBlockImport { - inner: self.inner.clone(), - client: self.client.clone(), - _phantom: PhantomData, - } - } -} - -impl, P> AuraBlockImport { - /// New aura block import. - pub fn new( - inner: I, - client: Arc, - ) -> Self { - Self { - inner, - client, - _phantom: PhantomData, - } - } -} - -impl BlockImport for AuraBlockImport where - I: BlockImport> + Send + Sync, - I::Error: Into, - C: HeaderBackend + ProvideRuntimeApi, - P: Pair + Send + Sync + 'static, - P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, - P::Signature: Encode + Decode, -{ - type Error = ConsensusError; - type Transaction = sp_api::TransactionFor; - - fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { - self.inner.check_block(block).map_err(Into::into) - } - - fn import_block( - &mut self, - block: BlockImportParams, - new_cache: HashMap>, - ) -> Result { - let hash = block.post_hash(); - let slot = find_pre_digest::(&block.header) - .expect("valid Aura headers must contain a predigest; \ - header has been already verified; qed"); - - let parent_hash = *block.header.parent_hash(); - let parent_header = self.client.header(BlockId::Hash(parent_hash)) - .map_err(|e| ConsensusError::ChainLookup(e.to_string()))? - .ok_or_else(|| ConsensusError::ChainLookup(aura_err( - Error::::ParentUnavailable(parent_hash, hash) - ).into()))?; - - let parent_slot = find_pre_digest::(&parent_header) - .expect("valid Aura headers contain a pre-digest; \ - parent header has already been verified; qed"); - - // make sure that slot number is strictly increasing - if slot <= parent_slot { - return Err( - ConsensusError::ClientImport(aura_err( - Error::::SlotMustIncrease(parent_slot, slot) - ).into()) - ); - } - - self.inner.import_block(block, new_cache).map_err(Into::into) - } -} - -/// Start an import queue for the Aura consensus algorithm. -pub fn import_queue( - slot_duration: SlotDuration, - block_import: I, - justification_import: Option>, - client: Arc, - inherent_data_providers: InherentDataProviders, - spawner: &S, - registry: Option<&Registry>, - can_author_with: CAW, -) -> Result, sp_consensus::Error> where - B: BlockT, - C::Api: BlockBuilderApi + AuraApi> + ApiExt, - C: 'static + ProvideRuntimeApi + BlockOf + ProvideCache + Send + Sync + AuxStore + HeaderBackend, - I: BlockImport> + Send + Sync + 'static, - DigestItemFor: CompatibleDigestItem, - P: Pair + Send + Sync + 'static, - P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, - P::Signature: Encode + Decode, - S: sp_core::traits::SpawnEssentialNamed, - CAW: CanAuthorWith + Send + Sync + 'static, -{ - register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; - initialize_authorities_cache(&*client)?; - - let verifier = AuraVerifier::<_, P, _> { - client, - inherent_data_providers, - phantom: PhantomData, - can_author_with, - }; - - Ok(BasicQueue::new( - verifier, - Box::new(block_import), - justification_import, - spawner, - registry, - )) -} - #[cfg(test)] mod tests { use super::*; @@ -884,7 +489,7 @@ mod tests { use sc_client_api::BlockchainEvents; use sp_consensus_aura::sr25519::AuthorityPair; use sc_consensus_slots::{SimpleSlotWorker, BackoffAuthoringOnFinalizedHeadLagging}; - use std::{task::Poll, time::Instant}; + use std::{task::Poll, time::{Instant, Duration}}; use sc_block_builder::BlockBuilderProvider; use sp_runtime::traits::Header as _; use substrate_test_runtime_client::{TestClient, runtime::{Header, H256}}; @@ -941,7 +546,7 @@ mod tests { } impl TestNetFactory for AuraTestNet { - type Verifier = AuraVerifier; + type Verifier = import_queue::AuraVerifier; type PeerData = (); /// Create new test network with peers and given config. @@ -964,12 +569,12 @@ mod tests { ).expect("Registers aura inherent data provider"); assert_eq!(slot_duration.get(), SLOT_DURATION); - AuraVerifier { + import_queue::AuraVerifier::new( client, inherent_data_providers, - phantom: Default::default(), - can_author_with: AlwaysCanAuthor, - } + AlwaysCanAuthor, + CheckForEquivocation::Yes, + ) }, PeersClient::Light(_, _) => unreachable!("No (yet) tests for light client + Aura"), } @@ -982,14 +587,12 @@ mod tests { fn peers(&self) -> &Vec> { &self.peers } - fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } } #[test] - #[allow(deprecated)] fn authoring_blocks() { sp_tracing::try_init_simple(); let net = AuraTestNet::new(3); @@ -1033,19 +636,19 @@ mod tests { &inherent_data_providers, slot_duration.get() ).expect("Registers aura inherent data provider"); - aura_futures.push(start_aura::<_, _, _, _, _, AuthorityPair, _, _, _, _>( + aura_futures.push(start_aura::(StartAuraParams { slot_duration, - client.clone(), + block_import: client.clone(), select_chain, client, - environ, - DummyOracle, + proposer_factory: environ, + sync_oracle: DummyOracle, inherent_data_providers, - false, - Some(BackoffAuthoringOnFinalizedHeadLagging::default()), + force_authoring: false, + backoff_authoring_blocks: Some(BackoffAuthoringOnFinalizedHeadLagging::default()), keystore, - sp_consensus::AlwaysCanAuthor, - ).expect("Starts aura")); + can_author_with: sp_consensus::AlwaysCanAuthor, + }).expect("Starts aura")); } futures::executor::block_on(future::select(