// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see .
//! Substrate Client
use std::{
marker::PhantomData, collections::{HashSet, BTreeMap, HashMap}, sync::Arc, panic::UnwindSafe,
result,
};
use log::{info, trace, warn};
use parking_lot::{Mutex, RwLock};
use codec::{Encode, Decode};
use hash_db::Prefix;
use sp_core::{
ChangesTrieConfiguration, convert_hash, traits::CodeExecutor,
NativeOrEncoded, storage::{StorageKey, StorageData, well_known_keys, ChildInfo},
};
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_runtime::{
Justification, BuildStorage,
generic::{BlockId, SignedBlock, DigestItem},
traits::{
Block as BlockT, Header as HeaderT, Zero, NumberFor, HashFor, SaturatedConversion, One,
DigestFor,
},
};
use sp_state_machine::{
DBValue, Backend as StateBackend, ChangesTrieAnchorBlockId,
prove_read, prove_child_read, ChangesTrieRootsStorage, ChangesTrieStorage,
ChangesTrieConfigurationRange, key_changes, key_changes_proof,
};
use sc_executor::{RuntimeVersion, RuntimeInfo};
use sp_consensus::{
Error as ConsensusError, BlockStatus, BlockImportParams, BlockCheckParams, ImportResult,
BlockOrigin, ForkChoiceStrategy, SelectChain, RecordProof,
};
use sp_blockchain::{self as blockchain,
Backend as ChainBackend,
HeaderBackend as ChainHeaderBackend, ProvideCache, Cache,
well_known_cache_keys::Id as CacheKeyId,
HeaderMetadata, CachedHeaderMetadata,
};
use sp_trie::StorageProof;
use sp_api::{
CallApiAt, ConstructRuntimeApi, Core as CoreApi, ApiExt, ApiRef, ProvideRuntimeApi,
CallApiAtParams,
};
use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider};
pub use sc_client_api::{
backend::{
self, BlockImportOperation, PrunableStateChangesTrieStorage,
ClientImportOperation, Finalizer, ImportSummary, NewBlockState,
changes_tries_state_at_block, StorageProvider,
LockImportRun,
},
client::{
ImportNotifications, FinalityNotification, FinalityNotifications, BlockImportNotification,
ClientInfo, BlockchainEvents, BlockBackend, ProvideUncles, BadBlocks, ForkBlocks,
BlockOf,
},
execution_extensions::{ExecutionExtensions, ExecutionStrategies},
notifications::{StorageNotifications, StorageEventStream},
CallExecutor, ExecutorProvider, ProofProvider, CloneableSpawn,
};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_blockchain::Error;
use prometheus_endpoint::Registry;
use crate::{
call_executor::LocalCallExecutor,
light::{call_executor::prove_execution, fetcher::ChangesProof},
in_mem, genesis, cht, block_rules::{BlockRules, LookupResult as BlockLookupResult},
};
use crate::client::backend::KeyIterator;
/// Substrate Client
pub struct Client where Block: BlockT {
backend: Arc,
executor: E,
storage_notifications: Mutex>,
import_notification_sinks: Mutex>>>,
finality_notification_sinks: Mutex>>>,
// holds the block hash currently being imported. TODO: replace this with block queue
importing_block: RwLock>,
block_rules: BlockRules,
execution_extensions: ExecutionExtensions,
_phantom: PhantomData,
}
// used in importing a block, where additional changes are made after the runtime
// executed.
enum PrePostHeader {
// they are the same: no post-runtime digest items.
Same(H),
// different headers (pre, post).
Different(H, H),
}
impl PrePostHeader {
// get a reference to the "post-header" -- the header as it should be after all changes are applied.
fn post(&self) -> &H {
match *self {
PrePostHeader::Same(ref h) => h,
PrePostHeader::Different(_, ref h) => h,
}
}
// convert to the "post-header" -- the header as it should be after all changes are applied.
fn into_post(self) -> H {
match self {
PrePostHeader::Same(h) => h,
PrePostHeader::Different(_, h) => h,
}
}
}
/// Create an instance of in-memory client.
pub fn new_in_mem(
executor: E,
genesis_storage: &S,
keystore: Option,
prometheus_registry: Option,
spawn_handle: Box,
) -> sp_blockchain::Result,
LocalCallExecutor, E>,
Block,
RA
>> where
E: CodeExecutor + RuntimeInfo,
S: BuildStorage,
Block: BlockT,
{
new_with_backend(Arc::new(in_mem::Backend::new()), executor, genesis_storage, keystore, spawn_handle, prometheus_registry)
}
/// Create a client with the explicitly provided backend.
/// This is useful for testing backend implementations.
pub fn new_with_backend(
backend: Arc,
executor: E,
build_genesis_storage: &S,
keystore: Option,
spawn_handle: Box,
prometheus_registry: Option,
) -> sp_blockchain::Result, Block, RA>>
where
E: CodeExecutor + RuntimeInfo,
S: BuildStorage,
Block: BlockT,
B: backend::LocalBackend + 'static,
{
let call_executor = LocalCallExecutor::new(backend.clone(), executor, spawn_handle);
let extensions = ExecutionExtensions::new(Default::default(), keystore);
Client::new(
backend,
call_executor,
build_genesis_storage,
Default::default(),
Default::default(),
extensions,
prometheus_registry,
)
}
impl BlockOf for Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
type Type = Block;
}
impl LockImportRun for Client
where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
fn lock_import_and_run(&self, f: F) -> Result
where
F: FnOnce(&mut ClientImportOperation) -> Result,
Err: From,
{
let inner = || {
let _import_lock = self.backend.get_import_lock().write();
let mut op = ClientImportOperation {
op: self.backend.begin_operation()?,
notify_imported: None,
notify_finalized: Vec::new(),
};
let r = f(&mut op)?;
let ClientImportOperation { op, notify_imported, notify_finalized } = op;
self.backend.commit_operation(op)?;
self.notify_finalized(notify_finalized)?;
self.notify_imported(notify_imported)?;
Ok(r)
};
let result = inner();
*self.importing_block.write() = None;
result
}
}
impl LockImportRun for &Client
where
Block: BlockT,
B: backend::Backend,
E: CallExecutor,
{
fn lock_import_and_run(&self, f: F) -> Result
where
F: FnOnce(&mut ClientImportOperation) -> Result,
Err: From,
{
(**self).lock_import_and_run(f)
}
}
impl Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
/// Creates new Substrate Client with given blockchain and code executor.
pub fn new(
backend: Arc,
executor: E,
build_genesis_storage: &dyn BuildStorage,
fork_blocks: ForkBlocks,
bad_blocks: BadBlocks,
execution_extensions: ExecutionExtensions,
_prometheus_registry: Option,
) -> sp_blockchain::Result {
if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() {
let genesis_storage = build_genesis_storage.build_storage()?;
let mut op = backend.begin_operation()?;
backend.begin_state_operation(&mut op, BlockId::Hash(Default::default()))?;
let state_root = op.reset_storage(genesis_storage)?;
let genesis_block = genesis::construct_genesis_block::(state_root.into());
info!("🔨 Initializing Genesis block/state (state: {}, header-hash: {})",
genesis_block.header().state_root(),
genesis_block.header().hash()
);
op.set_block_data(
genesis_block.deconstruct().0,
Some(vec![]),
None,
NewBlockState::Final
)?;
backend.commit_operation(op)?;
}
Ok(Client {
backend,
executor,
storage_notifications: Default::default(),
import_notification_sinks: Default::default(),
finality_notification_sinks: Default::default(),
importing_block: Default::default(),
block_rules: BlockRules::new(fork_blocks, bad_blocks),
execution_extensions,
_phantom: Default::default(),
})
}
/// Get a reference to the state at a given block.
pub fn state_at(&self, block: &BlockId) -> sp_blockchain::Result {
self.backend.state_at(*block)
}
/// Get the code at a given block.
pub fn code_at(&self, id: &BlockId) -> sp_blockchain::Result> {
Ok(StorageProvider::storage(self, id, &StorageKey(well_known_keys::CODE.to_vec()))?
.expect("None is returned if there's no value stored for the given key;\
':code' key is always defined; qed").0)
}
/// Get the RuntimeVersion at a given block.
pub fn runtime_version_at(&self, id: &BlockId) -> sp_blockchain::Result {
self.executor.runtime_version(id)
}
/// Get block hash by number.
pub fn block_hash(&self,
block_number: <::Header as HeaderT>::Number
) -> sp_blockchain::Result> {
self.backend.blockchain().hash(block_number)
}
/// Reads given header and generates CHT-based header proof for CHT of given size.
pub fn header_proof_with_cht_size(
&self,
id: &BlockId,
cht_size: NumberFor,
) -> sp_blockchain::Result<(Block::Header, StorageProof)> {
let proof_error = || sp_blockchain::Error::Backend(format!("Failed to generate header proof for {:?}", id));
let header = self.backend.blockchain().expect_header(*id)?;
let block_num = *header.number();
let cht_num = cht::block_to_cht_number(cht_size, block_num).ok_or_else(proof_error)?;
let cht_start = cht::start_number(cht_size, cht_num);
let mut current_num = cht_start;
let cht_range = ::std::iter::from_fn(|| {
let old_current_num = current_num;
current_num = current_num + One::one();
Some(old_current_num)
});
let headers = cht_range.map(|num| self.block_hash(num));
let proof = cht::build_proof::, _, _>(
cht_size,
cht_num,
std::iter::once(block_num),
headers,
)?;
Ok((header, proof))
}
/// Does the same work as `key_changes_proof`, but assumes that CHTs are of passed size.
pub fn key_changes_proof_with_cht_size(
&self,
first: Block::Hash,
last: Block::Hash,
min: Block::Hash,
max: Block::Hash,
storage_key: Option<&StorageKey>,
key: &StorageKey,
cht_size: NumberFor,
) -> sp_blockchain::Result> {
struct AccessedRootsRecorder<'a, Block: BlockT> {
storage: &'a dyn ChangesTrieStorage, NumberFor>,
min: NumberFor,
required_roots_proofs: Mutex, Block::Hash>>,
};
impl<'a, Block: BlockT> ChangesTrieRootsStorage, NumberFor> for
AccessedRootsRecorder<'a, Block>
{
fn build_anchor(&self, hash: Block::Hash)
-> Result>, String>
{
self.storage.build_anchor(hash)
}
fn root(
&self,
anchor: &ChangesTrieAnchorBlockId>,
block: NumberFor,
) -> Result, String> {
let root = self.storage.root(anchor, block)?;
if block < self.min {
if let Some(ref root) = root {
self.required_roots_proofs.lock().insert(
block,
root.clone()
);
}
}
Ok(root)
}
}
impl<'a, Block: BlockT> ChangesTrieStorage, NumberFor> for
AccessedRootsRecorder<'a, Block>
{
fn as_roots_storage(&self)
-> &dyn sp_state_machine::ChangesTrieRootsStorage, NumberFor>
{
self
}
fn with_cached_changed_keys(
&self,
root: &Block::Hash,
functor: &mut dyn FnMut(&HashMap>, HashSet>>),
) -> bool {
self.storage.with_cached_changed_keys(root, functor)
}
fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result, String> {
self.storage.get(key, prefix)
}
}
let first_number = self.backend.blockchain()
.expect_block_number_from_id(&BlockId::Hash(first))?;
let (storage, configs) = self.require_changes_trie(first_number, last, true)?;
let min_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(min))?;
let recording_storage = AccessedRootsRecorder:: {
storage: storage.storage(),
min: min_number,
required_roots_proofs: Mutex::new(BTreeMap::new()),
};
let max_number = std::cmp::min(
self.backend.blockchain().info().best_number,
self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(max))?,
);
// fetch key changes proof
let mut proof = Vec::new();
for (config_zero, config_end, config) in configs {
let last_number = self.backend.blockchain()
.expect_block_number_from_id(&BlockId::Hash(last))?;
let config_range = ChangesTrieConfigurationRange {
config: &config,
zero: config_zero,
end: config_end.map(|(config_end_number, _)| config_end_number),
};
let proof_range = key_changes_proof::, _>(
config_range,
&recording_storage,
first_number,
&ChangesTrieAnchorBlockId {
hash: convert_hash(&last),
number: last_number,
},
max_number,
storage_key.as_ref().map(|x| &x.0[..]),
&key.0,
)
.map_err(|err| sp_blockchain::Error::ChangesTrieAccessFailed(err))?;
proof.extend(proof_range);
}
// now gather proofs for all changes tries roots that were touched during key_changes_proof
// execution AND are unknown (i.e. replaced with CHT) to the requester
let roots = recording_storage.required_roots_proofs.into_inner();
let roots_proof = self.changes_trie_roots_proof(cht_size, roots.keys().cloned())?;
Ok(ChangesProof {
max_block: max_number,
proof,
roots: roots.into_iter().map(|(n, h)| (n, convert_hash(&h))).collect(),
roots_proof,
})
}
/// Generate CHT-based proof for roots of changes tries at given blocks.
fn changes_trie_roots_proof>>(
&self,
cht_size: NumberFor,
blocks: I
) -> sp_blockchain::Result {
// most probably we have touched several changes tries that are parts of the single CHT
// => GroupBy changes tries by CHT number and then gather proof for the whole group at once
let mut proofs = Vec::new();
cht::for_each_cht_group::(cht_size, blocks, |_, cht_num, cht_blocks| {
let cht_proof = self.changes_trie_roots_proof_at_cht(cht_size, cht_num, cht_blocks)?;
proofs.push(cht_proof);
Ok(())
}, ())?;
Ok(StorageProof::merge(proofs))
}
/// Generates CHT-based proof for roots of changes tries at given blocks (that are part of single CHT).
fn changes_trie_roots_proof_at_cht(
&self,
cht_size: NumberFor,
cht_num: NumberFor,
blocks: Vec>
) -> sp_blockchain::Result {
let cht_start = cht::start_number(cht_size, cht_num);
let mut current_num = cht_start;
let cht_range = ::std::iter::from_fn(|| {
let old_current_num = current_num;
current_num = current_num + One::one();
Some(old_current_num)
});
let roots = cht_range
.map(|num| self.header(&BlockId::Number(num))
.map(|block|
block.and_then(|block| block.digest().log(DigestItem::as_changes_trie_root).cloned()))
);
let proof = cht::build_proof::, _, _>(
cht_size,
cht_num,
blocks,
roots,
)?;
Ok(proof)
}
/// Returns changes trie storage and all configurations that have been active in the range [first; last].
///
/// Configurations are returned in descending order (and obviously never overlap).
/// If fail_if_disabled is false, returns maximal consequent configurations ranges, starting from last and
/// stopping on either first, or when CT have been disabled.
/// If fail_if_disabled is true, fails when there's a subrange where CT have been disabled
/// inside first..last blocks range.
fn require_changes_trie(
&self,
first: NumberFor,
last: Block::Hash,
fail_if_disabled: bool,
) -> sp_blockchain::Result<(
&dyn PrunableStateChangesTrieStorage,
Vec<(NumberFor, Option<(NumberFor, Block::Hash)>, ChangesTrieConfiguration)>,
)> {
let storage = match self.backend.changes_trie_storage() {
Some(storage) => storage,
None => return Err(sp_blockchain::Error::ChangesTriesNotSupported),
};
let mut configs = Vec::with_capacity(1);
let mut current = last;
loop {
let config_range = storage.configuration_at(&BlockId::Hash(current))?;
match config_range.config {
Some(config) => configs.push((config_range.zero.0, config_range.end, config)),
None if !fail_if_disabled => return Ok((storage, configs)),
None => return Err(sp_blockchain::Error::ChangesTriesNotSupported),
}
if config_range.zero.0 < first {
break;
}
current = *self.backend.blockchain().expect_header(BlockId::Hash(config_range.zero.1))?.parent_hash();
}
Ok((storage, configs))
}
/// Apply a checked and validated block to an operation. If a justification is provided
/// then `finalized` *must* be true.
fn apply_block(
&self,
operation: &mut ClientImportOperation,
import_block: BlockImportParams>,
new_cache: HashMap>,
) -> sp_blockchain::Result where
Self: ProvideRuntimeApi,
>::Api: CoreApi +
ApiExt,
{
let BlockImportParams {
origin,
header,
justification,
post_digests,
body,
storage_changes,
finalized,
auxiliary,
fork_choice,
intermediates,
import_existing,
..
} = import_block;
assert!(justification.is_some() && finalized || justification.is_none());
if !intermediates.is_empty() {
return Err(Error::IncompletePipeline)
}
let fork_choice = fork_choice.ok_or(Error::IncompletePipeline)?;
let import_headers = if post_digests.is_empty() {
PrePostHeader::Same(header)
} else {
let mut post_header = header.clone();
for item in post_digests {
post_header.digest_mut().push(item);
}
PrePostHeader::Different(header, post_header)
};
let hash = import_headers.post().hash();
let height = (*import_headers.post().number()).saturated_into::();
*self.importing_block.write() = Some(hash);
let result = self.execute_and_import_block(
operation,
origin,
hash,
import_headers,
justification,
body,
storage_changes,
new_cache,
finalized,
auxiliary,
fork_choice,
import_existing,
);
if let Ok(ImportResult::Imported(ref aux)) = result {
if aux.is_new_best {
telemetry!(SUBSTRATE_INFO; "block.import";
"height" => height,
"best" => ?hash,
"origin" => ?origin
);
}
}
result
}
fn execute_and_import_block(
&self,
operation: &mut ClientImportOperation,
origin: BlockOrigin,
hash: Block::Hash,
import_headers: PrePostHeader,
justification: Option,
body: Option>,
storage_changes: Option, Block>>,
new_cache: HashMap>,
finalized: bool,
aux: Vec<(Vec, Option>)>,
fork_choice: ForkChoiceStrategy,
import_existing: bool,
) -> sp_blockchain::Result where
Self: ProvideRuntimeApi,
>::Api: CoreApi +
ApiExt,
{
let parent_hash = import_headers.post().parent_hash().clone();
let status = self.backend.blockchain().status(BlockId::Hash(hash))?;
match (import_existing, status) {
(false, blockchain::BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain),
(false, blockchain::BlockStatus::Unknown) => {},
(true, blockchain::BlockStatus::InChain) => {},
(true, blockchain::BlockStatus::Unknown) =>
return Err(Error::UnknownBlock(format!("{:?}", hash))),
}
let info = self.backend.blockchain().info();
// the block is lower than our last finalized block so it must revert
// finality, refusing import.
if *import_headers.post().number() <= info.finalized_number {
return Err(sp_blockchain::Error::NotInFinalizedChain);
}
// this is a fairly arbitrary choice of where to draw the line on making notifications,
// but the general goal is to only make notifications when we are already fully synced
// and get a new chain head.
let make_notifications = match origin {
BlockOrigin::NetworkBroadcast | BlockOrigin::Own | BlockOrigin::ConsensusBroadcast => true,
BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false,
};
let storage_changes = match storage_changes {
Some(storage_changes) => {
self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?;
// ensure parent block is finalized to maintain invariant that
// finality is called sequentially.
if finalized {
self.apply_finality_with_block_hash(
operation,
parent_hash,
None,
info.best_hash,
make_notifications,
)?;
}
operation.op.update_cache(new_cache);
let (main_sc, child_sc, tx, _, changes_trie_tx) = storage_changes.into_inner();
operation.op.update_db_storage(tx)?;
operation.op.update_storage(main_sc.clone(), child_sc.clone())?;
if let Some(changes_trie_transaction) = changes_trie_tx {
operation.op.update_changes_trie(changes_trie_transaction)?;
}
Some((main_sc, child_sc))
},
None => None,
};
let is_new_best = finalized || match fork_choice {
ForkChoiceStrategy::LongestChain => import_headers.post().number() > &info.best_number,
ForkChoiceStrategy::Custom(v) => v,
};
let leaf_state = if finalized {
NewBlockState::Final
} else if is_new_best {
NewBlockState::Best
} else {
NewBlockState::Normal
};
let retracted = if is_new_best {
let route_from_best = sp_blockchain::tree_route(
self.backend.blockchain(),
info.best_hash,
parent_hash,
)?;
route_from_best.retracted().iter().rev().map(|e| e.hash.clone()).collect()
} else {
Vec::default()
};
trace!(
"Imported {}, (#{}), best={}, origin={:?}",
hash,
import_headers.post().number(),
is_new_best,
origin,
);
operation.op.set_block_data(
import_headers.post().clone(),
body,
justification,
leaf_state,
)?;
operation.op.insert_aux(aux)?;
if make_notifications {
if finalized {
operation.notify_finalized.push(hash);
}
operation.notify_imported = Some(ImportSummary {
hash,
origin,
header: import_headers.into_post(),
is_new_best,
storage_changes,
retracted,
})
}
Ok(ImportResult::imported(is_new_best))
}
/// Prepares the storage changes for a block.
///
/// It checks if the state should be enacted and if the `import_block` maybe already provides
/// the required storage changes. If the state should be enacted and the storage changes are not
/// provided, the block is re-executed to get the storage changes.
fn prepare_block_storage_changes(
&self,
import_block: &mut BlockImportParams>,
) -> sp_blockchain::Result>
where
Self: ProvideRuntimeApi,
>::Api: CoreApi +
ApiExt,
{
let parent_hash = import_block.header.parent_hash();
let at = BlockId::Hash(*parent_hash);
let enact_state = match self.block_status(&at)? {
BlockStatus::Unknown => return Ok(Some(ImportResult::UnknownParent)),
BlockStatus::InChainWithState | BlockStatus::Queued => true,
BlockStatus::InChainPruned if import_block.allow_missing_state => false,
BlockStatus::InChainPruned => return Ok(Some(ImportResult::MissingState)),
BlockStatus::KnownBad => return Ok(Some(ImportResult::KnownBad)),
};
match (enact_state, &mut import_block.storage_changes, &mut import_block.body) {
// We have storage changes and should enact the state, so we don't need to do anything
// here
(true, Some(_), _) => {},
// We should enact state, but don't have any storage changes, so we need to execute the
// block.
(true, ref mut storage_changes @ None, Some(ref body)) => {
let runtime_api = self.runtime_api();
runtime_api.execute_block(
&at,
Block::new(import_block.header.clone(), body.clone()),
)?;
let state = self.backend.state_at(at)?;
let changes_trie_state = changes_tries_state_at_block(
&at,
self.backend.changes_trie_storage(),
)?;
let gen_storage_changes = runtime_api.into_storage_changes(
&state,
changes_trie_state.as_ref(),
*parent_hash,
)?;
if import_block.header.state_root()
!= &gen_storage_changes.transaction_storage_root
{
return Err(Error::InvalidStateRoot)
} else {
**storage_changes = Some(gen_storage_changes);
}
},
// No block body, no storage changes
(true, None, None) => {},
// We should not enact the state, so we set the storage changes to `None`.
(false, changes, _) => {
changes.take();
}
};
Ok(None)
}
fn apply_finality_with_block_hash(
&self,
operation: &mut ClientImportOperation,
block: Block::Hash,
justification: Option,
best_block: Block::Hash,
notify: bool,
) -> sp_blockchain::Result<()> {
// find tree route from last finalized to given block.
let last_finalized = self.backend.blockchain().last_finalized()?;
if block == last_finalized {
warn!("Possible safety violation: attempted to re-finalize last finalized block {:?} ", last_finalized);
return Ok(());
}
let route_from_finalized = sp_blockchain::tree_route(self.backend.blockchain(), last_finalized, block)?;
if let Some(retracted) = route_from_finalized.retracted().get(0) {
warn!("Safety violation: attempted to revert finalized block {:?} which is not in the \
same chain as last finalized {:?}", retracted, last_finalized);
return Err(sp_blockchain::Error::NotInFinalizedChain);
}
let route_from_best = sp_blockchain::tree_route(self.backend.blockchain(), best_block, block)?;
// if the block is not a direct ancestor of the current best chain,
// then some other block is the common ancestor.
if route_from_best.common_block().hash != block {
// NOTE: we're setting the finalized block as best block, this might
// be slightly inaccurate since we might have a "better" block
// further along this chain, but since best chain selection logic is
// plugable we cannot make a better choice here. usages that need
// an accurate "best" block need to go through `SelectChain`
// instead.
operation.op.mark_head(BlockId::Hash(block))?;
}
let enacted = route_from_finalized.enacted();
assert!(enacted.len() > 0);
for finalize_new in &enacted[..enacted.len() - 1] {
operation.op.mark_finalized(BlockId::Hash(finalize_new.hash), None)?;
}
assert_eq!(enacted.last().map(|e| e.hash), Some(block));
operation.op.mark_finalized(BlockId::Hash(block), justification)?;
if notify {
// sometimes when syncing, tons of blocks can be finalized at once.
// we'll send notifications spuriously in that case.
const MAX_TO_NOTIFY: usize = 256;
let enacted = route_from_finalized.enacted();
let start = enacted.len() - ::std::cmp::min(enacted.len(), MAX_TO_NOTIFY);
for finalized in &enacted[start..] {
operation.notify_finalized.push(finalized.hash);
}
}
Ok(())
}
fn notify_finalized(
&self,
notify_finalized: Vec,
) -> sp_blockchain::Result<()> {
let mut sinks = self.finality_notification_sinks.lock();
if notify_finalized.is_empty() {
// cleanup any closed finality notification sinks
// since we won't be running the loop below which
// would also remove any closed sinks.
sinks.retain(|sink| !sink.is_closed());
return Ok(());
}
for finalized_hash in notify_finalized {
let header = self.header(&BlockId::Hash(finalized_hash))?
.expect("header already known to exist in DB because it is indicated in the tree route; qed");
telemetry!(SUBSTRATE_INFO; "notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?finalized_hash,
);
let notification = FinalityNotification {
header,
hash: finalized_hash,
};
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
Ok(())
}
fn notify_imported(
&self,
notify_import: Option>,
) -> sp_blockchain::Result<()> {
let notify_import = match notify_import {
Some(notify_import) => notify_import,
None => {
// cleanup any closed import notification sinks since we won't
// be sending any notifications below which would remove any
// closed sinks. this is necessary since during initial sync we
// won't send any import notifications which could lead to a
// temporary leak of closed/discarded notification sinks (e.g.
// from consensus code).
self.import_notification_sinks
.lock()
.retain(|sink| !sink.is_closed());
return Ok(());
}
};
if let Some(storage_changes) = notify_import.storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(
¬ify_import.hash,
storage_changes.0.into_iter(),
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
);
}
let notification = BlockImportNotification:: {
hash: notify_import.hash,
origin: notify_import.origin,
header: notify_import.header,
is_new_best: notify_import.is_new_best,
retracted: notify_import.retracted,
};
self.import_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
Ok(())
}
/// Attempts to revert the chain by `n` blocks guaranteeing that no block is
/// reverted past the last finalized block. Returns the number of blocks
/// that were successfully reverted.
pub fn revert(&self, n: NumberFor) -> sp_blockchain::Result> {
Ok(self.backend.revert(n, false)?)
}
/// Attempts to revert the chain by `n` blocks disregarding finality. This
/// method will revert any finalized blocks as requested and can potentially
/// leave the node in an inconsistent state. Other modules in the system that
/// persist data and that rely on finality (e.g. consensus parts) will be
/// unaffected by the revert. Use this method with caution and making sure
/// that no other data needs to be reverted for consistency aside from the
/// block data.
///
/// Returns the number of blocks that were successfully reverted.
pub fn unsafe_revert(&self, n: NumberFor) -> sp_blockchain::Result> {
Ok(self.backend.revert(n, true)?)
}
/// Get usage info about current client.
pub fn usage_info(&self) -> ClientInfo {
ClientInfo {
chain: self.chain_info(),
usage: self.backend.usage_info(),
}
}
/// Get blockchain info.
pub fn chain_info(&self) -> blockchain::Info {
self.backend.blockchain().info()
}
/// Get block status.
pub fn block_status(&self, id: &BlockId) -> sp_blockchain::Result {
// this can probably be implemented more efficiently
if let BlockId::Hash(ref h) = id {
if self.importing_block.read().as_ref().map_or(false, |importing| h == importing) {
return Ok(BlockStatus::Queued);
}
}
let hash_and_number = match id.clone() {
BlockId::Hash(hash) => self.backend.blockchain().number(hash)?.map(|n| (hash, n)),
BlockId::Number(n) => self.backend.blockchain().hash(n)?.map(|hash| (hash, n)),
};
match hash_and_number {
Some((hash, number)) => {
if self.backend.have_state_at(&hash, number) {
Ok(BlockStatus::InChainWithState)
} else {
Ok(BlockStatus::InChainPruned)
}
}
None => Ok(BlockStatus::Unknown),
}
}
/// Get block header by id.
pub fn header(&self, id: &BlockId) -> sp_blockchain::Result::Header>> {
self.backend.blockchain().header(*id)
}
/// Get block body by id.
pub fn body(&self, id: &BlockId) -> sp_blockchain::Result::Extrinsic>>> {
self.backend.blockchain().body(*id)
}
/// Gets the uncles of the block with `target_hash` going back `max_generation` ancestors.
pub fn uncles(&self, target_hash: Block::Hash, max_generation: NumberFor) -> sp_blockchain::Result> {
let load_header = |id: Block::Hash| -> sp_blockchain::Result {
match self.backend.blockchain().header(BlockId::Hash(id))? {
Some(hdr) => Ok(hdr),
None => Err(Error::UnknownBlock(format!("{:?}", id))),
}
};
let genesis_hash = self.backend.blockchain().info().genesis_hash;
if genesis_hash == target_hash { return Ok(Vec::new()); }
let mut current_hash = target_hash;
let mut current = load_header(current_hash)?;
let mut ancestor_hash = *current.parent_hash();
let mut ancestor = load_header(ancestor_hash)?;
let mut uncles = Vec::new();
for _generation in 0..max_generation.saturated_into() {
let children = self.backend.blockchain().children(ancestor_hash)?;
uncles.extend(children.into_iter().filter(|h| h != ¤t_hash));
current_hash = ancestor_hash;
if genesis_hash == current_hash { break; }
current = ancestor;
ancestor_hash = *current.parent_hash();
ancestor = load_header(ancestor_hash)?;
}
trace!("Collected {} uncles", uncles.len());
Ok(uncles)
}
/// Prepare in-memory header that is used in execution environment.
fn prepare_environment_block(&self, parent: &BlockId) -> sp_blockchain::Result {
let parent_header = self.backend.blockchain().expect_header(*parent)?;
Ok(<::Header as HeaderT>::new(
self.backend.blockchain().expect_block_number_from_id(parent)? + One::one(),
Default::default(),
Default::default(),
parent_header.hash(),
Default::default(),
))
}
}
impl ProofProvider for Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
fn read_proof(
&self,
id: &BlockId,
keys: &mut dyn Iterator- ,
) -> sp_blockchain::Result
{
self.state_at(id)
.and_then(|state| prove_read(state, keys)
.map_err(Into::into))
}
fn read_child_proof(
&self,
id: &BlockId,
storage_key: &[u8],
child_info: ChildInfo,
keys: &mut dyn Iterator- ,
) -> sp_blockchain::Result
{
self.state_at(id)
.and_then(|state| prove_child_read(state, storage_key, child_info, keys)
.map_err(Into::into))
}
fn execution_proof(
&self,
id: &BlockId,
method: &str,
call_data: &[u8]
) -> sp_blockchain::Result<(Vec, StorageProof)> {
// Make sure we include the `:code` and `:heap_pages` in the execution proof to be
// backwards compatible.
//
// TODO: Remove when solved: https://github.com/paritytech/substrate/issues/5047
let code_proof = self.read_proof(
id,
&mut [well_known_keys::CODE, well_known_keys::HEAP_PAGES].iter().map(|v| *v),
)?;
let state = self.state_at(id)?;
let header = self.prepare_environment_block(id)?;
prove_execution(
state,
header,
&self.executor,
method,
call_data,
).map(|(r, p)| {
(r, StorageProof::merge(vec![p, code_proof]))
})
}
fn header_proof(&self, id: &BlockId) -> sp_blockchain::Result<(Block::Header, StorageProof)> {
self.header_proof_with_cht_size(id, cht::size())
}
fn key_changes_proof(
&self,
first: Block::Hash,
last: Block::Hash,
min: Block::Hash,
max: Block::Hash,
storage_key: Option<&StorageKey>,
key: &StorageKey,
) -> sp_blockchain::Result> {
self.key_changes_proof_with_cht_size(
first,
last,
min,
max,
storage_key,
key,
cht::size(),
)
}
}
impl BlockBuilderProvider for Client
where
B: backend::Backend + Send + Sync + 'static,
E: CallExecutor + Send + Sync + 'static,
Block: BlockT,
Self: ChainHeaderBackend + ProvideRuntimeApi,
>::Api: ApiExt>
+ BlockBuilderApi,
{
fn new_block_at>(
&self,
parent: &BlockId,
inherent_digests: DigestFor,
record_proof: R,
) -> sp_blockchain::Result> {
sc_block_builder::BlockBuilder::new(
self,
self.expect_block_hash_from_id(parent)?,
self.expect_block_number_from_id(parent)?,
record_proof.into(),
inherent_digests,
&self.backend
)
}
fn new_block(
&self,
inherent_digests: DigestFor,
) -> sp_blockchain::Result> {
let info = self.chain_info();
sc_block_builder::BlockBuilder::new(
self,
info.best_hash,
info.best_number,
RecordProof::No,
inherent_digests,
&self.backend,
)
}
}
impl ExecutorProvider for Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
type Executor = E;
fn executor(&self) -> &Self::Executor {
&self.executor
}
fn execution_extensions(&self) -> &ExecutionExtensions {
&self.execution_extensions
}
}
impl StorageProvider for Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
fn storage_keys(&self, id: &BlockId, key_prefix: &StorageKey) -> sp_blockchain::Result> {
let keys = self.state_at(id)?.keys(&key_prefix.0).into_iter().map(StorageKey).collect();
Ok(keys)
}
fn storage_pairs(&self, id: &BlockId, key_prefix: &StorageKey)
-> sp_blockchain::Result>
{
let state = self.state_at(id)?;
let keys = state
.keys(&key_prefix.0)
.into_iter()
.map(|k| {
let d = state.storage(&k).ok().flatten().unwrap_or_default();
(StorageKey(k), StorageData(d))
})
.collect();
Ok(keys)
}
fn storage_keys_iter<'a>(
&self,
id: &BlockId,
prefix: Option<&'a StorageKey>,
start_key: Option<&StorageKey>
) -> sp_blockchain::Result> {
let state = self.state_at(id)?;
let start_key = start_key
.or(prefix)
.map(|key| key.0.clone())
.unwrap_or_else(Vec::new);
Ok(KeyIterator::new(state, prefix, start_key))
}
fn storage(&self, id: &BlockId, key: &StorageKey) -> sp_blockchain::Result>
{
Ok(self.state_at(id)?
.storage(&key.0).map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
.map(StorageData)
)
}
fn storage_hash(&self, id: &BlockId, key: &StorageKey) -> sp_blockchain::Result>
{
Ok(self.state_at(id)?
.storage_hash(&key.0).map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
)
}
fn child_storage_keys(
&self,
id: &BlockId,
child_storage_key: &StorageKey,
child_info: ChildInfo,
key_prefix: &StorageKey
) -> sp_blockchain::Result> {
let keys = self.state_at(id)?
.child_keys(&child_storage_key.0, child_info, &key_prefix.0)
.into_iter()
.map(StorageKey)
.collect();
Ok(keys)
}
fn child_storage(
&self,
id: &BlockId,
storage_key: &StorageKey,
child_info: ChildInfo,
key: &StorageKey
) -> sp_blockchain::Result> {
Ok(self.state_at(id)?
.child_storage(&storage_key.0, child_info, &key.0)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
.map(StorageData))
}
fn child_storage_hash(
&self,
id: &BlockId,
storage_key: &StorageKey,
child_info: ChildInfo,
key: &StorageKey
) -> sp_blockchain::Result> {
Ok(self.state_at(id)?
.child_storage_hash(&storage_key.0, child_info, &key.0)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
)
}
fn max_key_changes_range(
&self,
first: NumberFor,
last: BlockId,
) -> sp_blockchain::Result, BlockId)>> {
let last_number = self.backend.blockchain().expect_block_number_from_id(&last)?;
let last_hash = self.backend.blockchain().expect_block_hash_from_id(&last)?;
if first > last_number {
return Err(sp_blockchain::Error::ChangesTrieAccessFailed("Invalid changes trie range".into()));
}
let (storage, configs) = match self.require_changes_trie(first, last_hash, false).ok() {
Some((storage, configs)) => (storage, configs),
None => return Ok(None),
};
let first_available_changes_trie = configs.last().map(|config| config.0);
match first_available_changes_trie {
Some(first_available_changes_trie) => {
let oldest_unpruned = storage.oldest_pruned_digest_range_end();
let first = std::cmp::max(first_available_changes_trie, oldest_unpruned);
Ok(Some((first, last)))
},
None => Ok(None)
}
}
fn key_changes(
&self,
first: NumberFor,
last: BlockId,
storage_key: Option<&StorageKey>,
key: &StorageKey
) -> sp_blockchain::Result, u32)>> {
let last_number = self.backend.blockchain().expect_block_number_from_id(&last)?;
let last_hash = self.backend.blockchain().expect_block_hash_from_id(&last)?;
let (storage, configs) = self.require_changes_trie(first, last_hash, true)?;
let mut result = Vec::new();
let best_number = self.backend.blockchain().info().best_number;
for (config_zero, config_end, config) in configs {
let range_first = ::std::cmp::max(first, config_zero + One::one());
let range_anchor = match config_end {
Some((config_end_number, config_end_hash)) => if last_number > config_end_number {
ChangesTrieAnchorBlockId { hash: config_end_hash, number: config_end_number }
} else {
ChangesTrieAnchorBlockId { hash: convert_hash(&last_hash), number: last_number }
},
None => ChangesTrieAnchorBlockId { hash: convert_hash(&last_hash), number: last_number },
};
let config_range = ChangesTrieConfigurationRange {
config: &config,
zero: config_zero.clone(),
end: config_end.map(|(config_end_number, _)| config_end_number),
};
let result_range: Vec<(NumberFor, u32)> = key_changes::, _>(
config_range,
storage.storage(),
range_first,
&range_anchor,
best_number,
storage_key.as_ref().map(|x| &x.0[..]),
&key.0)
.and_then(|r| r.map(|r| r.map(|(block, tx)| (block, tx))).collect::>())
.map_err(|err| sp_blockchain::Error::ChangesTrieAccessFailed(err))?;
result.extend(result_range);
}
Ok(result)
}
}
impl HeaderMetadata for Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
type Error = sp_blockchain::Error;
fn header_metadata(&self, hash: Block::Hash) -> Result, Self::Error> {
self.backend.blockchain().header_metadata(hash)
}
fn insert_header_metadata(&self, hash: Block::Hash, metadata: CachedHeaderMetadata) {
self.backend.blockchain().insert_header_metadata(hash, metadata)
}
fn remove_header_metadata(&self, hash: Block::Hash) {
self.backend.blockchain().remove_header_metadata(hash)
}
}
impl ProvideUncles for Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
fn uncles(&self, target_hash: Block::Hash, max_generation: NumberFor) -> sp_blockchain::Result> {
Ok(Client::uncles(self, target_hash, max_generation)?
.into_iter()
.filter_map(|hash| Client::header(self, &BlockId::Hash(hash)).unwrap_or(None))
.collect()
)
}
}
impl ChainHeaderBackend for Client where
B: backend::Backend,
E: CallExecutor + Send + Sync,
Block: BlockT,
RA: Send + Sync,
{
fn header(&self, id: BlockId) -> sp_blockchain::Result> {
self.backend.blockchain().header(id)
}
fn info(&self) -> blockchain::Info {
self.backend.blockchain().info()
}
fn status(&self, id: BlockId) -> sp_blockchain::Result {
self.backend.blockchain().status(id)
}
fn number(&self, hash: Block::Hash) -> sp_blockchain::Result::Header as HeaderT>::Number>> {
self.backend.blockchain().number(hash)
}
fn hash(&self, number: NumberFor) -> sp_blockchain::Result> {
self.backend.blockchain().hash(number)
}
}
impl sp_runtime::traits::BlockIdTo for Client where
B: backend::Backend