* State sync

* Importing state fixes

* Bugfixes

* Sync with proof

* Status reporting

* Unsafe sync mode

* Sync test

* Cleanup

* Apply suggestions from code review

Co-authored-by: cheme <emericchevalier.pro@gmail.com>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>

* set_genesis_storage

* Extract keys from range proof

* Detect iter completion

* Download and import bodies with fast sync

* Replaced meta updates tuple with a struct

* Fixed reverting finalized state

* Reverted timeout

* Typo

* Doc

* Doc

* Fixed light client test

* Fixed error handling

* Tweaks

* More UpdateMeta changes

* Rename convert_transaction

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Code review suggestions

* Fixed count handling

Co-authored-by: cheme <emericchevalier.pro@gmail.com>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Arkadiy Paronyan
2021-06-22 11:32:43 +02:00
committed by GitHub
parent 5899eedc8c
commit 77a4b980ae
54 changed files with 2128 additions and 379 deletions
+3 -2
@@ -8022,6 +8022,7 @@ dependencies = [
"sp-runtime",
"sp-session",
"sp-state-machine",
"sp-storage",
"sp-tracing",
"sp-transaction-pool",
"sp-transaction-storage-proof",
@@ -10551,9 +10552,9 @@ dependencies = [
[[package]]
name = "trie-db"
version = "0.22.3"
version = "0.22.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec051edf7f0fc9499a2cb0947652cab2148b9d7f61cee7605e312e9f970dacaf"
checksum = "cd81fe0c8bc2b528a51c9d2c31dae4483367a26a723a3c9a4a8120311d7774e3"
dependencies = [
"hash-db",
"hashbrown",
+5
@@ -41,6 +41,7 @@ use sp_consensus::BlockOrigin;
use parking_lot::RwLock;
pub use sp_state_machine::Backend as StateBackend;
pub use sp_consensus::ImportedState;
use std::marker::PhantomData;
/// Extracts the state backend type for the given backend.
@@ -161,6 +162,10 @@ pub trait BlockImportOperation<Block: BlockT> {
update: TransactionForSB<Self::State, Block>,
) -> sp_blockchain::Result<()>;
/// Set genesis state. If `commit` is `false` the state is saved in memory, but is not written
/// to the database.
fn set_genesis_state(&mut self, storage: Storage, commit: bool) -> sp_blockchain::Result<Block::Hash>;
/// Inject storage data into the database replacing any existing data.
fn reset_storage(&mut self, storage: Storage) -> sp_blockchain::Result<Block::Hash>;
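// A minimal sketch of driving the new method from backend code, assuming
// `backend: &impl Backend<Block>` and a prepared genesis `Storage`; the
// surrounding `begin_operation`/`commit_operation` calls are the existing
// sc-client-api entry points.
let mut op = backend.begin_operation()?;
// Fast sync keeps the genesis state in memory only (`commit = false`);
// a full node would pass `true` to persist it to the database.
let genesis_root = op.set_genesis_state(genesis_storage, false)?;
backend.commit_operation(op)?;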
+38 -19
@@ -347,6 +347,11 @@ impl<Block: BlockT> HeaderBackend<Block> for Blockchain<Block> {
genesis_hash: storage.genesis_hash,
finalized_hash: storage.finalized_hash,
finalized_number: storage.finalized_number,
finalized_state: if storage.finalized_hash != Default::default() {
Some((storage.finalized_hash.clone(), storage.finalized_number))
} else {
None
},
number_leaves: storage.leaves.count()
}
}
@@ -528,6 +533,32 @@ pub struct BlockImportOperation<Block: BlockT> {
set_head: Option<BlockId<Block>>,
}
impl<Block: BlockT> BlockImportOperation<Block> where
Block::Hash: Ord,
{
fn apply_storage(&mut self, storage: Storage, commit: bool) -> sp_blockchain::Result<Block::Hash> {
check_genesis_storage(&storage)?;
let child_delta = storage.children_default.iter()
.map(|(_storage_key, child_content)|
(
&child_content.child_info,
child_content.data.iter().map(|(k, v)| (k.as_ref(), Some(v.as_ref())))
)
);
let (root, transaction) = self.old_state.full_storage_root(
storage.top.iter().map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
child_delta,
);
if commit {
self.new_state = Some(transaction);
}
Ok(root)
}
}
impl<Block: BlockT> backend::BlockImportOperation<Block> for BlockImportOperation<Block> where
Block::Hash: Ord,
{
@@ -569,24 +600,12 @@ impl<Block: BlockT> backend::BlockImportOperation<Block> for BlockImportOperatio
Ok(())
}
fn set_genesis_state(&mut self, storage: Storage, commit: bool) -> sp_blockchain::Result<Block::Hash> {
self.apply_storage(storage, commit)
}
fn reset_storage(&mut self, storage: Storage) -> sp_blockchain::Result<Block::Hash> {
check_genesis_storage(&storage)?;
let child_delta = storage.children_default.iter()
.map(|(_storage_key, child_content)|
(
&child_content.child_info,
child_content.data.iter().map(|(k, v)| (k.as_ref(), Some(v.as_ref())))
)
);
let (root, transaction) = self.old_state.full_storage_root(
storage.top.iter().map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
child_delta,
);
self.new_state = Some(transaction);
Ok(root)
self.apply_storage(storage, true)
}
fn insert_aux<I>(&mut self, ops: I) -> sp_blockchain::Result<()>
@@ -806,12 +825,12 @@ impl<Block: BlockT> backend::RemoteBackend<Block> for Backend<Block> where Block
/// Check that genesis storage is valid.
pub fn check_genesis_storage(storage: &Storage) -> sp_blockchain::Result<()> {
if storage.top.iter().any(|(k, _)| well_known_keys::is_child_storage_key(k)) {
return Err(sp_blockchain::Error::GenesisInvalid.into());
return Err(sp_blockchain::Error::InvalidState.into());
}
if storage.children_default.keys()
.any(|child_key| !well_known_keys::is_child_storage_key(&child_key)) {
return Err(sp_blockchain::Error::GenesisInvalid.into());
return Err(sp_blockchain::Error::InvalidState.into());
}
Ok(())
+1
@@ -41,6 +41,7 @@ pub use proof_provider::*;
pub use sp_blockchain::HeaderBackend;
pub use sp_state_machine::{StorageProof, ExecutionStrategy};
pub use sp_storage::{StorageData, StorageKey, PrefixedStorageKey, ChildInfo};
/// Usage Information Provider interface
///
@@ -70,4 +70,31 @@ pub trait ProofProvider<Block: BlockT> {
storage_key: Option<&PrefixedStorageKey>,
key: &StorageKey,
) -> sp_blockchain::Result<ChangesProof<Block::Header>>;
/// Given a `BlockId`, iterate over all storage values starting at `start_key` exclusively,
/// building proofs until the size limit is reached. Returns the combined proof and the number of collected keys.
fn read_proof_collection(
&self,
id: &BlockId<Block>,
start_key: &[u8],
size_limit: usize,
) -> sp_blockchain::Result<(StorageProof, u32)>;
/// Given a `BlockId`, iterate over all storage values starting at `start_key`,
/// collecting them until the size limit is reached. Returns the collected keys and values.
fn storage_collection(
&self,
id: &BlockId<Block>,
start_key: &[u8],
size_limit: usize,
) -> sp_blockchain::Result<Vec<(Vec<u8>, Vec<u8>)>>;
/// Verify a read proof for a range of storage keys.
/// Returns the collected key-value pairs and a flag indicating whether iteration is complete.
fn verify_range_proof(
&self,
root: Block::Hash,
proof: StorageProof,
start_key: &[u8],
) -> sp_blockchain::Result<(Vec<(Vec<u8>, Vec<u8>)>, bool)>;
}
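// Taken together these three methods form the fast-sync proof round trip.
// A rough sketch, where `client`, `target_hash`, `target_root` and `last_key`
// are assumptions standing in for the sync machinery:
//
// Serving side: accumulate a compact range proof until the size budget is hit.
let (proof, _key_count) = client.read_proof_collection(
&BlockId::Hash(target_hash),
&last_key,
2 * 1024 * 1024, // hypothetical ~2 MiB response budget
)?;
// Syncing side: check the proof against the target state root, extract pairs.
let (entries, complete) = client.verify_range_proof(target_root, proof, &last_key)?;
// When `complete` is false, the next request resumes from the last returned key.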
@@ -69,6 +69,7 @@ impl<Block: BlockT> HeaderBackend<Block> for TestApi {
finalized_number: Zero::zero(),
genesis_hash: Default::default(),
number_leaves: Default::default(),
finalized_state: None,
}
}
+24
@@ -232,6 +232,30 @@ arg_enum! {
}
}
arg_enum! {
/// Syncing mode.
#[allow(missing_docs)]
#[derive(Debug, Clone, Copy)]
pub enum SyncMode {
// Full sync. Download and verify all blocks.
Full,
// Download blocks without executing them. Download latest state with proofs.
Fast,
// Download blocks without executing them. Download latest state without proofs.
FastUnsafe,
}
}
impl Into<sc_network::config::SyncMode> for SyncMode {
fn into(self) -> sc_network::config::SyncMode {
match self {
SyncMode::Full => sc_network::config::SyncMode::Full,
SyncMode::Fast => sc_network::config::SyncMode::Fast { skip_proofs: false },
SyncMode::FastUnsafe => sc_network::config::SyncMode::Fast { skip_proofs: true },
}
}
}
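// For illustration (a sketch, not part of the change): a CLI value such as
// `--sync FastUnsafe` reaches the network layer as:
let mode: sc_network::config::SyncMode = SyncMode::FastUnsafe.into();
// => sc_network::config::SyncMode::Fast { skip_proofs: true }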
/// Default value for the `--execution-syncing` parameter.
pub const DEFAULT_EXECUTION_SYNCING: ExecutionStrategy = ExecutionStrategy::NativeElseWasm;
/// Default value for the `--execution-import-block` parameter.
@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::params::node_key_params::NodeKeyParams;
use crate::arg_enums::SyncMode;
use sc_network::{
config::{NetworkConfiguration, NodeKeyConfig, NonReservedPeerMode, SetConfig, TransportConfig},
multiaddr::Protocol,
@@ -125,6 +126,13 @@ pub struct NetworkParams {
/// Join the IPFS network and serve transactions over bitswap protocol.
#[structopt(long)]
pub ipfs_server: bool,
/// Blockchain syncing mode.
/// Full - Download and validate full blockchain history (Default).
/// Fast - Download blocks and the latest state only.
/// FastUnsafe - Same as Fast, but skips downloading state proofs.
#[structopt(long, default_value = "Full")]
pub sync: SyncMode,
}
impl NetworkParams {
@@ -218,6 +226,7 @@ impl NetworkParams {
kademlia_disjoint_query_paths: self.kademlia_disjoint_query_paths,
yamux_window_size: None,
ipfs_server: self.ipfs_server,
sync_mode: self.sync.into(),
}
}
}
+4 -2
@@ -42,7 +42,7 @@ use codec::{Encode, Decode, Codec};
use sp_consensus::{
BlockImport, Environment, Proposer, CanAuthorWith, ForkChoiceStrategy, BlockImportParams,
BlockOrigin, Error as ConsensusError, SelectChain,
BlockOrigin, Error as ConsensusError, SelectChain, StateAction,
};
use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider};
use sp_blockchain::{Result as CResult, ProvideCache, HeaderBackend};
@@ -421,7 +421,9 @@ where
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(signature_digest_item);
import_block.body = Some(body);
import_block.storage_changes = Some(storage_changes);
import_block.state_action = StateAction::ApplyChanges(
sp_consensus::StorageChanges::Changes(storage_changes)
);
import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
Ok(import_block)
+10 -2
@@ -101,6 +101,7 @@ use sp_consensus::{
import_queue::{BasicQueue, CacheKeyId, DefaultImportQueue, Verifier},
BlockCheckParams, BlockImport, BlockImportParams, BlockOrigin, Environment,
Error as ConsensusError, ForkChoiceStrategy, Proposer, SelectChain, SlotData,
StateAction,
};
use sp_consensus_babe::inherents::BabeInherentData;
use sp_consensus_slots::Slot;
@@ -790,7 +791,9 @@ where
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(digest_item);
import_block.body = Some(body);
import_block.storage_changes = Some(storage_changes);
import_block.state_action = StateAction::ApplyChanges(
sp_consensus::StorageChanges::Changes(storage_changes)
);
import_block.intermediates.insert(
Cow::from(INTERMEDIATE_KEY),
Box::new(BabeIntermediate::<B> { epoch_descriptor }) as Box<_>,
@@ -1295,7 +1298,12 @@ impl<Block, Client, Inner> BlockImport<Block> for BabeBlockImport<Block, Client,
// early exit if block already in chain, otherwise the check for
// epoch changes will error when trying to re-import an epoch change
match self.client.status(BlockId::Hash(hash)) {
Ok(sp_blockchain::BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain),
Ok(sp_blockchain::BlockStatus::InChain) => {
// When re-importing an existing block, strip away intermediates.
let _ = block.take_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;
block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
return self.inner.import_block(block, new_cache).await.map_err(Into::into)
},
Ok(sp_blockchain::BlockStatus::Unknown) => {},
Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
}
@@ -28,7 +28,7 @@ use futures::prelude::*;
use sc_transaction_pool::txpool;
use sp_consensus::{
self, BlockImport, Environment, Proposer, ForkChoiceStrategy,
BlockImportParams, BlockOrigin, ImportResult, SelectChain,
BlockImportParams, BlockOrigin, ImportResult, SelectChain, StateAction,
};
use sp_blockchain::HeaderBackend;
use std::collections::HashMap;
@@ -145,7 +145,9 @@ pub async fn seal_block<B, BI, SC, C, E, P, CIDP>(
params.body = Some(body);
params.finalized = finalize;
params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
params.storage_changes = Some(proposal.storage_changes);
params.state_action = StateAction::ApplyChanges(
sp_consensus::StorageChanges::Changes(proposal.storage_changes)
);
if let Some(digest_provider) = digest_provider {
digest_provider.append_block_import(&parent, &mut params, &inherent_data)?;
+5 -2
@@ -18,7 +18,8 @@
use std::{pin::Pin, time::Duration, collections::HashMap, borrow::Cow};
use sc_client_api::ImportNotifications;
use sp_consensus::{Proposal, BlockOrigin, BlockImportParams, import_queue::BoxBlockImport};
use sp_consensus::{Proposal, BlockOrigin, BlockImportParams, StorageChanges,
StateAction, import_queue::BoxBlockImport};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
@@ -136,7 +137,9 @@ where
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.storage_changes = Some(build.proposal.storage_changes);
import_block.state_action = StateAction::ApplyChanges(
StorageChanges::Changes(build.proposal.storage_changes)
);
let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(build.metadata.difficulty),
+12
@@ -373,6 +373,18 @@ impl<B: BlockT> StateBackend<HashFor<B>> for BenchmarkingState<B> {
}
}
fn apply_to_key_values_while<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: F,
allow_missing: bool,
) -> Result<bool, Self::Error> {
self.state.borrow().as_ref().ok_or_else(state_err)?
.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing)
}
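// A sketch of how the new iterator is meant to be used, assuming `state`
// implements `StateBackend`: gather top-trie entries until a size budget is
// exhausted, much like the state-sync responder does.
let mut collected = Vec::new();
let mut size = 0usize;
let complete = state.apply_to_key_values_while(None, None, None, |key, value| {
size += key.len() + value.len();
collected.push((key, value));
size < 1024 * 1024 // keep iterating while under a hypothetical 1 MiB budget
}, false)?;
// `complete` reports whether the whole trie was visited before the closure stopped.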
fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
&self,
child_info: Option<&ChildInfo>,
+335 -135
@@ -205,6 +205,17 @@ impl<B: BlockT> StateBackend<HashFor<B>> for RefTrackingState<B> {
self.state.for_key_values_with_prefix(prefix, f)
}
fn apply_to_key_values_while<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: F,
allow_missing: bool,
) -> Result<bool, Self::Error> {
self.state.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing)
}
fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
&self,
child_info: Option<&ChildInfo>,
@@ -387,6 +398,14 @@ impl<'a> sc_state_db::MetaDb for StateMetaDb<'a> {
}
}
struct MetaUpdate<Block: BlockT> {
pub hash: Block::Hash,
pub number: NumberFor<Block>,
pub is_best: bool,
pub is_finalized: bool,
pub with_state: bool,
}
fn cache_header<Hash: std::cmp::Eq + std::hash::Hash, Header>(
cache: &mut LinkedHashMap<Hash, Option<Header>>,
hash: Hash,
@@ -427,11 +446,9 @@ impl<Block: BlockT> BlockchainDb<Block> {
fn update_meta(
&self,
hash: Block::Hash,
number: <Block::Header as HeaderT>::Number,
is_best: bool,
is_finalized: bool
update: MetaUpdate<Block>,
) {
let MetaUpdate { hash, number, is_best, is_finalized, with_state } = update;
let mut meta = self.meta.write();
if number.is_zero() {
meta.genesis_hash = hash;
@@ -444,6 +461,9 @@ impl<Block: BlockT> BlockchainDb<Block> {
}
if is_finalized {
if with_state {
meta.finalized_state = Some((hash.clone(), number));
}
meta.finalized_number = number;
meta.finalized_hash = hash;
}
@@ -484,6 +504,7 @@ impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for Blockcha
genesis_hash: meta.genesis_hash,
finalized_hash: meta.finalized_hash,
finalized_number: meta.finalized_number,
finalized_state: meta.finalized_state.clone(),
number_leaves: self.leaves.read().count(),
}
}
@@ -754,6 +775,42 @@ impl<Block: BlockT> BlockImportOperation<Block> {
}
}
}
fn apply_new_state(
&mut self,
storage: Storage,
) -> ClientResult<Block::Hash> {
if storage.top.keys().any(|k| well_known_keys::is_child_storage_key(&k)) {
return Err(sp_blockchain::Error::InvalidState.into());
}
let child_delta = storage.children_default.iter().map(|(_storage_key, child_content)|(
&child_content.child_info,
child_content.data.iter().map(|(k, v)| (&k[..], Some(&v[..]))),
));
let mut changes_trie_config = None;
let (root, transaction) = self.old_state.full_storage_root(
storage.top.iter().map(|(k, v)| {
if &k[..] == well_known_keys::CHANGES_TRIE_CONFIG {
changes_trie_config = Some(Decode::decode(&mut &v[..]));
}
(&k[..], Some(&v[..]))
}),
child_delta
);
let changes_trie_config = match changes_trie_config {
Some(Ok(c)) => Some(c),
Some(Err(_)) => return Err(sp_blockchain::Error::InvalidState.into()),
None => None,
};
self.db_updates = transaction;
self.changes_trie_config_update = Some(changes_trie_config);
Ok(root)
}
}
impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for BlockImportOperation<Block> {
@@ -796,35 +853,21 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for Bloc
&mut self,
storage: Storage,
) -> ClientResult<Block::Hash> {
if storage.top.keys().any(|k| well_known_keys::is_child_storage_key(&k)) {
return Err(sp_blockchain::Error::GenesisInvalid.into());
}
let child_delta = storage.children_default.iter().map(|(_storage_key, child_content)|(
&child_content.child_info,
child_content.data.iter().map(|(k, v)| (&k[..], Some(&v[..]))),
));
let mut changes_trie_config: Option<ChangesTrieConfiguration> = None;
let (root, transaction) = self.old_state.full_storage_root(
storage.top.iter().map(|(k, v)| {
if &k[..] == well_known_keys::CHANGES_TRIE_CONFIG {
changes_trie_config = Some(
Decode::decode(&mut &v[..])
.expect("changes trie configuration is encoded properly at genesis")
);
}
(&k[..], Some(&v[..]))
}),
child_delta
);
self.db_updates = transaction;
self.changes_trie_config_update = Some(changes_trie_config);
let root = self.apply_new_state(storage)?;
self.commit_state = true;
Ok(root)
}
fn set_genesis_state(
&mut self,
storage: Storage,
commit: bool,
) -> ClientResult<Block::Hash> {
let root = self.apply_new_state(storage)?;
self.commit_state = commit;
Ok(root)
}
fn update_changes_trie(
&mut self,
update: ChangesTrieTransaction<HashFor<Block>, NumberFor<Block>>,
@@ -907,18 +950,39 @@ impl<Block: BlockT> sc_state_db::NodeDb for StorageDb<Block> {
}
}
struct DbGenesisStorage<Block: BlockT>(pub Block::Hash);
struct DbGenesisStorage<Block: BlockT> {
root: Block::Hash,
storage: PrefixedMemoryDB<HashFor<Block>>,
}
impl<Block: BlockT> DbGenesisStorage<Block> {
pub fn new() -> Self {
let mut root = Block::Hash::default();
let mut mdb = MemoryDB::<HashFor<Block>>::default();
sp_state_machine::TrieDBMut::<HashFor<Block>>::new(&mut mdb, &mut root);
DbGenesisStorage(root)
pub fn new(root: Block::Hash, storage: PrefixedMemoryDB<HashFor<Block>>) -> Self {
DbGenesisStorage {
root,
storage,
}
}
}
impl<Block: BlockT> sp_state_machine::Storage<HashFor<Block>> for DbGenesisStorage<Block> {
fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result<Option<DBValue>, String> {
use hash_db::HashDB;
Ok(self.storage.get(key, prefix))
}
}
struct EmptyStorage<Block: BlockT>(pub Block::Hash);
impl<Block: BlockT> EmptyStorage<Block> {
pub fn new() -> Self {
let mut root = Block::Hash::default();
let mut mdb = MemoryDB::<HashFor<Block>>::default();
sp_state_machine::TrieDBMut::<HashFor<Block>>::new(&mut mdb, &mut root);
EmptyStorage(root)
}
}
impl<Block: BlockT> sp_state_machine::Storage<HashFor<Block>> for EmptyStorage<Block> {
fn get(&self, _key: &Block::Hash, _prefix: Prefix) -> Result<Option<DBValue>, String> {
Ok(None)
}
@@ -980,6 +1044,7 @@ pub struct Backend<Block: BlockT> {
transaction_storage: TransactionStorageMode,
io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>,
state_usage: Arc<StateUsageStats>,
genesis_state: RwLock<Option<Arc<DbGenesisStorage<Block>>>>,
}
impl<Block: BlockT> Backend<Block> {
@@ -1058,7 +1123,7 @@ impl<Block: BlockT> Backend<Block> {
},
)?;
Ok(Backend {
let backend = Backend {
storage: Arc::new(storage_db),
offchain_storage,
changes_tries_storage,
@@ -1074,7 +1139,24 @@ impl<Block: BlockT> Backend<Block> {
state_usage: Arc::new(StateUsageStats::new()),
keep_blocks: config.keep_blocks.clone(),
transaction_storage: config.transaction_storage.clone(),
})
genesis_state: RwLock::new(None),
};
// Older DB versions have no last state key. Check if the state is available and set it.
let info = backend.blockchain.info();
if info.finalized_state.is_none()
&& info.finalized_hash != Default::default()
&& sc_client_api::Backend::have_state_at(&backend, &info.finalized_hash, info.finalized_number)
{
backend.blockchain.update_meta(MetaUpdate {
hash: info.finalized_hash,
number: info.finalized_number,
is_best: info.finalized_hash == info.best_hash,
is_finalized: true,
with_state: true,
});
}
Ok(backend)
}
/// Handle setting head within a transaction. `route_to` should be the last
@@ -1170,10 +1252,11 @@ impl<Block: BlockT> Backend<Block> {
justification: Option<Justification>,
changes_trie_cache_ops: &mut Option<DbChangesTrieStorageTransaction<Block>>,
finalization_displaced: &mut Option<FinalizationDisplaced<Block::Hash, NumberFor<Block>>>,
) -> ClientResult<(Block::Hash, <Block::Header as HeaderT>::Number, bool, bool)> {
) -> ClientResult<MetaUpdate<Block>> {
// TODO: ensure best chain contains this block.
let number = *header.number();
self.ensure_sequential_finalization(header, last_finalized)?;
let with_state = sc_client_api::Backend::have_state_at(self, &hash, number);
self.note_finalized(
transaction,
@@ -1182,6 +1265,7 @@ impl<Block: BlockT> Backend<Block> {
*hash,
changes_trie_cache_ops,
finalization_displaced,
with_state,
)?;
if let Some(justification) = justification {
@@ -1191,7 +1275,13 @@ impl<Block: BlockT> Backend<Block> {
Justifications::from(justification).encode(),
);
}
Ok((*hash, number, false, true))
Ok(MetaUpdate {
hash: *hash,
number,
is_best: false,
is_finalized: true,
with_state,
})
}
// performs forced canonicalization with a delay after importing a non-finalized block.
@@ -1219,6 +1309,9 @@ impl<Block: BlockT> Backend<Block> {
)?.expect("existence of block with number `new_canonical` \
implies existence of blocks with all numbers before it; qed")
};
if !sc_client_api::Backend::have_state_at(self, &hash, new_canonical.saturated_into()) {
return Ok(())
}
trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
let commit = self.storage.state_db.canonicalize_block(&hash)
@@ -1240,12 +1333,13 @@ impl<Block: BlockT> Backend<Block> {
let mut meta_updates = Vec::with_capacity(operation.finalized_blocks.len());
let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash;
let mut last_finalized_num = self.blockchain.meta.read().finalized_number;
let best_num = self.blockchain.meta.read().best_number;
let mut changes_trie_cache_ops = None;
for (block, justification) in operation.finalized_blocks {
let block_hash = self.blockchain.expect_block_hash_from_id(&block)?;
let block_header = self.blockchain.expect_header(BlockId::Hash(block_hash))?;
meta_updates.push(self.finalize_block_with_transaction(
&mut transaction,
&block_hash,
@@ -1256,12 +1350,16 @@ impl<Block: BlockT> Backend<Block> {
&mut finalization_displaced_leaves,
)?);
last_finalized_hash = block_hash;
last_finalized_num = block_header.number().clone();
}
let imported = if let Some(pending_block) = operation.pending_block {
let hash = pending_block.header.hash();
let parent_hash = *pending_block.header.parent_hash();
let number = pending_block.header.number().clone();
let existing_header = number <= best_num && self.blockchain.header(BlockId::hash(hash))?.is_some();
// blocks are keyed by number + hash.
let lookup_key = utils::number_and_hash_to_lookup_key(number, hash)?;
@@ -1296,13 +1394,24 @@ impl<Block: BlockT> Backend<Block> {
}
if number.is_zero() {
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key.clone());
transaction.set(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
// for tests, because config is set from within the reset_storage
if operation.changes_trie_config_update.is_none() {
operation.changes_trie_config_update = Some(None);
}
if operation.commit_state {
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_STATE, lookup_key);
} else {
// When we don't want to commit the genesis state, we still preserve it in memory
// to bootstrap consensus. It is queried for an initial list of authorities, etc.
*self.genesis_state.write() = Some(Arc::new(DbGenesisStorage::new(
pending_block.header.state_root().clone(),
operation.db_updates.clone()
)));
}
}
let finalized = if operation.commit_state {
@@ -1361,79 +1470,111 @@ impl<Block: BlockT> Backend<Block> {
changeset,
).map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from_state_db(e))?;
apply_state_commit(&mut transaction, commit);
if number <= last_finalized_num {
// Canonicalize in the db when re-importing existing blocks with state.
let commit = self.storage.state_db.canonicalize_block(&hash)
.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from_state_db(e))?;
apply_state_commit(&mut transaction, commit);
meta_updates.push(MetaUpdate {
hash,
number,
is_best: false,
is_finalized: true,
with_state: true,
});
}
// Check if we need to finalize. Genesis is always finalized instantly.
let finalized = number_u64 == 0 || pending_block.leaf_state.is_final();
finalized
} else {
false
number.is_zero() || pending_block.leaf_state.is_final()
};
let header = &pending_block.header;
let is_best = pending_block.leaf_state.is_best();
let changes_trie_updates = operation.changes_trie_updates;
let changes_trie_config_update = operation.changes_trie_config_update;
changes_trie_cache_ops = Some(self.changes_tries_storage.commit(
&mut transaction,
changes_trie_updates,
cache::ComplexBlockId::new(
*header.parent_hash(),
if number.is_zero() { Zero::zero() } else { number - One::one() },
),
cache::ComplexBlockId::new(hash, number),
header,
finalized,
changes_trie_config_update,
changes_trie_cache_ops,
)?);
self.state_usage.merge_sm(operation.old_state.usage_info());
// release state reference so that it can be finalized
let cache = operation.old_state.into_cache_changes();
if finalized {
// TODO: ensure best chain contains this block.
self.ensure_sequential_finalization(header, Some(last_finalized_hash))?;
self.note_finalized(
&mut transaction,
true,
header,
hash,
&mut changes_trie_cache_ops,
&mut finalization_displaced_leaves,
)?;
} else {
// canonicalize blocks which are old enough, regardless of finality.
self.force_delayed_canonicalize(&mut transaction, hash, *header.number())?
}
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best);
let displaced_leaf = {
let mut leaves = self.blockchain.leaves.write();
let displaced_leaf = leaves.import(hash, number, parent_hash);
leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
displaced_leaf
};
let mut children = children::read_children(
&*self.storage.db,
columns::META,
meta_keys::CHILDREN_PREFIX,
parent_hash,
)?;
children.push(hash);
children::write_children(
&mut transaction,
columns::META,
meta_keys::CHILDREN_PREFIX,
parent_hash,
children,
debug!(target: "db",
"DB Commit {:?} ({}), best={}, state={}, existing={}",
hash, number, is_best, operation.commit_state, existing_header,
);
meta_updates.push((hash, number, pending_block.leaf_state.is_best(), finalized));
if !existing_header {
let changes_trie_config_update = operation.changes_trie_config_update;
changes_trie_cache_ops = Some(self.changes_tries_storage.commit(
&mut transaction,
changes_trie_updates,
cache::ComplexBlockId::new(
*header.parent_hash(),
if number.is_zero() { Zero::zero() } else { number - One::one() },
),
cache::ComplexBlockId::new(hash, number),
header,
finalized,
changes_trie_config_update,
changes_trie_cache_ops,
)?);
Some((pending_block.header, number, hash, enacted, retracted, displaced_leaf, is_best, cache))
self.state_usage.merge_sm(operation.old_state.usage_info());
// release state reference so that it can be finalized
let cache = operation.old_state.into_cache_changes();
if finalized {
// TODO: ensure best chain contains this block.
self.ensure_sequential_finalization(header, Some(last_finalized_hash))?;
self.note_finalized(
&mut transaction,
true,
header,
hash,
&mut changes_trie_cache_ops,
&mut finalization_displaced_leaves,
operation.commit_state,
)?;
} else {
// canonicalize blocks which are old enough, regardless of finality.
self.force_delayed_canonicalize(&mut transaction, hash, *header.number())?
}
let displaced_leaf = {
let mut leaves = self.blockchain.leaves.write();
let displaced_leaf = leaves.import(hash, number, parent_hash);
leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
displaced_leaf
};
let mut children = children::read_children(
&*self.storage.db,
columns::META,
meta_keys::CHILDREN_PREFIX,
parent_hash,
)?;
if !children.contains(&hash) {
children.push(hash);
}
children::write_children(
&mut transaction,
columns::META,
meta_keys::CHILDREN_PREFIX,
parent_hash,
children,
);
meta_updates.push(MetaUpdate {
hash,
number,
is_best: pending_block.leaf_state.is_best(),
is_finalized: finalized,
with_state: operation.commit_state,
});
Some((pending_block.header, number, hash, enacted, retracted, displaced_leaf, is_best, cache))
} else {
None
}
} else {
None
};
@@ -1448,7 +1589,13 @@ impl<Block: BlockT> Backend<Block> {
hash.clone(),
(number.clone(), hash.clone())
)?;
meta_updates.push((hash, *number, true, false));
meta_updates.push(MetaUpdate {
hash,
number: *number,
is_best: true,
is_finalized: false,
with_state: false,
});
Some((enacted, retracted))
} else {
return Err(sp_blockchain::Error::UnknownBlock(format!("Cannot set head {:?}", set_head)))
@@ -1472,6 +1619,7 @@ impl<Block: BlockT> Backend<Block> {
is_best,
mut cache,
)) = imported {
trace!(target: "db", "DB Commit done {:?}", hash);
let header_metadata = CachedHeaderMetadata::from(&header);
self.blockchain.insert_header_metadata(
header_metadata.hash,
@@ -1498,8 +1646,8 @@ impl<Block: BlockT> Backend<Block> {
self.shared_cache.lock().sync(&enacted, &retracted);
}
for (hash, number, is_best, is_finalized) in meta_updates {
self.blockchain.update_meta(hash, number, is_best, is_finalized);
for m in meta_updates {
self.blockchain.update_meta(m);
}
Ok(())
@@ -1515,29 +1663,35 @@ impl<Block: BlockT> Backend<Block> {
f_header: &Block::Header,
f_hash: Block::Hash,
changes_trie_cache_ops: &mut Option<DbChangesTrieStorageTransaction<Block>>,
displaced: &mut Option<FinalizationDisplaced<Block::Hash, NumberFor<Block>>>
displaced: &mut Option<FinalizationDisplaced<Block::Hash, NumberFor<Block>>>,
with_state: bool,
) -> ClientResult<()> {
let f_num = f_header.number().clone();
if self.storage.state_db.best_canonical().map(|c| f_num.saturated_into::<u64>() > c).unwrap_or(true) {
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone())?;
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone())?;
if with_state {
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_STATE, lookup_key.clone());
}
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
if sc_client_api::Backend::have_state_at(self, &f_hash, f_num) &&
self.storage.state_db.best_canonical().map(|c| f_num.saturated_into::<u64>() > c).unwrap_or(true)
{
let commit = self.storage.state_db.canonicalize_block(&f_hash)
.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from_state_db(e))?;
apply_state_commit(transaction, commit);
}
if !f_num.is_zero() {
let new_changes_trie_cache_ops = self.changes_tries_storage.finalize(
transaction,
*f_header.parent_hash(),
f_hash,
f_num,
if is_inserted { Some(&f_header) } else { None },
changes_trie_cache_ops.take(),
)?;
*changes_trie_cache_ops = Some(new_changes_trie_cache_ops);
}
if !f_num.is_zero() {
let new_changes_trie_cache_ops = self.changes_tries_storage.finalize(
transaction,
*f_header.parent_hash(),
f_hash,
f_num,
if is_inserted { Some(&f_header) } else { None },
changes_trie_cache_ops.take(),
)?;
*changes_trie_cache_ops = Some(new_changes_trie_cache_ops);
}
let new_displaced = self.blockchain.leaves.write().finalize_height(f_num);
@@ -1628,6 +1782,23 @@ impl<Block: BlockT> Backend<Block> {
}
Ok(())
}
fn empty_state(&self) -> ClientResult<SyncingCachingState<RefTrackingState<Block>, Block>> {
let root = EmptyStorage::<Block>::new().0; // Empty trie
let db_state = DbState::<Block>::new(self.storage.clone(), root);
let state = RefTrackingState::new(db_state, self.storage.clone(), None);
let caching_state = CachingState::new(
state,
self.shared_cache.clone(),
None,
);
Ok(SyncingCachingState::new(
caching_state,
self.state_usage.clone(),
self.blockchain.meta.clone(),
self.import_lock.clone(),
))
}
}
@@ -1737,7 +1908,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
type OffchainStorage = offchain::LocalStorage;
fn begin_operation(&self) -> ClientResult<Self::BlockImportOperation> {
let mut old_state = self.state_at(BlockId::Hash(Default::default()))?;
let mut old_state = self.empty_state()?;
old_state.disable_syncing();
Ok(BlockImportOperation {
@@ -1763,7 +1934,11 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
operation: &mut Self::BlockImportOperation,
block: BlockId<Block>,
) -> ClientResult<()> {
operation.old_state = self.state_at(block)?;
if block.is_pre_genesis() {
operation.old_state = self.empty_state()?;
} else {
operation.old_state = self.state_at(block)?;
}
operation.old_state.disable_syncing();
operation.commit_state = true;
@@ -1800,7 +1975,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
let mut displaced = None;
let mut changes_trie_cache_ops = None;
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
let m = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
@@ -1810,7 +1985,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
&mut displaced,
)?;
self.storage.db.commit(transaction)?;
self.blockchain.update_meta(hash, number, is_best, is_finalized);
self.blockchain.update_meta(m);
self.changes_tries_storage.post_commit(changes_trie_cache_ops);
Ok(())
}
@@ -1967,14 +2142,36 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
meta_keys::FINALIZED_BLOCK,
key.clone()
);
reverted_finalized.insert(removed_hash);
if let Some((hash, _)) = self.blockchain.info().finalized_state {
if hash == best_hash {
if !best_number.is_zero()
&& self.have_state_at(&prev_hash, best_number - One::one())
{
let lookup_key = utils::number_and_hash_to_lookup_key(
best_number - One::one(),
prev_hash
)?;
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_STATE, lookup_key);
} else {
transaction.remove(columns::META, meta_keys::FINALIZED_STATE);
}
}
}
}
transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, key);
transaction.remove(columns::KEY_LOOKUP, removed.hash().as_ref());
children::remove_children(&mut transaction, columns::META, meta_keys::CHILDREN_PREFIX, best_hash);
self.storage.db.commit(transaction)?;
self.changes_tries_storage.post_commit(Some(changes_trie_cache_ops));
self.blockchain.update_meta(best_hash, best_number, true, update_finalized);
self.blockchain.update_meta(MetaUpdate {
hash: best_hash,
number: best_number,
is_best: true,
is_finalized: update_finalized,
with_state: false
});
}
None => return Ok(c.saturated_into::<NumberFor<Block>>())
}
@@ -2061,26 +2258,30 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
fn state_at(&self, block: BlockId<Block>) -> ClientResult<Self::State> {
use sc_client_api::blockchain::HeaderBackend as BcHeaderBackend;
// special case for genesis initialization
match block {
BlockId::Hash(h) if h == Default::default() => {
let genesis_storage = DbGenesisStorage::<Block>::new();
let root = genesis_storage.0.clone();
let db_state = DbState::<Block>::new(Arc::new(genesis_storage), root);
let is_genesis = match &block {
BlockId::Number(n) if n.is_zero() => true,
BlockId::Hash(h) if h == &self.blockchain.meta.read().genesis_hash => true,
_ => false,
};
if is_genesis {
if let Some(genesis_state) = &*self.genesis_state.read() {
let root = genesis_state.root.clone();
let db_state = DbState::<Block>::new(genesis_state.clone(), root);
let state = RefTrackingState::new(db_state, self.storage.clone(), None);
let caching_state = CachingState::new(
state,
self.shared_cache.clone(),
None,
);
return Ok(SyncingCachingState::new(
let mut state = SyncingCachingState::new(
caching_state,
self.state_usage.clone(),
self.blockchain.meta.clone(),
self.import_lock.clone(),
));
},
_ => {}
);
state.disable_syncing();
return Ok(state)
}
}
let hash = match block {
@@ -2305,7 +2506,6 @@ pub(crate) mod tests {
let db = Backend::<Block>::new_test(2, 0);
let hash = {
let mut op = db.begin_operation().unwrap();
db.begin_state_operation(&mut op, BlockId::Hash(Default::default())).unwrap();
let mut header = Header {
number: 0,
parent_hash: Default::default(),
+6 -1
@@ -151,9 +151,14 @@ impl<Block> BlockchainHeaderBackend<Block> for LightStorage<Block>
BlockchainInfo {
best_hash: meta.best_hash,
best_number: meta.best_number,
genesis_hash: meta.genesis_hash,
genesis_hash: meta.genesis_hash.clone(),
finalized_hash: meta.finalized_hash,
finalized_number: meta.finalized_number,
finalized_state: if meta.finalized_hash != Default::default() {
Some((meta.genesis_hash, Zero::zero()))
} else {
None
},
number_leaves: 1,
}
}
+22
@@ -605,6 +605,17 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> StateBackend<HashFor<B>> for Cachin
self.state.exists_child_storage(child_info, key)
}
fn apply_to_key_values_while<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: F,
allow_missing: bool,
) -> Result<bool, Self::Error> {
self.state.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing)
}
fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
&self,
child_info: Option<&ChildInfo>,
@@ -788,6 +799,17 @@ impl<S: StateBackend<HashFor<B>>, B: BlockT> StateBackend<HashFor<B>> for Syncin
self.caching_state().exists_child_storage(child_info, key)
}
fn apply_to_key_values_while<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: F,
allow_missing: bool,
) -> Result<bool, Self::Error> {
self.caching_state().apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing)
}
fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
&self,
child_info: Option<&ChildInfo>,
+13 -1
@@ -49,6 +49,8 @@ pub mod meta_keys {
pub const BEST_BLOCK: &[u8; 4] = b"best";
/// Last finalized block key.
pub const FINALIZED_BLOCK: &[u8; 5] = b"final";
/// Last finalized state key.
pub const FINALIZED_STATE: &[u8; 6] = b"fstate";
/// Meta information prefix for list-based caches.
pub const CACHE_META_PREFIX: &[u8; 5] = b"cache";
/// Meta information for changes tries key.
@@ -74,6 +76,8 @@ pub struct Meta<N, H> {
pub finalized_number: N,
/// Hash of the genesis block.
pub genesis_hash: H,
/// Finalized state, if any
pub finalized_state: Option<(H, N)>,
}
/// A block lookup key: used for canonical lookup from block number to hash
@@ -391,6 +395,7 @@ pub fn read_meta<Block>(db: &dyn Database<DbHash>, col_header: u32) -> Result<
finalized_hash: Default::default(),
finalized_number: Zero::zero(),
genesis_hash: Default::default(),
finalized_state: None,
}),
};
@@ -408,12 +413,18 @@ pub fn read_meta<Block>(db: &dyn Database<DbHash>, col_header: u32) -> Result<
);
Ok((hash, *header.number()))
} else {
Ok((genesis_hash.clone(), Zero::zero()))
Ok((Default::default(), Zero::zero()))
}
};
let (best_hash, best_number) = load_meta_block("best", meta_keys::BEST_BLOCK)?;
let (finalized_hash, finalized_number) = load_meta_block("final", meta_keys::FINALIZED_BLOCK)?;
let (finalized_state_hash, finalized_state_number) = load_meta_block("final_state", meta_keys::FINALIZED_STATE)?;
let finalized_state = if finalized_state_hash != Default::default() {
Some((finalized_state_hash, finalized_state_number))
} else {
None
};
Ok(Meta {
best_hash,
@@ -421,6 +432,7 @@ pub fn read_meta<Block>(db: &dyn Database<DbHash>, col_header: u32) -> Result<
finalized_hash,
finalized_number,
genesis_hash,
finalized_state,
})
}
@@ -456,7 +456,11 @@ where
// early exit if block already in chain, otherwise the check for
// authority changes will error when trying to re-import a change block
match self.inner.status(BlockId::Hash(hash)) {
Ok(BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain),
Ok(BlockStatus::InChain) => {
// Strip justifications when re-importing an existing block.
let _justifications = block.justifications.take();
return (&*self.inner).import_block(block, new_cache).await
}
Ok(BlockStatus::Unknown) => {},
Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
}
+13 -4
@@ -93,10 +93,19 @@ impl<B: BlockT> InformantDisplay<B> {
(diff_bytes_inbound, diff_bytes_outbound)
};
let (level, status, target) = match (net_status.sync_state, net_status.best_seen_block) {
(SyncState::Idle, _) => ("💤", "Idle".into(), "".into()),
(SyncState::Downloading, None) => ("⚙️ ", format!("Preparing{}", speed), "".into()),
(SyncState::Downloading, Some(n)) => (
let (level, status, target) = match (
net_status.sync_state,
net_status.best_seen_block,
net_status.state_sync
) {
(_, _, Some(state)) => (
"⚙️ ",
"Downloading state".into(),
format!(", {}%, ({:.2}) Mib", state.percentage, (state.size as f32) / (1024f32 * 1024f32)),
),
(SyncState::Idle, _, _) => ("💤", "Idle".into(), "".into()),
(SyncState::Downloading, None, _) => ("⚙️ ", format!("Preparing{}", speed), "".into()),
(SyncState::Downloading, Some(n), None) => (
"⚙️ ",
format!("Syncing{}", speed),
format!(", target=#{}", n),
+24 -2
@@ -321,7 +321,7 @@ impl<S, Block> BlockImportOperation<Block> for ImportOperation<Block, S>
Ok(())
}
fn reset_storage(&mut self, input: Storage) -> ClientResult<Block::Hash> {
fn set_genesis_state(&mut self, input: Storage, commit: bool) -> ClientResult<Block::Hash> {
check_genesis_storage(&input)?;
// changes trie configuration
@@ -347,11 +347,17 @@ impl<S, Block> BlockImportOperation<Block> for ImportOperation<Block, S>
let storage_update = InMemoryBackend::from(storage);
let (storage_root, _) = storage_update.full_storage_root(std::iter::empty(), child_delta);
self.storage_update = Some(storage_update);
if commit {
self.storage_update = Some(storage_update);
}
Ok(storage_root)
}
fn reset_storage(&mut self, _input: Storage) -> ClientResult<Block::Hash> {
Err(ClientError::NotAvailableOnLightClient)
}
fn insert_aux<I>(&mut self, ops: I) -> ClientResult<()>
where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>
{
@@ -461,6 +467,22 @@ impl<H: Hasher> StateBackend<H> for GenesisOrUnavailableState<H>
}
}
fn apply_to_key_values_while<A: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
action: A,
allow_missing: bool,
) -> ClientResult<bool> {
match *self {
GenesisOrUnavailableState::Genesis(ref state) =>
Ok(state.apply_to_key_values_while(child_info, prefix, start_at, action, allow_missing)
.expect(IN_MEMORY_EXPECT_PROOF)),
GenesisOrUnavailableState::Unavailable => Err(ClientError::NotAvailableOnLightClient),
}
}
fn apply_to_keys_while<A: FnMut(&[u8]) -> bool>(
&self,
child_info: Option<&ChildInfo>,
+24 -1
@@ -79,6 +79,11 @@ pub struct Behaviour<B: BlockT> {
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
block_request_protocol_name: String,
/// Protocol name used to send out state requests via
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
state_request_protocol_name: String,
}
/// Event generated by `Behaviour`.
@@ -186,6 +191,7 @@ impl<B: BlockT> Behaviour<B> {
light_client_request_sender: light_client_requests::sender::LightClientRequestSender<B>,
disco_config: DiscoveryConfig,
block_request_protocol_config: request_responses::ProtocolConfig,
state_request_protocol_config: request_responses::ProtocolConfig,
bitswap: Option<Bitswap<B>>,
light_client_request_protocol_config: request_responses::ProtocolConfig,
// All remaining request protocol configs.
@@ -193,7 +199,9 @@ impl<B: BlockT> Behaviour<B> {
) -> Result<Self, request_responses::RegisterError> {
// Extract protocol name and add to `request_response_protocols`.
let block_request_protocol_name = block_request_protocol_config.name.to_string();
let state_request_protocol_name = state_request_protocol_config.name.to_string();
request_response_protocols.push(block_request_protocol_config);
request_response_protocols.push(state_request_protocol_config);
request_response_protocols.push(light_client_request_protocol_config);
@@ -206,8 +214,8 @@ impl<B: BlockT> Behaviour<B> {
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?,
light_client_request_sender,
events: VecDeque::new(),
block_request_protocol_name,
state_request_protocol_name,
})
}
@@ -329,6 +337,21 @@ Behaviour<B> {
&target, &self.block_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError,
);
},
CustomMessageOutcome::StateRequest { target, request, pending_response } => {
let mut buf = Vec::with_capacity(request.encoded_len());
if let Err(err) = request.encode(&mut buf) {
log::warn!(
target: "sync",
"Failed to encode state request {:?}: {:?}",
request, err
);
return
}
self.request_responses.send_request(
&target, &self.state_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError,
);
},
CustomMessageOutcome::NotificationStreamOpened {
remote, protocol, negotiated_fallback, roles, notifications_sink
} => {
+1
@@ -21,6 +21,7 @@
use sp_blockchain::{Error, HeaderBackend, HeaderMetadata};
use sc_client_api::{BlockBackend, ProofProvider};
use sp_runtime::traits::{Block as BlockT, BlockIdTo};
pub use sc_client_api::{StorageKey, StorageData, ImportedState};
/// Local client abstraction for the network.
pub trait Client<Block: BlockT>: HeaderBackend<Block> + ProofProvider<Block> + BlockIdTo<Block, Error = Error>
+30
@@ -123,6 +123,15 @@ pub struct Params<B: BlockT, H: ExHashT> {
/// [`crate::light_client_requests::handler::LightClientRequestHandler::new`] allowing
/// both outgoing and incoming requests.
pub light_client_request_protocol_config: RequestResponseConfig,
/// Request response configuration for the state request protocol.
///
/// Can be constructed either via
/// [`crate::state_requests::generate_protocol_config`] allowing outgoing but not
/// incoming requests, or constructed via
/// [`crate::state_requests::handler::StateRequestHandler::new`] allowing
/// both outgoing and incoming requests.
pub state_request_protocol_config: RequestResponseConfig,
}
/// Role of the local node.
@@ -373,6 +382,24 @@ impl From<multiaddr::Error> for ParseErr {
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
/// Sync operation mode.
pub enum SyncMode {
/// Full block download and verification.
Full,
/// Download blocks and the latest state.
Fast {
/// Skip state proof download and verification.
skip_proofs: bool
},
}
impl Default for SyncMode {
fn default() -> Self {
SyncMode::Full
}
}
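// A sketch of opting into fast sync when assembling the network configuration;
// `net_config` here is an already-constructed `NetworkConfiguration`:
net_config.sync_mode = SyncMode::Fast { skip_proofs: false };
// The CLI's `FastUnsafe` maps to `SyncMode::Fast { skip_proofs: true }`.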
/// Network service configuration.
#[derive(Clone, Debug)]
pub struct NetworkConfiguration {
@@ -400,6 +427,8 @@ pub struct NetworkConfiguration {
pub transport: TransportConfig,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
/// Initial syncing mode.
pub sync_mode: SyncMode,
/// True if Kademlia random discovery should be enabled.
///
@@ -462,6 +491,7 @@ impl NetworkConfiguration {
wasm_external_transport: None,
},
max_parallel_downloads: 5,
sync_mode: SyncMode::Full,
enable_dht_random_walk: true,
allow_non_globals_in_dht: false,
kademlia_disjoint_query_paths: false,
@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::block_request_handler::BlockRequestHandler;
use crate::state_request_handler::StateRequestHandler;
use crate::light_client_requests::handler::LightClientRequestHandler;
use crate::gossip::QueuedSender;
use crate::{config, Event, NetworkService, NetworkWorker};
@@ -107,6 +108,16 @@ fn build_test_full_node(network_config: config::NetworkConfiguration)
protocol_config
};
let state_request_protocol_config = {
let (handler, protocol_config) = StateRequestHandler::new(
&protocol_id,
client.clone(),
50,
);
async_std::task::spawn(handler.run().boxed());
protocol_config
};
let light_client_request_protocol_config = {
let (handler, protocol_config) = LightClientRequestHandler::new(
&protocol_id,
@@ -131,6 +142,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration)
),
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
})
.unwrap();
+5 -1
@@ -260,6 +260,7 @@ mod utils;
pub mod block_request_handler;
pub mod bitswap;
pub mod light_client_requests;
pub mod state_request_handler;
pub mod config;
pub mod error;
pub mod gossip;
@@ -268,7 +269,8 @@ pub mod transactions;
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo};
pub use protocol::{event::{DhtEvent, Event, ObservedRole}, PeerInfo};
pub use protocol::sync::{SyncState, StateDownloadProgress};
pub use service::{
NetworkService, NetworkWorker, RequestFailure, OutboundFailure, NotificationSender,
NotificationSenderReady, IfDisconnected,
@@ -321,4 +323,6 @@ pub struct NetworkStatus<B: BlockT> {
pub total_bytes_inbound: u64,
/// The total number of bytes sent.
pub total_bytes_outbound: u64,
/// State sync in progress.
pub state_sync: Option<protocol::sync::StateDownloadProgress>,
}
+129 -28
@@ -22,6 +22,7 @@ use crate::{
error,
request_responses::RequestFailure,
utils::{interval, LruHashSet},
schema::v1::StateResponse,
};
use bytes::Bytes;
@@ -49,7 +50,7 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub},
};
use sp_arithmetic::traits::SaturatedConversion;
use sync::{ChainSync, SyncState};
use sync::{ChainSync, Status as SyncStatus};
use std::borrow::Cow;
use std::convert::TryFrom as _;
use std::collections::{HashMap, HashSet, VecDeque};
@@ -179,13 +180,19 @@ pub struct Protocol<B: BlockT> {
block_announce_data_cache: lru::LruCache<B::Hash, Vec<u8>>,
}
#[derive(Debug)]
enum PeerRequest<B: BlockT> {
Block(message::BlockRequest<B>),
State,
}
/// Peer information
#[derive(Debug)]
struct Peer<B: BlockT> {
info: PeerInfo<B>,
/// Current block request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`].
block_request: Option<(
message::BlockRequest<B>,
/// Current request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`]
/// or [`CustomMessageOutcome::StateRequest`].
request: Option<(
PeerRequest<B>,
oneshot::Receiver<Result<Vec<u8>, RequestFailure>>,
)>,
/// Holds a set of blocks known to this peer.
@@ -210,6 +217,21 @@ pub struct ProtocolConfig {
pub roles: Roles,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
/// Syncing mode.
pub sync_mode: config::SyncMode,
}
impl ProtocolConfig {
fn sync_mode(&self) -> sync::SyncMode {
if self.roles.is_light() {
sync::SyncMode::Light
} else {
match self.sync_mode {
config::SyncMode::Full => sync::SyncMode::Full,
config::SyncMode::Fast { skip_proofs } => sync::SyncMode::LightState { skip_proofs },
}
}
}
}
impl Default for ProtocolConfig {
@@ -217,6 +239,7 @@ impl Default for ProtocolConfig {
ProtocolConfig {
roles: Roles::FULL,
max_parallel_downloads: 5,
sync_mode: config::SyncMode::Full,
}
}
}
@@ -263,12 +286,11 @@ impl<B: BlockT> Protocol<B> {
) -> error::Result<(Protocol<B>, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let info = chain.info();
let sync = ChainSync::new(
config.roles,
config.sync_mode(),
chain.clone(),
&info,
block_announce_validator,
config.max_parallel_downloads,
);
).map_err(Box::new)?;
let boot_node_ids = {
let mut list = HashSet::new();
@@ -454,13 +476,13 @@ impl<B: BlockT> Protocol<B> {
pub fn num_active_peers(&self) -> usize {
self.peers
.values()
.filter(|p| p.block_request.is_some())
.filter(|p| p.request.is_some())
.count()
}
/// Current global sync state.
pub fn sync_state(&self) -> SyncState {
self.sync.status().state
pub fn sync_state(&self) -> SyncStatus<B> {
self.sync.status()
}
/// Target sync block number.
@@ -656,6 +678,27 @@ impl<B: BlockT> Protocol<B> {
}
}
/// Must be called in response to a [`CustomMessageOutcome::StateRequest`] being emitted.
/// Must contain the same `PeerId` and request that have been emitted.
pub fn on_state_response(
&mut self,
peer_id: PeerId,
response: StateResponse,
) -> CustomMessageOutcome<B> {
match self.sync.on_state_data(&peer_id, response) {
Ok(sync::OnStateData::Import(origin, block)) =>
CustomMessageOutcome::BlockImport(origin, vec![block]),
Ok(sync::OnStateData::Request(peer, req)) => {
prepare_state_request::<B>(&mut self.peers, peer, req)
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
}
}
}
/// Perform time based maintenance.
///
/// > **Note**: This method normally doesn't have to be called except for testing purposes.
@@ -736,7 +779,7 @@ impl<B: BlockT> Protocol<B> {
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
request: None,
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
};
@@ -1137,7 +1180,7 @@ fn prepare_block_request<B: BlockT>(
let (tx, rx) = oneshot::channel();
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.block_request = Some((request.clone(), rx));
peer.request = Some((PeerRequest::Block(request.clone()), rx));
}
let request = crate::schema::v1::BlockRequest {
@@ -1161,6 +1204,23 @@ fn prepare_block_request<B: BlockT>(
}
}
fn prepare_state_request<B: BlockT>(
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: crate::schema::v1::StateRequest,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.request = Some((PeerRequest::State, rx));
}
CustomMessageOutcome::StateRequest {
target: who,
request,
pending_response: tx,
}
}
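// The new request/response plumbing mirrors the existing block-request path.
// Roughly, as a sketch of the flow shown in this file:
// 1. `ChainSync::state_request()` yields a `(PeerId, StateRequest)` pair;
// 2. `prepare_state_request` records `PeerRequest::State` with a oneshot channel
// and emits `CustomMessageOutcome::StateRequest`;
// 3. `Behaviour` encodes the protobuf request and sends it on the dedicated
// state request-response protocol;
// 4. the raw response is decoded into a `StateResponse` and handed to
// `on_state_response`, which forwards it to `ChainSync::on_state_data`.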
/// Outcome of an incoming custom message.
#[derive(Debug)]
#[must_use]
@@ -1192,6 +1252,12 @@ pub enum CustomMessageOutcome<B: BlockT> {
request: crate::schema::v1::BlockRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new storage request must be emitted.
StateRequest {
target: PeerId,
request: crate::schema::v1::StateRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// Peer has a reported a new head of chain.
PeerNewBest(PeerId, NumberFor<B>),
/// Now connected to a new peer for syncing purposes.
@@ -1254,27 +1320,54 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
// Check for finished outgoing requests.
let mut finished_block_requests = Vec::new();
let mut finished_state_requests = Vec::new();
for (id, peer) in self.peers.iter_mut() {
if let Peer { block_request: Some((_, pending_response)), .. } = peer {
if let Peer { request: Some((_, pending_response)), .. } = peer {
match pending_response.poll_unpin(cx) {
Poll::Ready(Ok(Ok(resp))) => {
let (req, _) = peer.block_request.take().unwrap();
let (req, _) = peer.request.take().unwrap();
match req {
PeerRequest::Block(req) => {
let protobuf_response = match crate::schema::v1::BlockResponse::decode(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
target: "sync",
"Failed to decode block response from peer {:?}: {:?}.",
id,
e
);
self.peerset_handle.report_peer(id.clone(), rep::BAD_MESSAGE);
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
continue;
}
};
let protobuf_response = match crate::schema::v1::BlockResponse::decode(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(target: "sync", "Failed to decode block request to peer {:?}: {:?}.", id, e);
self.peerset_handle.report_peer(id.clone(), rep::BAD_MESSAGE);
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
continue;
}
};
finished_block_requests.push((id.clone(), req, protobuf_response));
},
PeerRequest::State => {
let protobuf_response = match crate::schema::v1::StateResponse::decode(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
target: "sync",
"Failed to decode state response from peer {:?}: {:?}.",
id,
e
);
self.peerset_handle.report_peer(id.clone(), rep::BAD_MESSAGE);
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
continue;
}
};
finished_block_requests.push((id.clone(), req, protobuf_response));
finished_state_requests.push((id.clone(), protobuf_response));
},
}
},
Poll::Ready(Ok(Err(e))) => {
peer.block_request.take();
debug!(target: "sync", "Block request to peer {:?} failed: {:?}.", id, e);
peer.request.take();
debug!(target: "sync", "Request to peer {:?} failed: {:?}.", id, e);
match e {
RequestFailure::Network(OutboundFailure::Timeout) => {
@@ -1309,10 +1402,10 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
}
},
Poll::Ready(Err(oneshot::Canceled)) => {
peer.block_request.take();
peer.request.take();
trace!(
target: "sync",
"Block request to peer {:?} failed due to oneshot being canceled.",
"Request to peer {:?} failed due to oneshot being canceled.",
id,
);
self.behaviour.disconnect_peer(id, HARDCODED_PEERSETS_SYNC);
@@ -1325,6 +1418,10 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let ev = self.on_block_response(id, req, protobuf_response);
self.pending_messages.push_back(ev);
}
for (id, protobuf_response) in finished_state_requests {
let ev = self.on_state_response(id, protobuf_response);
self.pending_messages.push_back(ev);
}
while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) {
self.tick();
@@ -1334,6 +1431,10 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let event = prepare_block_request(&mut self.peers, id.clone(), request);
self.pending_messages.push_back(event);
}
if let Some((id, request)) = self.sync.state_request() {
let event = prepare_state_request(&mut self.peers, id, request);
self.pending_messages.push_back(event);
}
for (id, request) in self.sync.justification_requests() {
let event = prepare_block_request(&mut self.peers, id, request);
self.pending_messages.push_back(event);
+263 -65
View File
@@ -31,14 +31,16 @@
use codec::Encode;
use blocks::BlockCollection;
use sp_blockchain::{Error as ClientError, Info as BlockchainInfo, HeaderMetadata};
use state::StateSync;
use sp_blockchain::{Error as ClientError, HeaderMetadata};
use sp_consensus::{BlockOrigin, BlockStatus,
block_validation::{BlockAnnounceValidator, Validation},
import_queue::{IncomingBlock, BlockImportResult, BlockImportError}
};
use crate::protocol::message::{
self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse, Roles,
self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse,
};
use crate::schema::v1::{StateResponse, StateRequest};
use either::Either;
use extra_requests::ExtraRequests;
use libp2p::PeerId;
@@ -59,6 +61,7 @@ use futures::{task::Poll, Future, stream::FuturesUnordered, FutureExt, StreamExt
mod blocks;
mod extra_requests;
mod state;
/// Maximum blocks to request in a single packet.
const MAX_BLOCKS_TO_REQUEST: usize = 128;
@@ -84,6 +87,9 @@ const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS: usize = 256;
/// See [`MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS`] for more information.
const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4;
/// Pick the state to sync as the latest finalized number minus this.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
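// Worked example (illustrative, not part of this change): with the threshold
// at 8 and a median peer best block of 1000, finalizing any block at height
// >= 992 makes it a candidate state sync target; see the finalization handler
// later in this diff, which compares `number + STATE_SYNC_FINALITY_THRESHOLD`
// against the median of the peers' best numbers.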
/// We use a heuristic that with a high likelihood, by the time
/// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same
/// chain as (or at least closer to) the peer so we want to delay
@@ -183,11 +189,8 @@ pub struct ChainSync<B: BlockT> {
best_queued_number: NumberFor<B>,
/// The best block hash in our queue of blocks to import
best_queued_hash: B::Hash,
/// The role of this node, e.g. light or full
role: Roles,
/// What block attributes we require for this node, usually derived from
/// what role we are, but could be customized
required_block_attributes: message::BlockAttributes,
/// Current mode (full/light)
mode: SyncMode,
/// Any extra justification requests.
extra_justifications: ExtraRequests<B>,
/// A set of hashes of blocks that are being downloaded or have been
@@ -209,6 +212,11 @@ pub struct ChainSync<B: BlockT> {
>,
/// Stats per peer about the number of concurrent block announce validations.
block_announce_validation_per_peer_stats: HashMap<PeerId, usize>,
/// State sync in progress, if any.
state_sync: Option<StateSync<B>>,
/// Enable importing existing blocks. This is used after the state download to
/// catch up to the latest state while re-importing blocks.
import_existing: bool,
}
/// All the data we have about a Peer that we are trying to sync with
@@ -281,6 +289,8 @@ pub enum PeerSyncState<B: BlockT> {
DownloadingStale(B::Hash),
/// Downloading justification for given block hash.
DownloadingJustification(B::Hash),
/// Downloading state.
DownloadingState,
}
impl<B: BlockT> PeerSyncState<B> {
@@ -298,6 +308,15 @@ pub enum SyncState {
Downloading
}
/// Reported state download progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct StateDownloadProgress {
/// Estimated download percentage.
pub percentage: u32,
/// Total state size in bytes downloaded so far.
pub size: u64,
}
/// Syncing status and statistics.
#[derive(Clone)]
pub struct Status<B: BlockT> {
@@ -309,6 +328,8 @@ pub struct Status<B: BlockT> {
pub num_peers: u32,
/// Number of blocks queued for import
pub queued_blocks: u32,
/// State sync status, if any.
pub state_sync: Option<StateDownloadProgress>,
}
/// A peer did not behave as expected and should be reported.
@@ -344,6 +365,15 @@ impl<B: BlockT> OnBlockData<B> {
}
}
/// Result of [`ChainSync::on_state_data`].
#[derive(Debug)]
pub enum OnStateData<B: BlockT> {
/// The block and state that should be imported.
Import(BlockOrigin, IncomingBlock<B>),
/// A new state request needs to be made to the given peer.
Request(PeerId, StateRequest)
}
/// Result of [`ChainSync::poll_block_announce_validation`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollBlockAnnounceValidation<H> {
@@ -429,6 +459,20 @@ pub enum OnBlockJustification<B: BlockT> {
}
}
/// Operation mode.
#[derive(Debug, PartialEq, Eq)]
pub enum SyncMode {
// Sync headers only
Light,
// Sync headers and block bodies
Full,
// Sync headers and the last finalized state
LightState {
skip_proofs: bool
},
}
/// Result of [`ChainSync::has_slot_for_block_announce_validation`].
enum HasSlotForBlockAnnounceValidation {
/// Yes, there is a slot for the block announce validation.
@@ -442,27 +486,19 @@ enum HasSlotForBlockAnnounceValidation {
impl<B: BlockT> ChainSync<B> {
/// Create a new instance.
pub fn new(
role: Roles,
mode: SyncMode,
client: Arc<dyn crate::chain::Client<B>>,
info: &BlockchainInfo<B>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
) -> Self {
let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION;
if role.is_full() {
required_block_attributes |= BlockAttributes::BODY
}
ChainSync {
) -> Result<Self, ClientError> {
let mut sync = ChainSync {
client,
peers: HashMap::new(),
blocks: BlockCollection::new(),
best_queued_hash: info.best_hash,
best_queued_number: info.best_number,
best_queued_hash: Default::default(),
best_queued_number: Zero::zero(),
extra_justifications: ExtraRequests::new("justification"),
role,
required_block_attributes,
mode,
queue_blocks: Default::default(),
fork_targets: Default::default(),
pending_requests: Default::default(),
@@ -471,6 +507,27 @@ impl<B: BlockT> ChainSync<B> {
downloaded_blocks: 0,
block_announce_validation: Default::default(),
block_announce_validation_per_peer_stats: Default::default(),
state_sync: None,
import_existing: false,
};
sync.reset_sync_start_point()?;
Ok(sync)
}
fn required_block_attributes(&self) -> BlockAttributes {
match self.mode {
SyncMode::Full => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
SyncMode::Light => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
SyncMode::LightState { .. } =>
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
}
}
fn skip_execution(&self) -> bool {
match self.mode {
SyncMode::Full => false,
SyncMode::Light => true,
SyncMode::LightState { .. } => true,
}
}
@@ -502,6 +559,7 @@ impl<B: BlockT> ChainSync<B> {
best_seen_block: best_seen,
num_peers: self.peers.len() as u32,
queued_blocks: self.queue_blocks.len() as u32,
state_sync: self.state_sync.as_ref().map(|s| s.progress()),
}
}
@@ -607,7 +665,7 @@ impl<B: BlockT> ChainSync<B> {
);
self.peers.insert(who.clone(), PeerSync {
peer_id: who.clone(),
common_number: best_number,
common_number: std::cmp::min(self.best_queued_number, best_number),
best_hash,
best_number,
state: PeerSyncState::Available,
@@ -718,7 +776,7 @@ impl<B: BlockT> ChainSync<B> {
/// Get an iterator over all block requests of all peers.
pub fn block_requests(&mut self) -> impl Iterator<Item = (&PeerId, BlockRequest<B>)> + '_ {
if self.pending_requests.is_empty() {
if self.pending_requests.is_empty() || self.state_sync.is_some() {
return Either::Left(std::iter::empty())
}
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
@@ -726,10 +784,10 @@ impl<B: BlockT> ChainSync<B> {
return Either::Left(std::iter::empty())
}
let major_sync = self.status().state == SyncState::Downloading;
let attrs = self.required_block_attributes();
let blocks = &mut self.blocks;
let attrs = &self.required_block_attributes;
let fork_targets = &mut self.fork_targets;
let last_finalized = self.client.info().finalized_number;
let last_finalized = std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
let best_queued = self.best_queued_number;
let client = &self.client;
let queue = &self.queue_blocks;
@@ -804,6 +862,28 @@ impl<B: BlockT> ChainSync<B> {
Either::Right(iter)
}
/// Get a state request, if any
pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
if let Some(sync) = &self.state_sync {
if sync.is_complete() {
return None;
}
if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) {
// Only one pending state request is allowed.
return None;
}
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.common_number >= sync.target_block_num() {
trace!(target: "sync", "New StateRequest for {}", id);
peer.state = PeerSyncState::DownloadingState;
let request = sync.next_request();
return Some((id.clone(), request))
}
}
}
None
}
/// Handle a response from the remote to a block request that we made.
///
/// `request` must be the original request that triggered `response`.
@@ -848,7 +928,9 @@ impl<B: BlockT> ChainSync<B> {
justifications,
origin: block_data.origin,
allow_missing_state: true,
import_existing: false,
import_existing: self.import_existing,
skip_execution: self.skip_execution(),
state: None,
}
}).collect()
}
@@ -870,7 +952,9 @@ impl<B: BlockT> ChainSync<B> {
justifications,
origin: Some(who.clone()),
allow_missing_state: true,
import_existing: false,
import_existing: self.import_existing,
skip_execution: self.skip_execution(),
state: None,
}
}).collect()
}
@@ -963,10 +1047,11 @@ impl<B: BlockT> ChainSync<B> {
peer.state = PeerSyncState::Available;
Vec::new()
}
}
| PeerSyncState::Available
| PeerSyncState::DownloadingJustification(..) => Vec::new()
},
PeerSyncState::Available
| PeerSyncState::DownloadingJustification(..)
| PeerSyncState::DownloadingState
=> Vec::new()
}
} else {
// When request.is_none() this is a block announcement. Just accept blocks.
@@ -983,6 +1068,8 @@ impl<B: BlockT> ChainSync<B> {
origin: Some(who.clone()),
allow_missing_state: true,
import_existing: false,
skip_execution: true,
state: None,
}
}).collect()
}
@@ -994,6 +1081,60 @@ impl<B: BlockT> ChainSync<B> {
Ok(self.validate_and_queue_blocks(new_blocks))
}
/// Handle a response from the remote to a state request that we made.
///
/// Returns the next request, if any.
pub fn on_state_data(
&mut self,
who: &PeerId,
response: StateResponse,
) -> Result<OnStateData<B>, BadPeer> {
let import_result = if let Some(sync) = &mut self.state_sync {
debug!(
target: "sync",
"Importing state data from {} with {} keys, {} proof nodes.",
who,
response.entries.len(),
response.proof.len(),
);
sync.import(response)
} else {
debug!(target: "sync", "Ignored obsolete state response from {}", who);
return Err(BadPeer(who.clone(), rep::NOT_REQUESTED));
};
match import_result {
state::ImportResult::Import(hash, header, state) => {
let origin = if self.status().state != SyncState::Downloading {
BlockOrigin::NetworkBroadcast
} else {
BlockOrigin::NetworkInitialSync
};
let block = IncomingBlock {
hash,
header: Some(header),
body: None,
justifications: None,
origin: None,
allow_missing_state: true,
import_existing: true,
skip_execution: self.skip_execution(),
state: Some(state),
};
debug!(target: "sync", "State sync is complete. Import is queued");
Ok(OnStateData::Import(origin, block))
}
state::ImportResult::Continue(request) => {
Ok(OnStateData::Request(who.clone(), request))
}
state::ImportResult::BadResponse => {
debug!(target: "sync", "Bad state data received from {}", who);
Err(BadPeer(who.clone(), rep::BAD_BLOCK))
}
}
}
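// A minimal sketch (assumed caller, not part of this diff) of how the result
// of `on_state_data` could be consumed by the protocol driver; the helpers
// `send_state_request` and `report_peer` are hypothetical placeholders:
//
//   match sync.on_state_data(&who, response) {
//       // State download complete: hand the reconstructed block to the
//       // import queue so it is applied like any other block.
//       Ok(OnStateData::Import(origin, block)) =>
//           import_queue.import_blocks(origin, vec![block]),
//       // More chunks needed: emit the follow-up request to the same peer.
//       Ok(OnStateData::Request(peer, request)) =>
//           send_state_request(peer, request),
//       // Misbehaving peer: report and disconnect.
//       Err(BadPeer(peer, reputation)) => report_peer(peer, reputation),
//   }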
fn validate_and_queue_blocks(
&mut self,
mut new_blocks: Vec<IncomingBlock<B>>,
@@ -1048,7 +1189,7 @@ impl<B: BlockT> ChainSync<B> {
// We only request one justification at a time
let justification = if let Some(block) = response.blocks.into_iter().next() {
if hash != block.hash {
info!(
warn!(
target: "sync",
"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", who, hash, block.hash
);
@@ -1137,7 +1278,7 @@ impl<B: BlockT> ChainSync<B> {
if aux.bad_justification {
if let Some(ref peer) = who {
info!("💔 Sent block with bad justification to import");
warn!("💔 Sent block with bad justification to import");
output.push(Err(BadPeer(peer.clone(), rep::BAD_JUSTIFICATION)));
}
}
@@ -1145,6 +1286,17 @@ impl<B: BlockT> ChainSync<B> {
if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) {
peer.update_common_number(number);
}
let state_sync_complete = self.state_sync.as_ref().map_or(false, |s| s.target() == hash);
if state_sync_complete {
info!(
target: "sync",
"State sync is complete ({} MiB), restarting block sync.",
self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
);
self.state_sync = None;
self.mode = SyncMode::Full;
output.extend(self.restart());
}
},
Err(BlockImportError::IncompleteHeader(who)) => {
if let Some(peer) = who {
@@ -1171,7 +1323,7 @@ impl<B: BlockT> ChainSync<B> {
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
info!(
warn!(
target: "sync",
"💔 Block {:?} received from peer {} has been blacklisted",
hash,
@@ -1189,6 +1341,7 @@ impl<B: BlockT> ChainSync<B> {
e @ Err(BlockImportError::UnknownParent) |
e @ Err(BlockImportError::Other(_)) => {
warn!(target: "sync", "💔 Error importing block {:?}: {:?}", hash, e);
self.state_sync = None;
output.extend(self.restart());
},
Err(BlockImportError::Cancelled) => {}
@@ -1214,6 +1367,29 @@ impl<B: BlockT> ChainSync<B> {
is_descendent_of(&**client, base, block)
});
if let SyncMode::LightState { skip_proofs } = &self.mode {
if self.state_sync.is_none()
&& !self.peers.is_empty()
&& self.queue_blocks.is_empty()
{
// Finalized a recent block.
let mut heads: Vec<_> = self.peers.iter().map(|(_, peer)| peer.best_number).collect();
heads.sort();
let median = heads[heads.len() / 2];
if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
if let Ok(Some(header)) = self.client.header(BlockId::hash(hash.clone())) {
log::debug!(
target: "sync",
"Starting state sync for #{} ({})",
number,
hash,
);
self.state_sync = Some(StateSync::new(self.client.clone(), header, *skip_proofs));
}
}
}
}
if let Err(err) = r {
warn!(
target: "sync",
@@ -1536,7 +1712,7 @@ impl<B: BlockT> ChainSync<B> {
return PollBlockAnnounceValidation::Nothing { is_best, who, announce }
}
let requires_additional_data = !self.role.is_light() || !known_parent;
let requires_additional_data = self.mode != SyncMode::Light || !known_parent;
if !requires_additional_data {
trace!(
target: "sync",
@@ -1595,6 +1771,8 @@ impl<B: BlockT> ChainSync<B> {
origin: block_data.origin,
allow_missing_state: true,
import_existing: false,
skip_execution: self.skip_execution(),
state: None,
}
}).collect();
if !blocks.is_empty() {
@@ -1611,9 +1789,9 @@ impl<B: BlockT> ChainSync<B> {
&'a mut self,
) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
self.blocks.clear();
let info = self.client.info();
self.best_queued_hash = info.best_hash;
self.best_queued_number = info.best_number;
if let Err(e) = self.reset_sync_start_point() {
warn!(target: "sync", "💔 Unable to restart sync. :{:?}", e);
}
self.pending_requests.set_all();
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::take(&mut self.peers);
@@ -1624,7 +1802,7 @@ impl<B: BlockT> ChainSync<B> {
match p.state {
PeerSyncState::DownloadingJustification(_) => {
// We make sure our common number is at least something we have.
p.common_number = info.best_number;
p.common_number = self.best_queued_number;
self.peers.insert(id, p);
return None;
}
@@ -1640,6 +1818,38 @@ impl<B: BlockT> ChainSync<B> {
})
}
/// Find a block to start sync from. If we sync with state, that's the latest block we have state for.
fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
let info = self.client.info();
if matches!(self.mode, SyncMode::LightState {..}) && info.finalized_state.is_some() {
log::warn!(
target: "sync",
"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
);
self.mode = SyncMode::Full;
}
self.import_existing = false;
self.best_queued_hash = info.best_hash;
self.best_queued_number = info.best_number;
if self.mode == SyncMode::Full {
if self.client.block_status(&BlockId::hash(info.best_hash))? != BlockStatus::InChainWithState {
self.import_existing = true;
// Latest state is missing, start with the last finalized state or genesis instead.
if let Some((hash, number)) = info.finalized_state {
log::debug!(target: "sync", "Starting from finalized state #{}", number);
self.best_queued_hash = hash;
self.best_queued_number = number;
} else {
log::debug!(target: "sync", "Restarting from genesis");
self.best_queued_hash = Default::default();
self.best_queued_number = Zero::zero();
}
}
}
log::trace!(target: "sync", "Restarted sync at #{} ({:?})", self.best_queued_number, self.best_queued_hash);
Ok(())
}
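// Example (illustrative): a full-sync node whose best block is #150 without
// state (say, an interrupted fast sync) but which has finalized state at #100
// restarts from #100 with `import_existing = true`, re-downloading and
// re-importing the bodies of #101..=#150 on top of that state.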
/// What is the status of the block corresponding to the given hash?
fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
if self.queue_blocks.contains(hash) {
@@ -1764,7 +1974,7 @@ fn peer_block_request<B: BlockT>(
id: &PeerId,
peer: &PeerSync<B>,
blocks: &mut BlockCollection<B>,
attrs: &message::BlockAttributes,
attrs: message::BlockAttributes,
max_parallel_downloads: u32,
finalized: NumberFor<B>,
best_num: NumberFor<B>,
@@ -1815,7 +2025,7 @@ fn fork_sync_request<B: BlockT>(
targets: &mut HashMap<B::Hash, ForkTarget<B>>,
best_num: NumberFor<B>,
finalized: NumberFor<B>,
attributes: &message::BlockAttributes,
attributes: message::BlockAttributes,
check_block: impl Fn(&B::Hash) -> BlockStatus,
) -> Option<(B::Hash, BlockRequest<B>)> {
targets.retain(|hash, r| {
@@ -1994,17 +2204,15 @@ mod test {
// internally we should process the response as the justification not being available.
let client = Arc::new(TestClientBuilder::new().build());
let info = client.info();
let block_announce_validator = Box::new(DefaultBlockAnnounceValidator);
let peer_id = PeerId::random();
let mut sync = ChainSync::new(
Roles::AUTHORITY,
SyncMode::Full,
client.clone(),
&info,
block_announce_validator,
1,
);
).unwrap();
let (a1_hash, a1_number) = {
let a1 = client.new_block(Default::default()).unwrap().build().unwrap().block;
@@ -2067,15 +2275,12 @@ mod test {
#[test]
fn restart_doesnt_affect_peers_downloading_finality_data() {
let mut client = Arc::new(TestClientBuilder::new().build());
let info = client.info();
let mut sync = ChainSync::new(
Roles::AUTHORITY,
SyncMode::Full,
client.clone(),
&info,
Box::new(DefaultBlockAnnounceValidator),
1,
);
).unwrap();
let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();
@@ -2242,15 +2447,13 @@ mod test {
sp_tracing::try_init_simple();
let mut client = Arc::new(TestClientBuilder::new().build());
let info = client.info();
let mut sync = ChainSync::new(
Roles::AUTHORITY,
SyncMode::Full,
client.clone(),
&info,
Box::new(DefaultBlockAnnounceValidator),
5,
);
).unwrap();
let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();
@@ -2359,12 +2562,11 @@ mod test {
let info = client.info();
let mut sync = ChainSync::new(
Roles::AUTHORITY,
SyncMode::Full,
client.clone(),
&info,
Box::new(DefaultBlockAnnounceValidator),
5,
);
).unwrap();
let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();
@@ -2481,12 +2683,11 @@ mod test {
let info = client.info();
let mut sync = ChainSync::new(
Roles::AUTHORITY,
SyncMode::Full,
client.clone(),
&info,
Box::new(DefaultBlockAnnounceValidator),
5,
);
).unwrap();
let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone();
let just = (*b"TEST", Vec::new());
@@ -2592,15 +2793,12 @@ mod test {
.map(|_| build_block(&mut client, None, false))
.collect::<Vec<_>>();
let info = client.info();
let mut sync = ChainSync::new(
Roles::AUTHORITY,
SyncMode::Full,
client.clone(),
&info,
Box::new(DefaultBlockAnnounceValidator),
1,
);
).unwrap();
let peer_id1 = PeerId::random();
let common_block = blocks[1].clone();
@@ -0,0 +1,187 @@
// This file is part of Substrate.
// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::sync::Arc;
use codec::{Encode, Decode};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use sc_client_api::StorageProof;
use crate::schema::v1::{StateRequest, StateResponse, StateEntry};
use crate::chain::{Client, ImportedState};
use super::StateDownloadProgress;
/// State sync support.
/// State sync state machine. Accumulates partial state data until it
/// is ready to be imported.
pub struct StateSync<B: BlockT> {
target_block: B::Hash,
target_header: B::Header,
target_root: B::Hash,
last_key: Vec<u8>,
state: Vec<(Vec<u8>, Vec<u8>)>,
complete: bool,
client: Arc<dyn Client<B>>,
imported_bytes: u64,
skip_proof: bool,
}
/// Import state chunk result.
pub enum ImportResult<B: BlockT> {
/// State is complete and ready for import.
Import(B::Hash, B::Header, ImportedState<B>),
/// Continue downloading.
Continue(StateRequest),
/// Bad state chunk.
BadResponse,
}
impl<B: BlockT> StateSync<B> {
/// Create a new instance.
pub fn new(client: Arc<dyn Client<B>>, target: B::Header, skip_proof: bool) -> Self {
StateSync {
client,
target_block: target.hash(),
target_root: target.state_root().clone(),
target_header: target,
last_key: Vec::default(),
state: Vec::default(),
complete: false,
imported_bytes: 0,
skip_proof,
}
}
/// Validate and import a state response.
pub fn import(&mut self, response: StateResponse) -> ImportResult<B> {
if response.entries.is_empty() && response.proof.is_empty() && !response.complete {
log::debug!(
target: "sync",
"Bad state response",
);
return ImportResult::BadResponse;
}
if !self.skip_proof && response.proof.is_empty() {
log::debug!(
target: "sync",
"Missing proof",
);
return ImportResult::BadResponse;
}
let complete = if !self.skip_proof {
log::debug!(
target: "sync",
"Importing state from {} trie nodes",
response.proof.len(),
);
let proof_size = response.proof.len() as u64;
let proof = match StorageProof::decode(&mut response.proof.as_ref()) {
Ok(proof) => proof,
Err(e) => {
log::debug!(target: "sync", "Error decoding proof: {:?}", e);
return ImportResult::BadResponse;
}
};
let (values, complete) = match self.client.verify_range_proof(
self.target_root,
proof,
&self.last_key
) {
Err(e) => {
log::debug!(
target: "sync",
"StateResponse failed proof verification: {:?}",
e,
);
return ImportResult::BadResponse;
},
Ok(values) => values,
};
log::debug!(target: "sync", "Imported with {} keys", values.len());
if let Some(last) = values.last().map(|(k, _)| k) {
self.last_key = last.clone();
}
for (key, value) in values {
self.imported_bytes += key.len() as u64;
self.state.push((key, value))
};
self.imported_bytes += proof_size;
complete
} else {
log::debug!(
target: "sync",
"Importing state from {:?} to {:?}",
response.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
response.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
);
if let Some(e) = response.entries.last() {
self.last_key = e.key.clone();
}
for StateEntry { key, value } in response.entries {
self.imported_bytes += (key.len() + value.len()) as u64;
self.state.push((key, value))
}
response.complete
};
if complete {
self.complete = true;
ImportResult::Import(self.target_block.clone(), self.target_header.clone(), ImportedState {
block: self.target_block.clone(),
state: std::mem::take(&mut self.state)
})
} else {
ImportResult::Continue(self.next_request())
}
}
/// Produce next state request.
pub fn next_request(&self) -> StateRequest {
StateRequest {
block: self.target_block.encode(),
start: self.last_key.clone(),
no_proof: self.skip_proof,
}
}
/// Check if the state is complete.
pub fn is_complete(&self) -> bool {
self.complete
}
/// Returns target block number.
pub fn target_block_num(&self) -> NumberFor<B> {
self.target_header.number().clone()
}
/// Returns target block hash.
pub fn target(&self) -> B::Hash {
self.target_block.clone()
}
/// Returns state sync estimated progress.
pub fn progress(&self) -> StateDownloadProgress {
let percent_done = (*self.last_key.get(0).unwrap_or(&0u8) as u32) * 100 / 256;
StateDownloadProgress {
percentage: percent_done,
size: self.imported_bytes,
}
}
}
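// The percentage above is a coarse heuristic: keys are visited in
// lexicographic order, so the first byte of the last visited key approximates
// the fraction of the key space covered, assuming keys are spread roughly
// uniformly. A sketch:
//
//   let last_key = vec![0x80u8, 0x12, 0x34];
//   let percent = (*last_key.get(0).unwrap_or(&0u8) as u32) * 100 / 256;
//   assert_eq!(percent, 50); // half of the 0x00..=0xff first-byte range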
@@ -68,3 +68,28 @@ message BlockData {
bytes justifications = 8; // optional
}
// Request storage data from a peer.
message StateRequest {
// Block header hash.
bytes block = 1;
// Start from this key. Equivalent to <empty bytes> if omitted.
bytes start = 2; // optional
// If 'true', indicates that the response should contain raw key-value entries rather than a proof.
bool no_proof = 3;
}
message StateResponse {
// A collection of key-value entries. Only populated if `no_proof` is `true`.
repeated StateEntry entries = 1;
// If `no_proof` was false in the request, this contains the proof nodes.
bytes proof = 2;
// Set to true when there are no more keys to return.
bool complete = 3;
}
// A key-value pair
message StateEntry {
bytes key = 1;
bytes value = 2;
}
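// Example exchange (illustrative): a syncing node first sends
//   StateRequest { block: <target hash>, start: <empty>, no_proof: false }
// and then repeats the request with `start` set to the last key covered by
// the previous response, until a response arrives with `complete = true`.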
+9 -5
View File
@@ -48,7 +48,7 @@ use crate::{
Protocol,
Ready,
event::Event,
sync::SyncState,
sync::{SyncState, Status as SyncStatus},
},
transactions,
transport, ReputationChange,
@@ -196,6 +196,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
protocol::ProtocolConfig {
roles: From::from(&params.role),
max_parallel_downloads: params.network_config.max_parallel_downloads,
sync_mode: params.network_config.sync_mode.clone(),
},
params.chain.clone(),
params.protocol_id.clone(),
@@ -331,7 +332,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
};
let behaviour = {
let bitswap = if params.network_config.ipfs_server { Some(Bitswap::new(client)) } else { None };
let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(client));
let result = Behaviour::new(
protocol,
user_agent,
@@ -339,6 +340,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
light_client_request_sender,
discovery_config,
params.block_request_protocol_config,
params.state_request_protocol_config,
bitswap,
params.light_client_request_protocol_config,
params.network_config.request_response_protocols,
@@ -442,14 +444,16 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
/// High-level network status information.
pub fn status(&self) -> NetworkStatus<B> {
let status = self.sync_state();
NetworkStatus {
sync_state: self.sync_state(),
sync_state: status.state,
best_seen_block: self.best_seen_block(),
num_sync_peers: self.num_sync_peers(),
num_connected_peers: self.num_connected_peers(),
num_active_peers: self.num_active_peers(),
total_bytes_inbound: self.total_bytes_inbound(),
total_bytes_outbound: self.total_bytes_outbound(),
state_sync: status.state_sync,
}
}
@@ -474,7 +478,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
}
/// Current global sync state.
pub fn sync_state(&self) -> SyncState {
pub fn sync_state(&self) -> SyncStatus<B> {
self.network_service.behaviour().user_protocol().sync_state()
}
@@ -1869,7 +1873,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
*this.external_addresses.lock() = external_addresses;
}
let is_major_syncing = match this.network_service.behaviour_mut().user_protocol_mut().sync_state() {
let is_major_syncing = match this.network_service.behaviour_mut().user_protocol_mut().sync_state().state {
SyncState::Idle => false,
SyncState::Downloading => true,
};
@@ -18,6 +18,7 @@
use crate::{config, Event, NetworkService, NetworkWorker};
use crate::block_request_handler::BlockRequestHandler;
use crate::state_request_handler::StateRequestHandler;
use crate::light_client_requests::handler::LightClientRequestHandler;
use libp2p::PeerId;
@@ -107,6 +108,16 @@ fn build_test_full_node(config: config::NetworkConfiguration)
protocol_config
};
let state_request_protocol_config = {
let (handler, protocol_config) = StateRequestHandler::new(
&protocol_id,
client.clone(),
50,
);
async_std::task::spawn(handler.run().boxed());
protocol_config
};
let light_client_request_protocol_config = {
let (handler, protocol_config) = LightClientRequestHandler::new(
&protocol_id,
@@ -131,6 +142,7 @@ fn build_test_full_node(config: config::NetworkConfiguration)
),
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
})
.unwrap();
@@ -0,0 +1,246 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Helper for handling (i.e. answering) state requests from a remote peer via the
//! [`crate::request_responses::RequestResponsesBehaviour`].
use codec::{Encode, Decode};
use crate::chain::Client;
use crate::config::ProtocolId;
use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
use crate::schema::v1::{StateResponse, StateRequest, StateEntry};
use crate::{PeerId, ReputationChange};
use futures::channel::{mpsc, oneshot};
use futures::stream::StreamExt;
use log::debug;
use lru::LruCache;
use prost::Message;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;
use std::time::Duration;
use std::hash::{Hasher, Hash};
const LOG_TARGET: &str = "sync";
const MAX_RESPONSE_BYTES: usize = 2 * 1024 * 1024; // Actual response may be bigger.
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
mod rep {
use super::ReputationChange as Rep;
/// Reputation change when a peer sent us the same request multiple times.
pub const SAME_REQUEST: Rep = Rep::new(i32::min_value(), "Same state request multiple times");
}
/// Generates a [`ProtocolConfig`] for the state request protocol, refusing incoming requests.
pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
ProtocolConfig {
name: generate_protocol_name(protocol_id).into(),
max_request_size: 1024 * 1024,
max_response_size: 16 * 1024 * 1024,
request_timeout: Duration::from_secs(40),
inbound_queue: None,
}
}
/// Generate the state protocol name from chain specific protocol identifier.
fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
let mut s = String::new();
s.push_str("/");
s.push_str(protocol_id.as_ref());
s.push_str("/state/1");
s
}
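// For example (assuming the `From<&str>` impl for `ProtocolId`):
//   assert_eq!(generate_protocol_name(&ProtocolId::from("sup")), "/sup/state/1");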
/// The key of [`StateRequestHandler::seen_requests`].
#[derive(Eq, PartialEq, Clone)]
struct SeenRequestsKey<B: BlockT> {
peer: PeerId,
block: B::Hash,
start: Vec<u8>,
}
impl<B: BlockT> Hash for SeenRequestsKey<B> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.peer.hash(state);
self.block.hash(state);
self.start.hash(state);
}
}
/// The value of [`StateRequestHandler::seen_requests`].
enum SeenRequestsValue {
/// First time we have seen the request.
First,
/// We have fulfilled the request `n` times.
Fulfilled(usize),
}
/// Handler for incoming state requests from a remote peer.
pub struct StateRequestHandler<B: BlockT> {
client: Arc<dyn Client<B>>,
request_receiver: mpsc::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
/// This is used to check if a peer is spamming us with the same request.
seen_requests: LruCache<SeenRequestsKey<B>, SeenRequestsValue>,
}
impl<B: BlockT> StateRequestHandler<B> {
/// Create a new [`StateRequestHandler`].
pub fn new(
protocol_id: &ProtocolId,
client: Arc<dyn Client<B>>,
num_peer_hint: usize,
) -> (Self, ProtocolConfig) {
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
let (tx, request_receiver) = mpsc::channel(num_peer_hint);
let mut protocol_config = generate_protocol_config(protocol_id);
protocol_config.inbound_queue = Some(tx);
let seen_requests = LruCache::new(num_peer_hint * 2);
(Self { client, request_receiver, seen_requests }, protocol_config)
}
/// Run [`StateRequestHandler`].
pub async fn run(mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;
match self.handle_request(payload, pending_response, &peer) {
Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
Err(e) => debug!(
target: LOG_TARGET,
"Failed to handle state request from {}: {}",
peer,
e,
),
}
}
}
fn handle_request(
&mut self,
payload: Vec<u8>,
pending_response: oneshot::Sender<OutgoingResponse>,
peer: &PeerId,
) -> Result<(), HandleRequestError> {
let request = StateRequest::decode(&payload[..])?;
let block: B::Hash = Decode::decode(&mut request.block.as_ref())?;
let key = SeenRequestsKey {
peer: *peer,
block: block.clone(),
start: request.start.clone(),
};
let mut reputation_changes = Vec::new();
match self.seen_requests.get_mut(&key) {
Some(SeenRequestsValue::First) => {},
Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
*requests = requests.saturating_add(1);
if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
reputation_changes.push(rep::SAME_REQUEST);
}
},
None => {
self.seen_requests.put(key.clone(), SeenRequestsValue::First);
}
}
log::trace!(
target: LOG_TARGET,
"Handling state request from {}: Block {:?}, Starting at {:?}, no_proof={}",
peer,
request.block,
sp_core::hexdisplay::HexDisplay::from(&request.start),
request.no_proof,
);
let result = if reputation_changes.is_empty() {
let mut response = StateResponse::default();
if !request.no_proof {
let (proof, count) = self.client.read_proof_collection(
&BlockId::hash(block),
&request.start,
MAX_RESPONSE_BYTES,
)?;
response.proof = proof.encode();
if count == 0 {
response.complete = true;
}
} else {
let entries = self.client.storage_collection(
&BlockId::hash(block),
&request.start,
MAX_RESPONSE_BYTES,
)?;
response.entries = entries.into_iter().map(|(key, value)| StateEntry { key, value }).collect();
if response.entries.is_empty() {
response.complete = true;
}
}
log::trace!(
target: LOG_TARGET,
"StateResponse contains {} keys, {}, proof nodes, complete={}, from {:?} to {:?}",
response.entries.len(),
response.proof.len(),
response.complete,
response.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
response.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
);
if let Some(value) = self.seen_requests.get_mut(&key) {
// If this is the first time we have processed this request, we need to change
// it to `Fulfilled`.
if let SeenRequestsValue::First = value {
*value = SeenRequestsValue::Fulfilled(1);
}
}
let mut data = Vec::with_capacity(response.encoded_len());
response.encode(&mut data)?;
Ok(data)
} else {
Err(())
};
pending_response.send(OutgoingResponse {
result,
reputation_changes,
sent_feedback: None,
}).map_err(|_| HandleRequestError::SendResponse)
}
}
#[derive(derive_more::Display, derive_more::From)]
enum HandleRequestError {
#[display(fmt = "Failed to decode request: {}.", _0)]
DecodeProto(prost::DecodeError),
#[display(fmt = "Failed to encode response: {}.", _0)]
EncodeProto(prost::EncodeError),
#[display(fmt = "Failed to decode block hash: {}.", _0)]
InvalidHash(codec::Error),
Client(sp_blockchain::Error),
#[display(fmt = "Failed to send response.")]
SendResponse,
}
@@ -46,6 +46,8 @@ fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock<Block>)
origin: Some(peer_id.clone()),
allow_missing_state: false,
import_existing: false,
state: None,
skip_execution: false,
})
}
+40 -5
View File
@@ -29,6 +29,7 @@ use std::{
use libp2p::build_multiaddr;
use log::trace;
use sc_network::block_request_handler::{self, BlockRequestHandler};
use sc_network::state_request_handler::{self, StateRequestHandler};
use sc_network::light_client_requests::{self, handler::LightClientRequestHandler};
use sp_blockchain::{
HeaderBackend, Result as ClientResult,
@@ -55,7 +56,7 @@ use sc_network::{
NetworkWorker, NetworkService, config::{ProtocolId, MultiaddrWithPeerId, NonReservedPeerMode},
Multiaddr,
};
use sc_network::config::{NetworkConfiguration, NonDefaultSetConfig, TransportConfig};
use sc_network::config::{NetworkConfiguration, NonDefaultSetConfig, TransportConfig, SyncMode};
use libp2p::PeerId;
use parking_lot::Mutex;
use sp_core::H256;
@@ -179,6 +180,19 @@ impl PeersClient {
}
}
pub fn has_state_at(&self, block: &BlockId<Block>) -> bool {
let header = match self.header(block).unwrap() {
Some(header) => header,
None => return false,
};
match self {
PeersClient::Full(_client, backend) =>
backend.have_state_at(&header.hash(), *header.number()),
PeersClient::Light(_client, backend) =>
backend.have_state_at(&header.hash(), *header.number()),
}
}
pub fn justifications(&self, block: &BlockId<Block>) -> ClientResult<Option<Justifications>> {
match *self {
PeersClient::Full(ref client, ref _backend) => client.justifications(block),
@@ -235,9 +249,9 @@ impl BlockImport<Block> for PeersClient {
) -> Result<ImportResult, Self::Error> {
match self {
PeersClient::Full(client, _) =>
client.import_block(block.convert_transaction(), cache).await,
client.import_block(block.clear_storage_changes_and_mutate(), cache).await,
PeersClient::Light(client, _) =>
client.import_block(block.convert_transaction(), cache).await,
client.import_block(block.clear_storage_changes_and_mutate(), cache).await,
}
}
}
@@ -584,7 +598,7 @@ impl<I> BlockImport<Block> for BlockImportAdapter<I> where
block: BlockImportParams<Block, ()>,
cache: HashMap<well_known_cache_keys::Id, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
self.inner.import_block(block.convert_transaction(), cache).await
self.inner.import_block(block.clear_storage_changes_and_mutate(), cache).await
}
}
@@ -644,6 +658,8 @@ pub struct FullPeerConfig {
pub connect_to_peers: Option<Vec<usize>>,
/// Whether the full peer should have the authority role.
pub is_authority: bool,
/// Syncing mode
pub sync_mode: SyncMode,
}
pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>::Transaction: Send {
@@ -699,10 +715,13 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>:
/// Add a full peer.
fn add_full_peer_with_config(&mut self, config: FullPeerConfig) {
let test_client_builder = match config.keep_blocks {
let mut test_client_builder = match config.keep_blocks {
Some(keep_blocks) => TestClientBuilder::with_pruning_window(keep_blocks),
None => TestClientBuilder::with_default_backend(),
};
if matches!(config.sync_mode, SyncMode::Fast{..}) {
test_client_builder = test_client_builder.set_no_genesis();
}
let backend = test_client_builder.backend();
let (c, longest_chain) = test_client_builder.build_with_longest_chain();
let client = Arc::new(c);
@@ -736,6 +755,7 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>:
Default::default(),
None,
);
network_config.sync_mode = config.sync_mode;
network_config.transport = TransportConfig::MemoryOnly;
network_config.listen_addresses = vec![listen_addr.clone()];
network_config.allow_non_globals_in_dht = true;
@@ -769,6 +789,16 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>:
protocol_config
};
let state_request_protocol_config = {
let (handler, protocol_config) = StateRequestHandler::new(
&protocol_id,
client.clone(),
50,
);
self.spawn_task(handler.run().boxed());
protocol_config
};
let light_client_request_protocol_config = {
let (handler, protocol_config) = LightClientRequestHandler::new(&protocol_id, client.clone());
self.spawn_task(handler.run().boxed());
@@ -789,6 +819,7 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>:
.unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)),
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
}).unwrap();
@@ -862,6 +893,9 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>:
let block_request_protocol_config = block_request_handler::generate_protocol_config(
&protocol_id,
);
let state_request_protocol_config = state_request_handler::generate_protocol_config(
&protocol_id,
);
let light_client_request_protocol_config =
light_client_requests::generate_protocol_config(&protocol_id);
@@ -879,6 +913,7 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>:
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
}).unwrap();
+40
View File
@@ -1087,3 +1087,43 @@ fn syncs_after_missing_announcement() {
net.block_until_sync();
assert!(net.peer(1).client().header(&BlockId::Hash(final_block)).unwrap().is_some());
}
#[test]
fn syncs_state() {
sp_tracing::try_init_simple();
for skip_proofs in &[ false, true ] {
let mut net = TestNet::new(0);
net.add_full_peer_with_config(Default::default());
net.add_full_peer_with_config(FullPeerConfig {
sync_mode: SyncMode::Fast { skip_proofs: *skip_proofs },
..Default::default()
});
net.peer(0).push_blocks(64, false);
// Wait for peer 1 to sync header chain.
net.block_until_sync();
assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64)));
let just = (*b"FRNK", Vec::new());
net.peer(1).client().finalize_block(BlockId::Number(60), Some(just), true).unwrap();
// Wait for state sync.
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client.info().finalized_state.is_some() {
Poll::Ready(())
} else {
Poll::Pending
}
}));
assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64)));
// Wait for the rest of the states to be imported.
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client().has_state_at(&BlockId::Number(64)) {
Poll::Ready(())
} else {
Poll::Pending
}
}));
}
}
+1
View File
@@ -55,6 +55,7 @@ sp-state-machine = { version = "0.9.0", path = "../../primitives/state-machine"
sp-application-crypto = { version = "3.0.0", path = "../../primitives/application-crypto" }
sp-consensus = { version = "0.9.0", path = "../../primitives/consensus/common" }
sp-inherents = { version = "3.0.0", path = "../../primitives/inherents" }
sp-storage = { version = "3.0.0", path = "../../primitives/storage" }
sc-network = { version = "0.9.0", path = "../network" }
sc-chain-spec = { version = "3.0.0", path = "../chain-spec" }
sc-light = { version = "3.0.0", path = "../light" }
+21 -1
View File
@@ -43,6 +43,7 @@ use log::info;
use sc_network::config::{Role, OnDemand};
use sc_network::NetworkService;
use sc_network::block_request_handler::{self, BlockRequestHandler};
use sc_network::state_request_handler::{self, StateRequestHandler};
use sc_network::light_client_requests::{self, handler::LightClientRequestHandler};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
@@ -70,7 +71,7 @@ use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::BuildStorage;
use sc_client_api::{
BlockBackend, BlockchainEvents,
backend::StorageProvider,
StorageProvider,
proof_provider::ProofProvider,
execution_extensions::ExecutionExtensions
};
@@ -377,6 +378,7 @@ pub fn new_full_parts<TBl, TRtApi, TExecDisp>(
offchain_worker_enabled : config.offchain_worker.enabled,
offchain_indexing_api: config.offchain_worker.indexing_enabled,
wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
no_genesis: matches!(config.network.sync_mode, sc_network::config::SyncMode::Fast {..}),
wasm_runtime_substitutes,
},
)?;
@@ -912,6 +914,23 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
}
};
let state_request_protocol_config = {
if matches!(config.role, Role::Light) {
// Allow outgoing requests but deny incoming requests.
state_request_handler::generate_protocol_config(&protocol_id)
} else {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = StateRequestHandler::new(
&protocol_id,
client.clone(),
config.network.default_peers_set.in_peers as usize
+ config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("state_request_handler", handler.run());
protocol_config
}
};
let light_client_request_protocol_config = {
if matches!(config.role, Role::Light) {
// Allow outgoing requests but deny incoming requests.
@@ -950,6 +969,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
block_announce_validator,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
};
@@ -172,6 +172,8 @@ fn import_block_to_queue<TBl, TImpQu>(
origin: None,
allow_missing_state: false,
import_existing: force,
state: None,
skip_execution: false,
}
]);
}
+166 -57
View File
@@ -52,11 +52,12 @@ use sp_state_machine::{
DBValue, Backend as StateBackend, ChangesTrieAnchorBlockId,
prove_read, prove_child_read, ChangesTrieRootsStorage, ChangesTrieStorage,
ChangesTrieConfigurationRange, key_changes, key_changes_proof,
prove_range_read_with_size, read_range_proof_check,
};
use sc_executor::RuntimeVersion;
use sp_consensus::{
Error as ConsensusError, BlockStatus, BlockImportParams, BlockCheckParams,
ImportResult, BlockOrigin, ForkChoiceStrategy,
ImportResult, BlockOrigin, ForkChoiceStrategy, StateAction,
};
use sp_blockchain::{
self as blockchain,
@@ -86,7 +87,7 @@ use sc_client_api::{
execution_extensions::ExecutionExtensions,
notifications::{StorageNotifications, StorageEventStream},
KeyIterator, CallExecutor, ExecutorProvider, ProofProvider,
cht, UsageProvider
cht, UsageProvider,
};
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
use sp_blockchain::Error;
@@ -150,6 +151,11 @@ impl<H> PrePostHeader<H> {
}
}
enum PrepareStorageChangesResult<B: backend::Backend<Block>, Block: BlockT> {
Discard(ImportResult),
Import(Option<sp_consensus::StorageChanges<Block, backend::TransactionFor<B, Block>>>),
}
/// Create an instance of in-memory client.
#[cfg(feature="test-helpers")]
pub fn new_in_mem<E, Block, S, RA>(
@@ -191,6 +197,8 @@ pub struct ClientConfig<Block: BlockT> {
pub offchain_indexing_api: bool,
/// Path where WASM files exist to override the on-chain WASM.
pub wasm_runtime_overrides: Option<PathBuf>,
/// Skip writing genesis state on first start.
pub no_genesis: bool,
/// Map of WASM runtime substitute starting at the child of the given block until the runtime
/// version doesn't match anymore.
pub wasm_runtime_substitutes: HashMap<Block::Hash, Vec<u8>>,
@@ -202,6 +210,7 @@ impl<Block: BlockT> Default for ClientConfig<Block> {
offchain_worker_enabled: false,
offchain_indexing_api: false,
wasm_runtime_overrides: None,
no_genesis: false,
wasm_runtime_substitutes: HashMap::new(),
}
}
@@ -324,22 +333,29 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
telemetry: Option<TelemetryHandle>,
config: ClientConfig<Block>,
) -> sp_blockchain::Result<Self> {
if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() {
let info = backend.blockchain().info();
if info.finalized_state.is_none() {
let genesis_storage = build_genesis_storage.build_storage()
.map_err(sp_blockchain::Error::Storage)?;
let mut op = backend.begin_operation()?;
backend.begin_state_operation(&mut op, BlockId::Hash(Default::default()))?;
let state_root = op.reset_storage(genesis_storage)?;
let state_root = op.set_genesis_state(genesis_storage, !config.no_genesis)?;
let genesis_block = genesis::construct_genesis_block::<Block>(state_root.into());
info!("🔨 Initializing Genesis block/state (state: {}, header-hash: {})",
genesis_block.header().state_root(),
genesis_block.header().hash()
);
// Genesis may be written after some blocks have been imported and finalized.
// So we only finalize it when the database is empty.
let block_state = if info.best_hash == Default::default() {
NewBlockState::Final
} else {
NewBlockState::Normal
};
op.set_block_data(
genesis_block.deconstruct().0,
Some(vec![]),
None,
NewBlockState::Final
block_state,
)?;
backend.commit_operation(op)?;
}
@@ -629,6 +645,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
operation: &mut ClientImportOperation<Block, B>,
import_block: BlockImportParams<Block, backend::TransactionFor<B, Block>>,
new_cache: HashMap<CacheKeyId, Vec<u8>>,
storage_changes: Option<sp_consensus::StorageChanges<Block, backend::TransactionFor<B, Block>>>,
) -> sp_blockchain::Result<ImportResult> where
Self: ProvideRuntimeApi<Block>,
<Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> +
@@ -640,7 +657,6 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
justifications,
post_digests,
body,
storage_changes,
finalized,
auxiliary,
fork_choice,
@@ -718,7 +734,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
import_headers: PrePostHeader<Block::Header>,
justifications: Option<Justifications>,
body: Option<Vec<Block::Extrinsic>>,
storage_changes: Option<sp_api::StorageChanges<backend::StateBackendFor<B, Block>, Block>>,
storage_changes: Option<sp_consensus::StorageChanges<Block, backend::TransactionFor<B, Block>>>,
new_cache: HashMap<CacheKeyId, Vec<u8>>,
finalized: bool,
aux: Vec<(Vec<u8>, Option<Vec<u8>>)>,
@@ -735,15 +751,16 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
(false, blockchain::BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain),
(false, blockchain::BlockStatus::Unknown) => {},
(true, blockchain::BlockStatus::InChain) => {},
(true, blockchain::BlockStatus::Unknown) =>
return Err(Error::UnknownBlock(format!("{:?}", hash))),
(true, blockchain::BlockStatus::Unknown) => {},
}
let info = self.backend.blockchain().info();
// the block is lower than our last finalized block so it must revert
// finality, refusing import.
if *import_headers.post().number() <= info.finalized_number {
if status == blockchain::BlockStatus::Unknown
&& *import_headers.post().number() <= info.finalized_number
{
return Err(sp_blockchain::Error::NotInFinalizedChain);
}
@@ -757,7 +774,48 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let storage_changes = match storage_changes {
Some(storage_changes) => {
self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?;
let storage_changes = match storage_changes {
sp_consensus::StorageChanges::Changes(storage_changes) => {
self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?;
let (
main_sc,
child_sc,
offchain_sc,
tx, _,
changes_trie_tx,
tx_index,
) = storage_changes.into_inner();
if self.config.offchain_indexing_api {
operation.op.update_offchain_storage(offchain_sc)?;
}
operation.op.update_db_storage(tx)?;
operation.op.update_storage(main_sc.clone(), child_sc.clone())?;
operation.op.update_transaction_index(tx_index)?;
if let Some(changes_trie_transaction) = changes_trie_tx {
operation.op.update_changes_trie(changes_trie_transaction)?;
}
Some((main_sc, child_sc))
}
sp_consensus::StorageChanges::Import(changes) => {
let storage = sp_storage::Storage {
top: changes.state.into_iter().collect(),
children_default: Default::default(),
};
let state_root = operation.op.reset_storage(storage)?;
if state_root != *import_headers.post().state_root() {
// State root mismatch when importing state. This should not happen in safe fast sync mode,
// but may happen in unsafe mode.
warn!("Error imporing state: State root mismatch.");
return Err(Error::InvalidStateRoot);
}
None
}
};
// ensure parent block is finalized to maintain invariant that
// finality is called sequentially.
@@ -772,29 +830,8 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
}
operation.op.update_cache(new_cache);
storage_changes
let (
main_sc,
child_sc,
offchain_sc,
tx, _,
changes_trie_tx,
tx_index,
) = storage_changes.into_inner();
if self.config.offchain_indexing_api {
operation.op.update_offchain_storage(offchain_sc)?;
}
operation.op.update_db_storage(tx)?;
operation.op.update_storage(main_sc.clone(), child_sc.clone())?;
operation.op.update_transaction_index(tx_index)?;
if let Some(changes_trie_transaction) = changes_trie_tx {
operation.op.update_changes_trie(changes_trie_transaction)?;
}
Some((main_sc, child_sc))
},
None => None,
};
@@ -867,7 +904,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
fn prepare_block_storage_changes(
&self,
import_block: &mut BlockImportParams<Block, backend::TransactionFor<B, Block>>,
) -> sp_blockchain::Result<Option<ImportResult>>
) -> sp_blockchain::Result<PrepareStorageChangesResult<B, Block>>
where
Self: ProvideRuntimeApi<Block>,
<Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> +
@@ -875,21 +912,28 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
{
let parent_hash = import_block.header.parent_hash();
let at = BlockId::Hash(*parent_hash);
let enact_state = match self.block_status(&at)? {
BlockStatus::Unknown => return Ok(Some(ImportResult::UnknownParent)),
BlockStatus::InChainWithState | BlockStatus::Queued => true,
BlockStatus::InChainPruned if import_block.allow_missing_state => false,
BlockStatus::InChainPruned => return Ok(Some(ImportResult::MissingState)),
BlockStatus::KnownBad => return Ok(Some(ImportResult::KnownBad)),
let state_action = std::mem::replace(&mut import_block.state_action, StateAction::Skip);
let (enact_state, storage_changes) = match (self.block_status(&at)?, state_action) {
(BlockStatus::Unknown, _) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::UnknownParent)),
(BlockStatus::KnownBad, _) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::KnownBad)),
(_, StateAction::Skip) => (false, None),
(BlockStatus::InChainPruned, StateAction::ApplyChanges(sp_consensus::StorageChanges::Changes(_))) =>
return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)),
(BlockStatus::InChainPruned, StateAction::Execute) =>
return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)),
(BlockStatus::InChainPruned, StateAction::ExecuteIfPossible) => (false, None),
(_, StateAction::Execute) => (true, None),
(_, StateAction::ExecuteIfPossible) => (true, None),
(_, StateAction::ApplyChanges(changes)) => (true, Some(changes)),
};
match (enact_state, &mut import_block.storage_changes, &mut import_block.body) {
let storage_changes = match (enact_state, storage_changes, &import_block.body) {
// We have storage changes and should enact the state, so we don't need to do anything
// here
(true, Some(_), _) => {},
(true, changes @ Some(_), _) => changes,
// We should enact state, but don't have any storage changes, so we need to execute the
// block.
(true, ref mut storage_changes @ None, Some(ref body)) => {
(true, None, Some(ref body)) => {
let runtime_api = self.runtime_api();
let execution_context = if import_block.origin == BlockOrigin::NetworkInitialSync {
ExecutionContext::Syncing
@@ -919,19 +963,16 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
!= &gen_storage_changes.transaction_storage_root
{
return Err(Error::InvalidStateRoot)
} else {
**storage_changes = Some(gen_storage_changes);
}
Some(sp_consensus::StorageChanges::Changes(gen_storage_changes))
},
// No block body, no storage changes
(true, None, None) => {},
(true, None, None) => None,
// We should not enact the state, so we set the storage changes to `None`.
(false, changes, _) => {
changes.take();
}
(false, _, _) => None,
};
Ok(None)
Ok(PrepareStorageChangesResult::Import(storage_changes))
}
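
The `PrepareStorageChangesResult` type referenced above is introduced elsewhere in this commit; a minimal sketch of its shape, inferred purely from its two uses in this function, would be:

    // Inferred sketch: either abort the import with a final result, or
    // proceed with the (possibly absent) precomputed storage changes.
    enum PrepareStorageChangesResult<B: backend::Backend<Block>, Block: BlockT> {
        Discard(ImportResult),
        Import(Option<sp_consensus::StorageChanges<Block, backend::TransactionFor<B, Block>>>),
    }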
fn apply_finality_with_block_hash(
@@ -1307,6 +1348,68 @@ impl<B, E, Block, RA> ProofProvider<Block> for Client<B, E, Block, RA> where
cht::size(),
)
}
fn read_proof_collection(
&self,
id: &BlockId<Block>,
start_key: &[u8],
size_limit: usize,
) -> sp_blockchain::Result<(StorageProof, u32)> {
let state = self.state_at(id)?;
Ok(prove_range_read_with_size::<_, HashFor<Block>>(
state,
None,
None,
size_limit,
Some(start_key)
)?)
}
fn storage_collection(
&self,
id: &BlockId<Block>,
start_key: &[u8],
size_limit: usize,
) -> sp_blockchain::Result<Vec<(Vec<u8>, Vec<u8>)>> {
let state = self.state_at(id)?;
let mut current_key = start_key.to_vec();
let mut total_size = 0;
let mut entries = Vec::new();
while let Some(next_key) = state
.next_storage_key(&current_key)
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
{
let value = state
.storage(next_key.as_ref())
.map_err(|e| sp_blockchain::Error::from_state(Box::new(e)))?
.unwrap_or_default();
let size = value.len() + next_key.len();
if total_size + size > size_limit && !entries.is_empty() {
break;
}
total_size += size;
entries.push((next_key.clone(), value));
current_key = next_key;
}
Ok(entries)
}
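
Together with `read_proof_collection`, this enables paged state downloads. A hedged usage sketch; the `client` and `target_hash` bindings and the page size are illustrative assumptions, not part of this change:

    // Page through the full state at `target_hash`, roughly 64 KiB at a time.
    let mut start_key = Vec::new();
    loop {
        let entries = client.storage_collection(
            &BlockId::Hash(target_hash), &start_key, 64 * 1024)?;
        match entries.last() {
            // Resume after the last key of this page.
            Some((key, _value)) => start_key = key.clone(),
            // An empty page means `next_storage_key` found nothing more: done.
            None => break,
        }
    }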
fn verify_range_proof(
&self,
root: Block::Hash,
proof: StorageProof,
start_key: &[u8],
) -> sp_blockchain::Result<(Vec<(Vec<u8>, Vec<u8>)>, bool)> {
Ok(read_range_proof_check::<HashFor<Block>>(
root,
proof,
None,
None,
None,
Some(start_key),
)?)
}
}
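
On the receiving side, a syncing node would check each downloaded range against the target header's state root before applying it. A sketch under the same illustrative assumptions as above:

    // `state_root` comes from the already-verified target header.
    let (entries, complete) = client.verify_range_proof(state_root, proof, &start_key)?;
    // `complete` is true once the proof reaches the end of the trie;
    // otherwise the next request resumes after the last key in `entries`.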
@@ -1751,15 +1854,16 @@ impl<B, E, Block, RA> sp_consensus::BlockImport<Block> for &Client<B, E, Block,
let span = tracing::span!(tracing::Level::DEBUG, "import_block");
let _enter = span.enter();
if let Some(res) = self.prepare_block_storage_changes(&mut import_block).map_err(|e| {
let storage_changes = match self.prepare_block_storage_changes(&mut import_block).map_err(|e| {
warn!("Block prepare storage changes error:\n{:?}", e);
ConsensusError::ClientImport(e.to_string())
})? {
return Ok(res)
}
PrepareStorageChangesResult::Discard(res) => return Ok(res),
PrepareStorageChangesResult::Import(storage_changes) => storage_changes,
};
self.lock_import_and_run(|operation| {
self.apply_block(operation, import_block, new_cache)
self.apply_block(operation, import_block, new_cache, storage_changes)
}).map_err(|e| {
warn!("Block import error:\n{:?}", e);
ConsensusError::ClientImport(e.to_string()).into()
@@ -1801,9 +1905,14 @@ impl<B, E, Block, RA> sp_consensus::BlockImport<Block> for &Client<B, E, Block,
match self.block_status(&BlockId::Hash(hash))
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
BlockStatus::InChainWithState | BlockStatus::Queued if !import_existing => return Ok(ImportResult::AlreadyInChain),
BlockStatus::InChainWithState | BlockStatus::Queued if !import_existing => {
return Ok(ImportResult::AlreadyInChain)
},
BlockStatus::InChainWithState | BlockStatus::Queued => {},
BlockStatus::InChainPruned => return Ok(ImportResult::AlreadyInChain),
BlockStatus::InChainPruned if !import_existing => {
return Ok(ImportResult::AlreadyInChain)
},
BlockStatus::InChainPruned => {},
BlockStatus::Unknown => {},
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}
@@ -272,7 +272,7 @@ fn local_state_is_created_when_genesis_state_is_available() {
);
let mut op = backend.begin_operation().unwrap();
op.set_block_data(header0, None, None, NewBlockState::Final).unwrap();
op.reset_storage(Default::default()).unwrap();
op.set_genesis_state(Default::default(), true).unwrap();
backend.commit_operation(op).unwrap();
match backend.state_at(BlockId::Number(0)).unwrap() {
@@ -269,12 +269,14 @@ pub struct Info<Block: BlockT> {
pub finalized_hash: Block::Hash,
/// Last finalized block number.
pub finalized_number: <<Block as BlockT>::Header as HeaderT>::Number,
/// Last finalized state.
pub finalized_state: Option<(Block::Hash, <<Block as BlockT>::Header as HeaderT>::Number)>,
/// Number of concurrent leaf forks.
pub number_leaves: usize
}
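
A fast-synced node may know a finalized block long before it holds the corresponding state, and the new field makes that distinction queryable. A hedged sketch, assuming the usual `backend` handle:

    // Resume from the newest finalized block whose state is present.
    if let Some((hash, number)) = backend.blockchain().info().finalized_state {
        // `hash` and `number` identify the last finalized state entry.
    }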
/// Block status.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockStatus {
/// Already in the blockchain.
InChain,
@@ -90,8 +90,8 @@ pub enum Error {
#[error("Failed to get runtime version: {0}")]
VersionInvalid(String),
#[error("Genesis config provided is invalid")]
GenesisInvalid,
#[error("Provided state is invalid")]
InvalidState,
#[error("error decoding justification for header")]
JustificationDecode,
@@ -135,6 +135,43 @@ pub struct BlockCheckParams<Block: BlockT> {
pub import_existing: bool,
}
/// Precomputed storage.
pub enum StorageChanges<Block: BlockT, Transaction> {
/// Changes coming from block execution.
Changes(sp_state_machine::StorageChanges<Transaction, HashFor<Block>, NumberFor<Block>>),
/// Whole new state.
Import(ImportedState<Block>),
}
/// Imported state data. A vector of key-value pairs that should form a trie.
#[derive(PartialEq, Eq, Clone)]
pub struct ImportedState<B: BlockT> {
/// Target block hash.
pub block: B::Hash,
/// State keys and values.
pub state: Vec<(Vec<u8>, Vec<u8>)>,
}
impl<B: BlockT> std::fmt::Debug for ImportedState<B> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("ImportedState")
.field("block", &self.block)
.finish()
}
}
/// Defines how a new state is computed for a given imported block.
pub enum StateAction<Block: BlockT, Transaction> {
/// Apply precomputed changes coming from block execution or state sync.
ApplyChanges(StorageChanges<Block, Transaction>),
/// Execute block body (required) and compute state.
Execute,
/// Execute block body if parent state is available and compute state.
ExecuteIfPossible,
/// Don't execute or import state.
Skip,
}
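
For example, fast sync can now hand a fully downloaded state to the import pipeline instead of executing the block. A hedged sketch of wiring this into `BlockImportParams`; the `header`, `target_hash`, and `downloaded_pairs` bindings are assumed, and the usual `BlockImportParams::new(origin, header)` constructor is taken as given:

    let mut params = BlockImportParams::new(BlockOrigin::NetworkInitialSync, header);
    // Import the downloaded key-value state as-is; no block execution needed.
    params.state_action = StateAction::ApplyChanges(StorageChanges::Import(ImportedState {
        block: target_hash,
        state: downloaded_pairs, // Vec<(Vec<u8>, Vec<u8>)> forming the target trie
    }));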
/// Data required to import a Block.
#[non_exhaustive]
pub struct BlockImportParams<Block: BlockT, Transaction> {
@@ -159,11 +196,8 @@ pub struct BlockImportParams<Block: BlockT, Transaction> {
pub post_digests: Vec<DigestItemFor<Block>>,
/// The body of the block.
pub body: Option<Vec<Block::Extrinsic>>,
/// The changes to the storage to create the state for the block. If this is `Some(_)`,
/// the block import will not need to re-execute the block for importing it.
pub storage_changes: Option<
sp_state_machine::StorageChanges<Transaction, HashFor<Block>, NumberFor<Block>>
>,
/// Specify how the new state is computed.
pub state_action: StateAction<Block, Transaction>,
/// Is this block finalized already?
/// `true` implies instant finality.
pub finalized: bool,
@@ -182,8 +216,6 @@ pub struct BlockImportParams<Block: BlockT, Transaction> {
/// to modify it. If `None` is passed all the way down to bottom block
/// importer, the import fails with an `IncompletePipeline` error.
pub fork_choice: Option<ForkChoiceStrategy>,
/// Allow importing the block skipping state verification if parent state is missing.
pub allow_missing_state: bool,
/// Re-validate existing block.
pub import_existing: bool,
/// Cached full header hash (with post-digests applied).
@@ -201,12 +233,11 @@ impl<Block: BlockT, Transaction> BlockImportParams<Block, Transaction> {
justifications: None,
post_digests: Vec::new(),
body: None,
storage_changes: None,
state_action: StateAction::Execute,
finalized: false,
intermediates: HashMap::new(),
auxiliary: Vec::new(),
fork_choice: None,
allow_missing_state: false,
import_existing: false,
post_hash: None,
}
@@ -237,20 +268,28 @@ impl<Block: BlockT, Transaction> BlockImportParams<Block, Transaction> {
/// Auxiliary function for "converting" the transaction type.
///
/// Actually this just sets `storage_changes` to `None` and makes rustc think that `Self` now
/// Actually this just drops any precomputed `StorageChanges::Changes` and makes rustc think that `Self` now
/// uses a different transaction type.
pub fn convert_transaction<Transaction2>(self) -> BlockImportParams<Block, Transaction2> {
pub fn clear_storage_changes_and_mutate<Transaction2>(self) -> BlockImportParams<Block, Transaction2> {
// Preserve imported state.
let state_action = match self.state_action {
StateAction::ApplyChanges(StorageChanges::Import(state)) =>
StateAction::ApplyChanges(StorageChanges::Import(state)),
StateAction::ApplyChanges(StorageChanges::Changes(_)) => StateAction::Skip,
StateAction::Execute => StateAction::Execute,
StateAction::ExecuteIfPossible => StateAction::ExecuteIfPossible,
StateAction::Skip => StateAction::Skip,
};
BlockImportParams {
origin: self.origin,
header: self.header,
justifications: self.justifications,
post_digests: self.post_digests,
body: self.body,
storage_changes: None,
state_action,
finalized: self.finalized,
auxiliary: self.auxiliary,
intermediates: self.intermediates,
allow_missing_state: self.allow_missing_state,
fork_choice: self.fork_choice,
import_existing: self.import_existing,
post_hash: self.post_hash,
@@ -34,7 +34,7 @@ use crate::{
error::Error as ConsensusError,
block_import::{
BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult,
BlockCheckParams,
BlockCheckParams, ImportedState, StateAction,
},
metrics::Metrics,
};
@@ -74,8 +74,12 @@ pub struct IncomingBlock<B: BlockT> {
pub origin: Option<Origin>,
/// Allow importing the block skipping state verification if parent state is missing.
pub allow_missing_state: bool,
/// Skip block execution and state verification.
pub skip_execution: bool,
/// Re-validate existing block.
pub import_existing: bool,
/// Do not compute a new state, but rather set it to the given state entries.
pub state: Option<ImportedState<B>>,
}
/// Type of keys in the blockchain cache that consensus module could use for its needs.
@@ -264,9 +268,17 @@ pub(crate) async fn import_single_block_metered<B: BlockT, V: Verifier<B>, Trans
if let Some(keys) = maybe_keys {
cache.extend(keys.into_iter());
}
import_block.allow_missing_state = block.allow_missing_state;
import_block.import_existing = block.import_existing;
let mut import_block = import_block.clear_storage_changes_and_mutate();
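// Precedence: an attached `state` wins over `skip_execution`, which wins
// over `allow_missing_state`; otherwise the verifier's action is kept.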
if let Some(state) = block.state {
import_block.state_action = StateAction::ApplyChanges(crate::StorageChanges::Import(state));
} else if block.skip_execution {
import_block.state_action = StateAction::Skip;
} else if block.allow_missing_state {
import_block.state_action = StateAction::ExecuteIfPossible;
}
let imported = import_handle.import_block(import_block.convert_transaction(), cache).await;
let imported = import_handle.import_block(import_block, cache).await;
if let Some(metrics) = metrics.as_ref() {
metrics.report_verification_and_import(started.elapsed());
}
@@ -564,6 +564,8 @@ mod tests {
origin: None,
allow_missing_state: false,
import_existing: false,
state: None,
skip_execution: false,
}],
)))
.unwrap();
@@ -50,7 +50,8 @@ mod metrics;
pub use self::error::Error;
pub use block_import::{
BlockCheckParams, BlockImport, BlockImportParams, BlockOrigin, ForkChoiceStrategy,
ImportResult, ImportedAux, JustificationImport, JustificationSyncLink,
ImportResult, ImportedAux, ImportedState, JustificationImport, JustificationSyncLink,
StateAction, StorageChanges,
};
pub use select_chain::SelectChain;
pub use sp_state_machine::Backend as StateBackend;
@@ -54,6 +54,19 @@ impl<Block: BlockT> BlockId<Block> {
pub fn number(number: NumberFor<Block>) -> Self {
BlockId::Number(number)
}
/// Check if this block ID refers to the pre-genesis state.
pub fn is_pre_genesis(&self) -> bool {
match self {
BlockId::Hash(hash) => hash == &Default::default(),
BlockId::Number(_) => false,
}
}
/// Create a block ID for a pre-genesis state.
pub fn pre_genesis() -> Self {
BlockId::Hash(Default::default())
}
}
impl<Block: BlockT> Copy for BlockId<Block> {}
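
A hedged illustration of the convention, assuming a concrete `Block` whose number type converts from `u32`; the default hash doubles as the "before genesis" marker:

    // Only the hash form can denote the pre-genesis state.
    let id = BlockId::<Block>::pre_genesis();
    assert!(id.is_pre_genesis());
    // A number-based id never counts as pre-genesis, even at zero.
    assert!(!BlockId::<Block>::number(0u32.into()).is_pre_genesis());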
@@ -93,6 +93,22 @@ pub trait Backend<H: Hasher>: sp_std::fmt::Debug {
key: &[u8]
) -> Result<Option<StorageKey>, Self::Error>;
/// Iterate over storage starting at key, for a given prefix and child trie.
/// Aborts as soon as `f` returns false.
/// Warning: this fails at the first error, whereas the usual iteration skips errors.
/// If `allow_missing` is `true`, iteration stops when it reaches a missing trie node;
/// otherwise an error is produced.
///
/// Returns `true` if the end of the trie is reached.
fn apply_to_key_values_while<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: F,
allow_missing: bool,
) -> Result<bool, Self::Error>;
/// Retrieve all keys of storage and call `f` for each of those keys.
/// Aborts as soon as `f` returns false.
fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
@@ -726,6 +726,50 @@ mod execution {
prove_read_on_trie_backend(trie_backend, keys)
}
/// Generate range storage read proof.
pub fn prove_range_read_with_size<B, H>(
mut backend: B,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
size_limit: usize,
start_at: Option<&[u8]>,
) -> Result<(StorageProof, u32), Box<dyn Error>>
where
B: Backend<H>,
H: Hasher,
H::Out: Ord + Codec,
{
let trie_backend = backend.as_trie_backend()
.ok_or_else(|| Box::new(ExecutionError::UnableToGenerateProof) as Box<dyn Error>)?;
prove_range_read_with_size_on_trie_backend(trie_backend, child_info, prefix, size_limit, start_at)
}
/// Generate range storage read proof on an existing trie backend.
pub fn prove_range_read_with_size_on_trie_backend<S, H>(
trie_backend: &TrieBackend<S, H>,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
size_limit: usize,
start_at: Option<&[u8]>,
) -> Result<(StorageProof, u32), Box<dyn Error>>
where
S: trie_backend_essence::TrieBackendStorage<H>,
H: Hasher,
H::Out: Ord + Codec,
{
let proving_backend = proving_backend::ProvingBackend::<S, H>::new(trie_backend);
let mut count = 0;
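// Always include at least one entry; afterwards keep going only while
// the proof recorded so far stays within the size budget.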
proving_backend.apply_to_key_values_while(child_info, prefix, start_at, |_key, _value| {
if count == 0 || proving_backend.estimate_encoded_size() <= size_limit {
count += 1;
true
} else {
false
}
}, false).map_err(|e| Box::new(e) as Box<dyn Error>)?;
Ok((proving_backend.extract_proof(), count))
}
/// Generate child storage read proof.
pub fn prove_child_read<B, H, I>(
mut backend: B,
@@ -808,6 +852,29 @@ mod execution {
Ok(result)
}
/// Check child storage range proof, generated by `prove_range_read` call.
pub fn read_range_proof_check<H>(
root: H::Out,
proof: StorageProof,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
count: Option<u32>,
start_at: Option<&[u8]>,
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), Box<dyn Error>>
where
H: Hasher,
H::Out: Ord + Codec,
{
let proving_backend = create_proof_check_backend::<H>(root, proof)?;
read_range_proof_check_on_proving_backend(
&proving_backend,
child_info,
prefix,
count,
start_at,
)
}
/// Check child storage read proof, generated by `prove_child_read` call.
pub fn read_child_proof_check<H, I>(
root: H::Out,
@@ -859,6 +926,32 @@ mod execution {
proving_backend.child_storage(child_info, key)
.map_err(|e| Box::new(e) as Box<dyn Error>)
}
/// Check storage range proof on pre-created proving backend.
///
/// Returns a vector with the read `key => value` pairs and a `bool` that is set to `true` when
/// all `key => value` pairs could be read and no more are left.
pub fn read_range_proof_check_on_proving_backend<H>(
proving_backend: &TrieBackend<MemoryDB<H>, H>,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
count: Option<u32>,
start_at: Option<&[u8]>,
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), Box<dyn Error>>
where
H: Hasher,
H::Out: Ord + Codec,
{
let mut values = Vec::new();
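// The proof intentionally covers only a slice of the trie, so iterate
// with `allow_missing`: hitting an absent node just ends the range.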
let result = proving_backend.apply_to_key_values_while(child_info, prefix, start_at, |key, value| {
values.push((key.to_vec(), value.to_vec()));
count.as_ref().map_or(true, |c| (values.len() as u32) < *c)
}, true);
match result {
Ok(completed) => Ok((values, completed)),
Err(e) => Err(Box::new(e) as Box<dyn Error>),
}
}
}
#[cfg(test)]
@@ -1457,7 +1550,7 @@ mod tests {
remote_proof.clone(),
&[&[0xff]],
).is_ok();
// check that results are correct
assert_eq!(
local_result1.into_iter().collect::<Vec<_>>(),
vec![(b"value2".to_vec(), Some(vec![24]))],
@@ -1494,6 +1587,57 @@ mod tests {
);
}
#[test]
fn prove_read_with_size_limit_works() {
let remote_backend = trie_backend::tests::test_trie();
let remote_root = remote_backend.storage_root(::std::iter::empty()).0;
let (proof, count) = prove_range_read_with_size(remote_backend, None, None, 0, None).unwrap();
// Always contains at least some nodes.
assert_eq!(proof.into_memory_db::<BlakeTwo256>().drain().len(), 3);
assert_eq!(count, 1);
let remote_backend = trie_backend::tests::test_trie();
let (proof, count) = prove_range_read_with_size(remote_backend, None, None, 800, Some(&[])).unwrap();
assert_eq!(proof.clone().into_memory_db::<BlakeTwo256>().drain().len(), 9);
assert_eq!(count, 85);
let (results, completed) = read_range_proof_check::<BlakeTwo256>(
remote_root,
proof.clone(),
None,
None,
Some(count),
None,
).unwrap();
assert_eq!(results.len() as u32, count);
assert_eq!(completed, false);
// When checking without a count limit, the proof may actually contain extra values.
let (results, completed) = read_range_proof_check::<BlakeTwo256>(
remote_root,
proof,
None,
None,
None,
None,
).unwrap();
assert_eq!(results.len() as u32, 101);
assert_eq!(completed, false);
let remote_backend = trie_backend::tests::test_trie();
let (proof, count) = prove_range_read_with_size(remote_backend, None, None, 50000, Some(&[])).unwrap();
assert_eq!(proof.clone().into_memory_db::<BlakeTwo256>().drain().len(), 11);
assert_eq!(count, 132);
let (results, completed) = read_range_proof_check::<BlakeTwo256>(
remote_root,
proof.clone(),
None,
None,
None,
None,
).unwrap();
assert_eq!(results.len() as u32, count);
assert_eq!(completed, true);
}
#[test]
fn compact_multiple_child_trie() {
// this root will be queried
@@ -303,7 +303,7 @@ impl OverlayedChanges {
/// Set a new value for the specified key.
///
/// Can be rolled back or committed when called inside a transaction.
pub(crate) fn set_storage(&mut self, key: StorageKey, val: Option<StorageValue>) {
pub fn set_storage(&mut self, key: StorageKey, val: Option<StorageValue>) {
let size_write = val.as_ref().map(|x| x.len() as u64).unwrap_or(0);
self.stats.tally_write_overlay(size_write);
self.top.set(key, val, self.extrinsic_index());
@@ -212,6 +212,14 @@ impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> ProvingBackend<'a, S, H>
pub fn extract_proof(&self) -> StorageProof {
self.0.essence().backend_storage().proof_recorder.to_storage_proof()
}
/// Returns the estimated encoded size of the proof.
///
/// The estimate may be larger (by at most 4 bytes), but is never smaller than the actual
/// encoded proof.
pub fn estimate_encoded_size(&self) -> usize {
self.0.essence().backend_storage().proof_recorder.estimate_encoded_size()
}
}
impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> TrieBackendStorage<H>
@@ -260,6 +268,17 @@ impl<'a, S, H> Backend<H> for ProvingBackend<'a, S, H>
self.0.child_storage(child_info, key)
}
fn apply_to_key_values_while<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: F,
allow_missing: bool,
) -> Result<bool, Self::Error> {
self.0.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing)
}
fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
&self,
child_info: Option<&ChildInfo>,
@@ -113,6 +113,17 @@ impl<S: TrieBackendStorage<H>, H: Hasher> Backend<H> for TrieBackend<S, H> where
self.essence.for_key_values_with_prefix(prefix, f)
}
fn apply_to_key_values_while<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: F,
allow_missing: bool,
) -> Result<bool, Self::Error> {
self.essence.apply_to_key_values_while(child_info, prefix, start_at, f, allow_missing)
}
fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
&self,
child_info: Option<&ChildInfo>,
@@ -189,6 +189,43 @@ impl<S: TrieBackendStorage<H>, H: Hasher> TrieBackendEssence<S, H> where H::Out:
.map_err(map_e)
}
/// Retrieve all key-value entries of storage and call `f` for each of them.
/// Aborts as soon as `f` returns false.
///
/// Returns `true` when all keys were iterated.
pub fn apply_to_key_values_while(
&self,
child_info: Option<&ChildInfo>,
prefix: Option<&[u8]>,
start_at: Option<&[u8]>,
f: impl FnMut(Vec<u8>, Vec<u8>) -> bool,
allow_missing_nodes: bool,
) -> Result<bool> {
let mut child_root;
let root = if let Some(child_info) = child_info.as_ref() {
if let Some(fetched_child_root) = self.child_root(child_info)? {
child_root = H::Out::default();
// root is fetched from DB, not writable by runtime, so it's always valid.
child_root.as_mut().copy_from_slice(fetched_child_root.as_slice());
&child_root
} else {
return Ok(true);
}
} else {
&self.root
};
self.trie_iter_inner(
&root,
prefix,
f,
child_info,
start_at,
allow_missing_nodes,
)
}
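
A hedged sketch of driving this low-level iterator directly; the `essence` and `last_key` bindings are assumed:

    // Collect up to ten entries after `last_key`, tolerating missing trie
    // nodes as a partial-proof backend does during state sync.
    let mut out = Vec::new();
    let reached_end = essence.apply_to_key_values_while(
        None,            // top trie, no child trie
        None,            // no prefix filter
        Some(&last_key), // resume point
        |key, value| { out.push((key, value)); out.len() < 10 },
        true,            // allow missing nodes
    )?;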
/// Retrieve all keys of a storage and call `f` for each of those keys.
/// Aborts as soon as `f` returns false.
pub fn apply_to_keys_while<F: FnMut(&[u8]) -> bool>(
@@ -212,15 +249,15 @@ impl<S: TrieBackendStorage<H>, H: Hasher> TrieBackendEssence<S, H> where H::Out:
&self.root
};
self.trie_iter_inner(root, prefix, |k, _v| f(k), child_info)
let _ = self.trie_iter_inner(root, prefix, |k, _v| { f(&k); true}, child_info, None, false);
}
/// Execute given closure for all keys starting with prefix.
pub fn for_child_keys_with_prefix<F: FnMut(&[u8])>(
pub fn for_child_keys_with_prefix(
&self,
child_info: &ChildInfo,
prefix: &[u8],
mut f: F,
mut f: impl FnMut(&[u8]),
) {
let root_vec = match self.child_root(child_info) {
Ok(v) => v.unwrap_or_else(|| empty_child_trie_root::<Layout<H>>().encode()),
@@ -231,41 +268,43 @@ impl<S: TrieBackendStorage<H>, H: Hasher> TrieBackendEssence<S, H> where H::Out:
};
let mut root = H::Out::default();
root.as_mut().copy_from_slice(&root_vec);
self.trie_iter_inner(&root, Some(prefix), |k, _v| { f(k); true }, Some(child_info))
let _ = self.trie_iter_inner(&root, Some(prefix), |k, _v| { f(&k); true }, Some(child_info), None, false);
}
/// Execute given closure for all keys starting with prefix.
pub fn for_keys_with_prefix<F: FnMut(&[u8])>(&self, prefix: &[u8], mut f: F) {
self.trie_iter_inner(&self.root, Some(prefix), |k, _v| { f(k); true }, None)
let _ = self.trie_iter_inner(&self.root, Some(prefix), |k, _v| { f(&k); true }, None, None, false);
}
fn trie_iter_inner<F: FnMut(&[u8], &[u8]) -> bool>(
fn trie_iter_inner<F: FnMut(Vec<u8>, Vec<u8>) -> bool>(
&self,
root: &H::Out,
prefix: Option<&[u8]>,
mut f: F,
child_info: Option<&ChildInfo>,
) {
let mut iter = move |db| -> sp_std::result::Result<(), Box<TrieError<H::Out>>> {
start_at: Option<&[u8]>,
allow_missing_nodes: bool,
) -> Result<bool> {
let mut iter = move |db| -> sp_std::result::Result<bool, Box<TrieError<H::Out>>> {
let trie = TrieDB::<H>::new(db, root)?;
let iter = if let Some(prefix) = prefix.as_ref() {
TrieDBIterator::new_prefixed(&trie, prefix)?
let prefix = prefix.unwrap_or(&[]);
let iterator = if let Some(start_at) = start_at {
TrieDBIterator::new_prefixed_then_seek(&trie, prefix, start_at)?
} else {
TrieDBIterator::new(&trie)?
TrieDBIterator::new_prefixed(&trie, prefix)?
};
for x in iter {
for x in iterator {
let (key, value) = x?;
debug_assert!(prefix.as_ref().map(|prefix| key.starts_with(prefix)).unwrap_or(true));
debug_assert!(key.starts_with(prefix));
if !f(&key, &value) {
break;
if !f(key, value) {
return Ok(false)
}
}
Ok(())
Ok(true)
};
let result = if let Some(child_info) = child_info {
@@ -274,14 +313,16 @@ impl<S: TrieBackendStorage<H>, H: Hasher> TrieBackendEssence<S, H> where H::Out:
} else {
iter(self)
};
if let Err(e) = result {
debug!(target: "trie", "Error while iterating by prefix: {}", e);
match result {
Ok(completed) => Ok(completed),
Err(e) if matches!(*e, TrieError::IncompleteDatabase(_)) && allow_missing_nodes => Ok(false),
Err(e) => Err(format!("TrieDB iteration error: {}", e)),
}
}
/// Execute given closure for all key and values starting with prefix.
pub fn for_key_values_with_prefix<F: FnMut(&[u8], &[u8])>(&self, prefix: &[u8], mut f: F) {
self.trie_iter_inner(&self.root, Some(prefix), |k, v| { f(k, v); true }, None)
let _ = self.trie_iter_inner(&self.root, Some(prefix), |k, v| {f(&k, &v); true}, None, None, false);
}
}
@@ -80,6 +80,7 @@ pub struct TestClientBuilder<Block: BlockT, Executor, Backend, G: GenesisInit> {
fork_blocks: ForkBlocks<Block>,
bad_blocks: BadBlocks<Block>,
enable_offchain_indexing_api: bool,
no_genesis: bool,
}
impl<Block: BlockT, Executor, G: GenesisInit> Default
@@ -116,6 +117,7 @@ impl<Block: BlockT, Executor, Backend, G: GenesisInit> TestClientBuilder<Block,
fork_blocks: None,
bad_blocks: None,
enable_offchain_indexing_api: false,
no_genesis: false,
}
}
@@ -183,6 +185,12 @@ impl<Block: BlockT, Executor, Backend, G: GenesisInit> TestClientBuilder<Block,
self
}
/// Disable writing genesis.
pub fn set_no_genesis(mut self) -> Self {
self.no_genesis = true;
self
}
/// Build the test client with the given native executor.
pub fn build_with_executor<RuntimeApi>(
self,
@@ -232,6 +240,7 @@ impl<Block: BlockT, Executor, Backend, G: GenesisInit> TestClientBuilder<Block,
None,
ClientConfig {
offchain_indexing_api: self.enable_offchain_indexing_api,
no_genesis: self.no_genesis,
..Default::default()
},
).expect("Creates new client");