Use changes tries in query_storage RPC (#1082)

* use changes tries in query_storage RPC

* let + match + return + call -> match
This commit is contained in:
Svyatoslav Nikolsky
2019-01-17 12:08:50 +03:00
committed by Bastian Köcher
parent eb000fb1ae
commit da1fb3f273
14 changed files with 443 additions and 190 deletions
+18 -1
View File
@@ -430,7 +430,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
}
/// Prune obsolete changes tries.
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) {
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) {
// never prune on archive nodes
let min_blocks_to_keep = match self.min_blocks_to_keep {
Some(min_blocks_to_keep) => min_blocks_to_keep,
@@ -456,6 +456,23 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
}
}
impl<Block: BlockT> client::backend::PrunableStateChangesTrieStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
fn oldest_changes_trie_block(
&self,
config: &ChangesTrieConfiguration,
best_finalized_block: u64
) -> u64 {
match self.min_blocks_to_keep {
Some(min_blocks_to_keep) => state_machine::oldest_non_pruned_changes_trie(
config,
min_blocks_to_keep,
best_finalized_block,
),
None => 1,
}
}
}
impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
fn root(&self, anchor: &state_machine::ChangesTrieAnchorBlockId<H256>, block: u64) -> Result<Option<H256>, String> {
// check API requirement
+8 -1
View File
@@ -17,6 +17,7 @@
//! Substrate Client data backend
use crate::error;
use primitives::ChangesTrieConfiguration;
use runtime_primitives::{generic::BlockId, Justification, StorageMap, ChildrenStorageMap};
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, NumberFor};
use state_machine::backend::Backend as StateBackend;
@@ -113,7 +114,7 @@ pub trait Backend<Block, H>: AuxStore + Send + Sync where
/// Associated state backend type.
type State: StateBackend<H>;
/// Changes trie storage.
type ChangesTrieStorage: StateChangesTrieStorage<H>;
type ChangesTrieStorage: PrunableStateChangesTrieStorage<H>;
/// Begin a new block insertion transaction with given parent block id.
/// When constructing the genesis, this is called with all-zero hash.
@@ -154,6 +155,12 @@ pub trait Backend<Block, H>: AuxStore + Send + Sync where
}
}
/// Changes trie storage that supports pruning.
pub trait PrunableStateChangesTrieStorage<H: Hasher>: StateChangesTrieStorage<H> {
/// Get number block of oldest, non-pruned changes trie.
fn oldest_changes_trie_block(&self, config: &ChangesTrieConfiguration, best_finalized: u64) -> u64;
}
/// Mark for all Backend implementations, that are making use of state data, stored locally.
pub trait LocalBackend<Block, H>: Backend<Block, H>
where
+52 -29
View File
@@ -42,7 +42,7 @@ use state_machine::{
key_changes, key_changes_proof, OverlayedChanges
};
use crate::backend::{self, BlockImportOperation};
use crate::backend::{self, BlockImportOperation, PrunableStateChangesTrieStorage};
use crate::blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend};
use crate::call_executor::{CallExecutor, LocalCallExecutor};
use executor::{RuntimeVersion, RuntimeInfo};
@@ -355,35 +355,54 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
Ok((header, proof))
}
/// Get longest range within [first; last] that is possible to use in `key_changes`
/// and `key_changes_proof` calls.
/// Range could be shortened from the beginning if some changes tries have been pruned.
/// Returns Ok(None) if changes trues are not supported.
pub fn max_key_changes_range(
&self,
first: NumberFor<Block>,
last: BlockId<Block>,
) -> error::Result<Option<(NumberFor<Block>, BlockId<Block>)>> {
let (config, storage) = match self.require_changes_trie().ok() {
Some((config, storage)) => (config, storage),
None => return Ok(None),
};
let first = first.as_();
let last_num = self.backend.blockchain().expect_block_number_from_id(&last)?.as_();
if first > last_num {
return Err(error::ErrorKind::ChangesTrieAccessFailed("Invalid changes trie range".into()).into());
}
let finalized_number = self.backend.blockchain().info()?.finalized_number;
let oldest = storage.oldest_changes_trie_block(&config, finalized_number.as_());
let first = As::sa(::std::cmp::max(first, oldest));
Ok(Some((first, last)))
}
/// Get pairs of (block, extrinsic) where key has been changed at given blocks range.
/// Works only for runtimes that are supporting changes tries.
pub fn key_changes(
&self,
first: Block::Hash,
last: Block::Hash,
key: &[u8]
first: NumberFor<Block>,
last: BlockId<Block>,
key: &StorageKey
) -> error::Result<Vec<(NumberFor<Block>, u32)>> {
let config = self.changes_trie_config()?;
let storage = self.backend.changes_trie_storage();
let (config, storage) = match (config, storage) {
(Some(config), Some(storage)) => (config, storage),
_ => return Err(error::ErrorKind::ChangesTriesNotSupported.into()),
};
let (config, storage) = self.require_changes_trie()?;
let last_number = self.backend.blockchain().expect_block_number_from_id(&last)?.as_();
let last_hash = self.backend.blockchain().expect_block_hash_from_id(&last)?;
let first_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(first))?.as_();
let last_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(last))?.as_();
key_changes::<_, Blake2Hasher>(
&config,
storage,
first_number,
&*storage,
first.as_(),
&ChangesTrieAnchorBlockId {
hash: convert_hash(&last),
hash: convert_hash(&last_hash),
number: last_number,
},
self.backend.blockchain().info()?.best_number.as_(),
key)
&key.0)
.and_then(|r| r.map(|r| r.map(|(block, tx)| (As::sa(block), tx))).collect::<Result<_, _>>())
.map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into())
.map(|r| r.into_iter().map(|(b, e)| (As::sa(b), e)).collect())
}
/// Get proof for computation of (block, extrinsic) pairs where key has been changed at given blocks range.
@@ -398,7 +417,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
last: Block::Hash,
min: Block::Hash,
max: Block::Hash,
key: &[u8]
key: &StorageKey
) -> error::Result<ChangesProof<Block::Header>> {
self.key_changes_proof_with_cht_size(
first,
@@ -417,7 +436,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
last: Block::Hash,
min: Block::Hash,
max: Block::Hash,
key: &[u8],
key: &StorageKey,
cht_size: u64,
) -> error::Result<ChangesProof<Block::Header>> {
struct AccessedRootsRecorder<'a, Block: BlockT> {
@@ -447,14 +466,9 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
}
}
let config = self.changes_trie_config()?;
let storage = self.backend.changes_trie_storage();
let (config, storage) = match (config, storage) {
(Some(config), Some(storage)) => (config, storage),
_ => return Err(error::ErrorKind::ChangesTriesNotSupported.into()),
};
let (config, storage) = self.require_changes_trie()?;
let min_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(min))?;
let recording_storage = AccessedRootsRecorder::<Block> {
storage,
min: min_number.as_(),
@@ -478,7 +492,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
number: last_number,
},
max_number.as_(),
key
&key.0
)
.map_err(|err| error::Error::from(error::ErrorKind::ChangesTrieAccessFailed(err)))?;
@@ -528,6 +542,16 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
Ok(proof)
}
/// Returns changes trie configuration and storage or an error if it is not supported.
fn require_changes_trie(&self) -> error::Result<(ChangesTrieConfiguration, &B::ChangesTrieStorage)> {
let config = self.changes_trie_config()?;
let storage = self.backend.changes_trie_storage();
match (config, storage) {
(Some(config), Some(storage)) => Ok((config, storage)),
_ => Err(error::ErrorKind::ChangesTriesNotSupported.into()),
}
}
/// Create a new block, built on the head of the chain.
pub fn new_block<InherentData>(
&self
@@ -1713,9 +1737,8 @@ pub(crate) mod tests {
let (client, _, test_cases) = prepare_client_with_key_changes();
for (index, (begin, end, key, expected_result)) in test_cases.into_iter().enumerate() {
let begin = client.block_hash(begin).unwrap().unwrap();
let end = client.block_hash(end).unwrap().unwrap();
let actual_result = client.key_changes(begin, end, &key).unwrap();
let actual_result = client.key_changes(begin, BlockId::Hash(end), &StorageKey(key)).unwrap();
match actual_result == expected_result {
true => (),
false => panic!(format!("Failed test {}: actual = {:?}, expected = {:?}",
+26 -6
View File
@@ -22,14 +22,14 @@ use parking_lot::RwLock;
use crate::error;
use crate::backend::{self, NewBlockState};
use crate::light;
use primitives::storage::well_known_keys;
use primitives::{ChangesTrieConfiguration, storage::well_known_keys};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero,
NumberFor, As, Digest, DigestItem, AuthorityIdFor};
use runtime_primitives::{Justification, StorageMap, ChildrenStorageMap};
use crate::blockchain::{self, BlockStatus, HeaderBackend};
use state_machine::backend::{Backend as StateBackend, InMemory, Consolidate};
use state_machine::InMemoryChangesTrieStorage;
use state_machine::{self, InMemoryChangesTrieStorage, ChangesTrieAnchorBlockId};
use hash_db::Hasher;
use heapsize::HeapSizeOf;
use crate::leaves::LeafSet;
@@ -505,7 +505,7 @@ where
H::Out: HeapSizeOf + Ord,
{
states: RwLock<HashMap<Block::Hash, InMemory<H>>>,
changes_trie_storage: InMemoryChangesTrieStorage<H>,
changes_trie_storage: ChangesTrieStorage<H>,
blockchain: Blockchain<Block>,
}
@@ -519,7 +519,7 @@ where
pub fn new() -> Backend<Block, H> {
Backend {
states: RwLock::new(HashMap::new()),
changes_trie_storage: InMemoryChangesTrieStorage::new(),
changes_trie_storage: ChangesTrieStorage(InMemoryChangesTrieStorage::new()),
blockchain: Blockchain::new(),
}
}
@@ -555,7 +555,7 @@ where
type BlockImportOperation = BlockImportOperation<Block, H>;
type Blockchain = Blockchain<Block>;
type State = InMemory<H>;
type ChangesTrieStorage = InMemoryChangesTrieStorage<H>;
type ChangesTrieStorage = ChangesTrieStorage<H>;
fn begin_operation(&self, block: BlockId<Block>) -> error::Result<Self::BlockImportOperation> {
let state = match block {
@@ -587,7 +587,7 @@ where
if let Some(changes_trie_root) = changes_trie_root {
if let Some(changes_trie_update) = operation.changes_trie_update {
let changes_trie_root: H::Out = changes_trie_root.into();
self.changes_trie_storage.insert(header.number().as_(), changes_trie_root, changes_trie_update);
self.changes_trie_storage.0.insert(header.number().as_(), changes_trie_root, changes_trie_update);
}
}
@@ -652,6 +652,26 @@ impl<Block: BlockT> blockchain::Cache<Block> for Cache<Block> {
}
}
/// Prunable in-memory changes trie storage.
pub struct ChangesTrieStorage<H: Hasher>(InMemoryChangesTrieStorage<H>) where H::Out: HeapSizeOf;
impl<H: Hasher> backend::PrunableStateChangesTrieStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf {
fn oldest_changes_trie_block(&self, _config: &ChangesTrieConfiguration, _best_finalized: u64) -> u64 {
0
}
}
impl<H: Hasher> state_machine::ChangesTrieRootsStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf {
fn root(&self, anchor: &ChangesTrieAnchorBlockId<H::Out>, block: u64) -> Result<Option<H::Out>, String> {
self.0.root(anchor, block)
}
}
impl<H: Hasher> state_machine::ChangesTrieStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf {
fn get(&self, key: &H::Out) -> Result<Option<state_machine::DBValue>, String> {
self.0.get(key)
}
}
/// Insert authorities entry into in-memory blockchain cache. Extracted as a separate function to use it in tests.
pub fn cache_authorities_at<Block: BlockT>(
blockchain: &Blockchain<Block>,
+2 -3
View File
@@ -22,9 +22,8 @@ use futures::{Future, IntoFuture};
use parking_lot::RwLock;
use runtime_primitives::{generic::BlockId, Justification, StorageMap, ChildrenStorageMap};
use state_machine::{Backend as StateBackend, InMemoryChangesTrieStorage, TrieBackend};
use state_machine::{Backend as StateBackend, TrieBackend};
use runtime_primitives::traits::{Block as BlockT, NumberFor, AuthorityIdFor};
use crate::in_mem;
use crate::backend::{AuxStore, Backend as ClientBackend, BlockImportOperation, RemoteBackend, NewBlockState};
use crate::blockchain::HeaderBackend as BlockchainHeaderBackend;
@@ -95,7 +94,7 @@ impl<S, F, Block, H> ClientBackend<Block, H> for Backend<S, F> where
type BlockImportOperation = ImportOperation<Block, S, F>;
type Blockchain = Blockchain<S, F>;
type State = OnDemandState<Block, S, F>;
type ChangesTrieStorage = InMemoryChangesTrieStorage<H>;
type ChangesTrieStorage = in_mem::ChangesTrieStorage<H>;
fn begin_operation(&self, _block: BlockId<Block>) -> ClientResult<Self::BlockImportOperation> {
Ok(ImportOperation {
+8 -4
View File
@@ -403,7 +403,7 @@ pub mod tests {
RemoteCallRequest, RemoteHeaderRequest};
use crate::light::blockchain::tests::{DummyStorage, DummyBlockchain};
use primitives::{twox_128, Blake2Hasher};
use primitives::storage::well_known_keys;
use primitives::storage::{StorageKey, well_known_keys};
use runtime_primitives::generic::BlockId;
use state_machine::Backend;
use super::*;
@@ -546,6 +546,7 @@ pub mod tests {
let end_hash = remote_client.block_hash(end).unwrap().unwrap();
// 'fetch' changes proof from remote node
let key = StorageKey(key);
let remote_proof = remote_client.key_changes_proof(
begin_hash, end_hash, begin_hash, max_hash, &key
).unwrap();
@@ -558,7 +559,7 @@ pub mod tests {
last_block: (end, end_hash),
max_block: (max, max_hash),
tries_roots: (begin, begin_hash, local_roots_range),
key: key,
key: key.0,
retry_count: None,
};
let local_result = local_checker.check_changes_proof(&request, ChangesProof {
@@ -583,6 +584,7 @@ pub mod tests {
// (1, 4, dave.clone(), vec![(4, 0), (1, 1), (1, 0)]),
let (remote_client, remote_roots, _) = prepare_client_with_key_changes();
let dave = twox_128(&runtime::system::balance_of_key(Keyring::Dave.to_raw_public().into())).to_vec();
let dave = StorageKey(dave);
// 'fetch' changes proof from remote node:
// we're fetching changes for range b1..b4
@@ -611,7 +613,7 @@ pub mod tests {
last_block: (4, b4),
max_block: (4, b4),
tries_roots: (3, b3, vec![remote_roots[2].clone(), remote_roots[3].clone()]),
key: dave,
key: dave.0,
retry_count: None,
};
let local_result = local_checker.check_changes_proof_with_cht_size(&request, ChangesProof {
@@ -640,6 +642,7 @@ pub mod tests {
let end_hash = remote_client.block_hash(end).unwrap().unwrap();
// 'fetch' changes proof from remote node
let key = StorageKey(key);
let remote_proof = remote_client.key_changes_proof(
begin_hash, end_hash, begin_hash, max_hash, &key).unwrap();
@@ -650,7 +653,7 @@ pub mod tests {
last_block: (end, end_hash),
max_block: (max, max_hash),
tries_roots: (begin, begin_hash, local_roots_range.clone()),
key: key,
key: key.0,
retry_count: None,
};
@@ -693,6 +696,7 @@ pub mod tests {
let local_cht_root = cht::compute_root::<Header, Blake2Hasher, _>(
4, 0, remote_roots.iter().cloned().map(|ct| Ok(Some(ct)))).unwrap();
let dave = twox_128(&runtime::system::balance_of_key(Keyring::Dave.to_raw_public().into())).to_vec();
let dave = StorageKey(dave);
// 'fetch' changes proof from remote node:
// we're fetching changes for range b1..b4
+3 -3
View File
@@ -24,7 +24,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, AuthorityId
use runtime_primitives::generic::{BlockId};
use consensus::{ImportBlock, ImportResult};
use runtime_primitives::Justification;
use primitives::{H256, Blake2Hasher};
use primitives::{H256, Blake2Hasher, storage::StorageKey};
/// Local client abstraction for the network.
pub trait Client<Block: BlockT>: Send + Sync {
@@ -66,7 +66,7 @@ pub trait Client<Block: BlockT>: Send + Sync {
last: Block::Hash,
min: Block::Hash,
max: Block::Hash,
key: &[u8]
key: &StorageKey
) -> Result<ChangesProof<Block::Header>, Error>;
}
@@ -125,7 +125,7 @@ impl<B, E, Block, RA> Client<Block> for SubstrateClient<B, E, Block, RA> where
last: Block::Hash,
min: Block::Hash,
max: Block::Hash,
key: &[u8]
key: &StorageKey
) -> Result<ChangesProof<Block::Header>, Error> {
(self as &SubstrateClient<B, E, Block, RA>).key_changes_proof(first, last, min, max, key)
}
+4 -2
View File
@@ -22,6 +22,7 @@ use parking_lot::RwLock;
use rustc_hex::ToHex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, As, Zero};
use runtime_primitives::generic::BlockId;
use primitives::storage::StorageKey;
use network_libp2p::{NodeIndex, Severity};
use codec::{Encode, Decode};
use consensus::import_queue::ImportQueue;
@@ -712,11 +713,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn on_remote_changes_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteChangesRequest<B::Hash>) {
trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})",
request.id, who, request.key.to_hex(), request.first, request.last);
let proof = match self.context_data.chain.key_changes_proof(request.first, request.last, request.min, request.max, &request.key) {
let key = StorageKey(request.key);
let proof = match self.context_data.chain.key_changes_proof(request.first, request.last, request.min, request.max, &key) {
Ok(proof) => proof,
Err(error) => {
trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}",
request.id, who, request.key.to_hex(), request.first, request.last, error);
request.id, who, key.0.to_hex(), request.first, request.last, error);
ChangesProof::<B::Header> {
max_block: Zero::zero(),
proof: vec![],
+178 -71
View File
@@ -17,7 +17,8 @@
//! Substrate state API.
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
ops::Range,
sync::Arc,
};
@@ -31,7 +32,7 @@ use primitives::storage::{self, StorageKey, StorageData, StorageChangeSet};
use rpc::Result as RpcResult;
use rpc::futures::{stream, Future, Sink, Stream};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header, ProvideRuntimeApi};
use runtime_primitives::traits::{Block as BlockT, Header, ProvideRuntimeApi, As, NumberFor};
use runtime_version::RuntimeVersion;
use subscriptions::Subscriptions;
@@ -112,7 +113,25 @@ pub struct State<B, E, Block: BlockT, RA> {
subscriptions: Subscriptions,
}
impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> {
/// Ranges to query in state_queryStorage.
struct QueryStorageRange<Block: BlockT> {
/// Hashes of all the blocks in the range.
pub hashes: Vec<Block::Hash>,
/// Number of the first block in the range.
pub first_number: NumberFor<Block>,
/// Blocks subrange ([begin; end) indices within `hashes`) where we should read keys at
/// each state to get changes.
pub unfiltered_range: Range<usize>,
/// Blocks subrange ([begin; end) indices within `hashes`) where we could pre-filter
/// blocks-with-changes by using changes tries.
pub filtered_range: Option<Range<usize>>,
}
impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> where
Block: BlockT<Hash=H256>,
B: client::backend::Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher>,
{
/// Create new State API RPC handler.
pub fn new(client: Arc<Client<B, E, Block, RA>>, subscriptions: Subscriptions) -> Self {
Self {
@@ -120,6 +139,128 @@ impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> {
subscriptions,
}
}
/// Splits the `query_storage` block range into 'filtered' and 'unfiltered' subranges.
/// Blocks that contain changes within filtered subrange could be filtered using changes tries.
/// Blocks that contain changes within unfiltered subrange must be filtered manually.
fn split_query_storage_range(
&self,
from: Block::Hash,
to: Trailing<Block::Hash>
) -> Result<QueryStorageRange<Block>> {
let to = self.unwrap_or_best(to)?;
let from_hdr = self.client.header(&BlockId::hash(from))?;
let to_hdr = self.client.header(&BlockId::hash(to))?;
match (from_hdr, to_hdr) {
(Some(ref from), Some(ref to)) if from.number() <= to.number() => {
// check if we can get from `to` to `from` by going through parent_hashes.
let from_number = *from.number();
let blocks = {
let mut blocks = vec![to.hash()];
let mut last = to.clone();
while *last.number() > from_number {
if let Some(hdr) = self.client.header(&BlockId::hash(*last.parent_hash()))? {
blocks.push(hdr.hash());
last = hdr;
} else {
bail!(invalid_block_range(
Some(from),
Some(to),
format!("Parent of {} ({}) not found", last.number(), last.hash()),
))
}
}
if last.hash() != from.hash() {
bail!(invalid_block_range(
Some(from),
Some(to),
format!("Expected to reach `from`, got {} ({})", last.number(), last.hash()),
))
}
blocks.reverse();
blocks
};
// check if we can filter blocks-with-changes from some (sub)range using changes tries
let changes_trie_range = self.client.max_key_changes_range(from_number, BlockId::Hash(to.hash()))?;
let filtered_range_begin = changes_trie_range.map(|(begin, _)| (begin - from_number).as_() as usize);
let (unfiltered_range, filtered_range) = split_range(blocks.len(), filtered_range_begin);
Ok(QueryStorageRange {
hashes: blocks,
first_number: from_number,
unfiltered_range,
filtered_range,
})
},
(from, to) => bail!(
invalid_block_range(from.as_ref(), to.as_ref(), "Invalid range or unknown block".into())
),
}
}
/// Iterates through range.unfiltered_range and check each block for changes of keys' values.
fn query_storage_unfiltered(
&self,
range: &QueryStorageRange<Block>,
keys: &[StorageKey],
changes: &mut Vec<StorageChangeSet<Block::Hash>>,
) -> Result<()> {
let mut last_state: HashMap<_, Option<_>> = Default::default();
for block in range.unfiltered_range.start..range.unfiltered_range.end {
let block_hash = range.hashes[block].clone();
let mut block_changes = StorageChangeSet { block: block_hash.clone(), changes: Vec::new() };
let id = BlockId::hash(block_hash);
for key in keys {
let (has_changed, data) = {
let curr_data = self.client.storage(&id, key)?;
let prev_data = last_state.get(key).and_then(|x| x.as_ref());
(curr_data.as_ref() != prev_data, curr_data)
};
if has_changed {
block_changes.changes.push((key.clone(), data.clone()));
}
last_state.insert(key.clone(), data);
}
changes.push(block_changes);
}
Ok(())
}
/// Iterates through all blocks that are changing keys within range.filtered_range and collects these changes.
fn query_storage_filtered(
&self,
range: &QueryStorageRange<Block>,
keys: &[StorageKey],
changes: &mut Vec<StorageChangeSet<Block::Hash>>,
) -> Result<()> {
let (begin, end) = match range.filtered_range {
Some(ref filtered_range) => (
range.first_number + As::sa(filtered_range.start as u64),
BlockId::Hash(range.hashes[filtered_range.end - 1].clone())
),
None => return Ok(()),
};
let mut changes_map: BTreeMap<NumberFor<Block>, StorageChangeSet<Block::Hash>> = BTreeMap::new();
for key in keys {
let mut last_block = None;
for (block, _) in self.client.key_changes(begin, end, key)? {
if last_block == Some(block) {
continue;
}
let block_hash = range.hashes[(block - range.first_number).as_() as usize].clone();
let id = BlockId::Hash(block_hash);
let value_at_block = self.client.storage(&id, key)?;
changes_map.entry(block)
.or_insert_with(|| StorageChangeSet { block: block_hash, changes: Vec::new() })
.changes.push((key.clone(), value_at_block));
last_block = Some(block);
}
}
if let Some(additional_capacity) = changes_map.len().checked_sub(changes.len()) {
changes.reserve(additional_capacity);
}
changes.extend(changes_map.into_iter().map(|(_, cs)| cs));
Ok(())
}
}
impl<B, E, Block, RA> State<B, E, Block, RA> where
@@ -178,72 +319,17 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
self.client.runtime_api().metadata(&BlockId::Hash(block)).map(Into::into).map_err(Into::into)
}
fn query_storage(&self, keys: Vec<StorageKey>, from: Block::Hash, to: Trailing<Block::Hash>) -> Result<Vec<StorageChangeSet<Block::Hash>>> {
let to = self.unwrap_or_best(to)?;
let from_hdr = self.client.header(&BlockId::hash(from))?;
let to_hdr = self.client.header(&BlockId::hash(to))?;
match (from_hdr, to_hdr) {
(Some(ref from), Some(ref to)) if from.number() <= to.number() => {
let from = from.clone();
let to = to.clone();
// check if we can get from `to` to `from` by going through parent_hashes.
let blocks = {
let mut blocks = vec![to.hash()];
let mut last = to.clone();
while last.number() > from.number() {
if let Some(hdr) = self.client.header(&BlockId::hash(*last.parent_hash()))? {
blocks.push(hdr.hash());
last = hdr;
} else {
bail!(invalid_block_range(
Some(from),
Some(to),
format!("Parent of {} ({}) not found", last.number(), last.hash()),
))
}
}
if last.hash() != from.hash() {
bail!(invalid_block_range(
Some(from),
Some(to),
format!("Expected to reach `from`, got {} ({})", last.number(), last.hash()),
))
}
blocks.reverse();
blocks
};
let mut result = Vec::new();
let mut last_state: HashMap<_, Option<_>> = Default::default();
for block in blocks {
let mut changes = vec![];
let id = BlockId::hash(block.clone());
for key in &keys {
let (has_changed, data) = {
let curr_data = self.client.storage(&id, key)?;
let prev_data = last_state.get(key).and_then(|x| x.as_ref());
(curr_data.as_ref() != prev_data, curr_data)
};
if has_changed {
changes.push((key.clone(), data.clone()));
}
last_state.insert(key.clone(), data);
}
result.push(StorageChangeSet {
block,
changes,
});
}
Ok(result)
},
(from, to) => bail!(invalid_block_range(from, to, "Invalid range or unknown block".into())),
}
fn query_storage(
&self,
keys: Vec<StorageKey>,
from: Block::Hash,
to: Trailing<Block::Hash>
) -> Result<Vec<StorageChangeSet<Block::Hash>>> {
let range = self.split_query_storage_range(from, to)?;
let mut changes = Vec::new();
self.query_storage_unfiltered(&range, &keys, &mut changes)?;
self.query_storage_filtered(&range, &keys, &mut changes)?;
Ok(changes)
}
fn subscribe_storage(
@@ -348,8 +434,29 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
}
}
fn invalid_block_range<H: Header>(from: Option<H>, to: Option<H>, reason: String) -> error::ErrorKind {
let to_string = |x: Option<H>| match x {
/// Splits passed range into two subranges where:
/// - first range has at least one element in it;
/// - second range (optionally) starts at given `middle` element.
pub(crate) fn split_range(size: usize, middle: Option<usize>) -> (Range<usize>, Option<Range<usize>>) {
// check if we can filter blocks-with-changes from some (sub)range using changes tries
let range2_begin = match middle {
// some of required changes tries are pruned => use available tries
Some(middle) if middle != 0 => Some(middle),
// all required changes tries are available, but we still want values at first block
// => do 'unfiltered' read for the first block and 'filtered' for the rest
Some(_) if size > 1 => Some(1),
// range contains single element => do not use changes tries
Some(_) => None,
// changes tries are not available => do 'unfiltered' read for the whole range
None => None,
};
let range1 = 0..range2_begin.unwrap_or(size);
let range2 = range2_begin.map(|begin| begin..size);
(range1, range2)
}
fn invalid_block_range<H: Header>(from: Option<&H>, to: Option<&H>, reason: String) -> error::ErrorKind {
let to_string = |x: Option<&H>| match x {
None => "unknown hash".into(),
Some(h) => format!("{} ({})", h.number(), h.hash()),
};
+82 -53
View File
@@ -117,66 +117,95 @@ fn should_send_initial_storage_changes_and_notifications() {
#[test]
fn should_query_storage() {
let core = ::tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new());
let api = State::new(client.clone(), Subscriptions::new(core.executor()));
type TestClient = test_client::client::Client<
test_client::Backend,
test_client::Executor,
runtime::Block,
runtime::RuntimeApi
>;
let add_block = |nonce| {
let mut builder = client.new_block().unwrap();
builder.push_transfer(runtime::Transfer {
from: Keyring::Alice.to_raw_public().into(),
to: Keyring::Ferdie.to_raw_public().into(),
amount: 42,
nonce,
}).unwrap();
let block = builder.bake().unwrap();
let hash = block.header.hash();
client.import(BlockOrigin::Own, block).unwrap();
hash
};
let block1_hash = add_block(0);
let block2_hash = add_block(1);
let genesis_hash = client.genesis_hash();
fn run_tests(client: Arc<TestClient>) {
let core = ::tokio::runtime::Runtime::new().unwrap();
let api = State::new(client.clone(), Subscriptions::new(core.executor()));
let add_block = |nonce| {
let mut builder = client.new_block().unwrap();
builder.push_transfer(runtime::Transfer {
from: Keyring::Alice.to_raw_public().into(),
to: Keyring::Ferdie.to_raw_public().into(),
amount: 42,
nonce,
}).unwrap();
let block = builder.bake().unwrap();
let hash = block.header.hash();
client.import(BlockOrigin::Own, block).unwrap();
hash
};
let block1_hash = add_block(0);
let block2_hash = add_block(1);
let genesis_hash = client.genesis_hash();
let mut expected = vec![
StorageChangeSet {
block: genesis_hash,
let mut expected = vec![
StorageChangeSet {
block: genesis_hash,
changes: vec![
(
StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()),
Some(StorageData(vec![232, 3, 0, 0, 0, 0, 0, 0]))
),
],
},
StorageChangeSet {
block: block1_hash,
changes: vec![
(
StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()),
Some(StorageData(vec![190, 3, 0, 0, 0, 0, 0, 0]))
),
],
},
];
// Query changes only up to block1
let result = api.query_storage(
vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())],
genesis_hash,
Some(block1_hash).into(),
);
assert_eq!(result.unwrap(), expected);
// Query all changes
let result = api.query_storage(
vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())],
genesis_hash,
None.into(),
);
expected.push(StorageChangeSet {
block: block2_hash,
changes: vec![
(StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![232, 3, 0, 0, 0, 0, 0, 0]))),
(
StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()),
Some(StorageData(vec![148, 3, 0, 0, 0, 0, 0, 0]))
),
],
},
StorageChangeSet {
block: block1_hash,
changes: vec![
(StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![190, 3, 0, 0, 0, 0, 0, 0]))),
],
},
];
});
assert_eq!(result.unwrap(), expected);
}
// Query changes only up to block1
let result = api.query_storage(
vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())],
genesis_hash,
Some(block1_hash).into(),
);
run_tests(Arc::new(test_client::new()));
run_tests(Arc::new(test_client::new_with_changes_trie()));
}
assert_eq!(result.unwrap(), expected);
// Query all changes
let result = api.query_storage(
vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())],
genesis_hash,
None.into(),
);
expected.push(StorageChangeSet {
block: block2_hash,
changes: vec![
(StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![148, 3, 0, 0, 0, 0, 0, 0]))),
],
});
assert_eq!(result.unwrap(), expected);
#[test]
fn should_split_ranges() {
assert_eq!(split_range(1, None), (0..1, None));
assert_eq!(split_range(100, None), (0..100, None));
assert_eq!(split_range(1, Some(0)), (0..1, None));
assert_eq!(split_range(100, Some(50)), (0..50, Some(50..100)));
assert_eq!(split_range(100, Some(99)), (0..99, Some(99..100)));
}
@@ -31,15 +31,16 @@ use trie_backend_essence::{TrieBackendEssence};
/// Return changes of given key at given blocks range.
/// `max` is the number of best known block.
pub fn key_changes<S: Storage<H>, H: Hasher>(
config: &Configuration,
storage: &S,
/// Changes are returned in descending order (i.e. last block comes first).
pub fn key_changes<'a, S: Storage<H>, H: Hasher>(
config: &'a Configuration,
storage: &'a S,
begin: u64,
end: &AnchorBlockId<H::Out>,
end: &'a AnchorBlockId<H::Out>,
max: u64,
key: &[u8],
) -> Result<Vec<(u64, u32)>, String> where H::Out: HeapSizeOf {
DrilldownIterator {
key: &'a [u8],
) -> Result<DrilldownIterator<'a, S, S, H>, String> where H::Out: HeapSizeOf {
Ok(DrilldownIterator {
essence: DrilldownIteratorEssence {
key,
roots_storage: storage,
@@ -53,7 +54,7 @@ pub fn key_changes<S: Storage<H>, H: Hasher>(
_hasher: ::std::marker::PhantomData::<H>::default(),
},
}.collect()
})
}
/// Returns proof of changes of given key at given blocks range.
@@ -93,6 +94,7 @@ pub fn key_changes_proof<S: Storage<H>, H: Hasher>(
/// Check key changes proog and return changes of the key at given blocks range.
/// `max` is the number of best known block.
/// Changes are returned in descending order (i.e. last block comes first).
pub fn key_changes_proof_check<S: RootsStorage<H>, H: Hasher>(
config: &Configuration,
roots_storage: &S,
@@ -261,7 +263,7 @@ impl<'a, RS: 'a + RootsStorage<H>, S: Storage<H>, H: Hasher> DrilldownIteratorEs
}
/// Exploring drilldown operator.
struct DrilldownIterator<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> where H::Out: 'a {
pub struct DrilldownIterator<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> where H::Out: 'a {
essence: DrilldownIteratorEssence<'a, RS, S, H>,
}
@@ -379,6 +381,7 @@ fn lower_bound_max_digest(
#[cfg(test)]
mod tests {
use std::iter::FromIterator;
use primitives::Blake2Hasher;
use changes_trie::input::InputPair;
use changes_trie::storage::InMemoryStorage;
@@ -427,23 +430,28 @@ mod tests {
fn drilldown_iterator_works() {
let (config, storage) = prepare_for_drilldown();
let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 16 }, 16, &[42]);
&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 16 }, 16, &[42])
.and_then(Result::from_iter);
assert_eq!(drilldown_result, Ok(vec![(8, 2), (8, 1), (6, 3), (3, 0)]));
let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 2 }, 4, &[42]);
&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 2 }, 4, &[42])
.and_then(Result::from_iter);
assert_eq!(drilldown_result, Ok(vec![]));
let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 3 }, 4, &[42]);
&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 3 }, 4, &[42])
.and_then(Result::from_iter);
assert_eq!(drilldown_result, Ok(vec![(3, 0)]));
let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
&config, &storage, 7, &AnchorBlockId { hash: Default::default(), number: 8 }, 8, &[42]);
&config, &storage, 7, &AnchorBlockId { hash: Default::default(), number: 8 }, 8, &[42])
.and_then(Result::from_iter);
assert_eq!(drilldown_result, Ok(vec![(8, 2), (8, 1)]));
let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
&config, &storage, 5, &AnchorBlockId { hash: Default::default(), number: 7 }, 8, &[42]);
&config, &storage, 5, &AnchorBlockId { hash: Default::default(), number: 7 }, 8, &[42])
.and_then(Result::from_iter);
assert_eq!(drilldown_result, Ok(vec![(6, 3)]));
}
@@ -453,7 +461,8 @@ mod tests {
storage.clear_storage();
assert!(key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 100 }, 1000, &[42]).is_err());
&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 100 }, 1000, &[42])
.and_then(|i| i.collect::<Result<Vec<_>, _>>()).is_err());
}
#[test]
@@ -44,7 +44,7 @@ mod storage;
pub use self::storage::InMemoryStorage;
pub use self::changes_iterator::{key_changes, key_changes_proof, key_changes_proof_check};
pub use self::prune::prune;
pub use self::prune::{prune, oldest_non_pruned_trie};
use hash_db::Hasher;
use heapsize::HeapSizeOf;
@@ -24,6 +24,22 @@ use trie_backend_essence::TrieBackendEssence;
use changes_trie::{AnchorBlockId, Configuration, Storage};
use changes_trie::storage::TrieBackendAdapter;
/// Get number of oldest block for which changes trie is not pruned
/// given changes trie configuration, pruning parameter and number of
/// best finalized block.
pub fn oldest_non_pruned_trie(
config: &Configuration,
min_blocks_to_keep: u64,
best_finalized_block: u64,
) -> u64 {
let max_digest_interval = config.max_digest_interval();
let max_digest_block = best_finalized_block - best_finalized_block % max_digest_interval;
match pruning_range(config, min_blocks_to_keep, max_digest_block) {
Some((_, last_pruned_block)) => last_pruned_block + 1,
None => 1,
}
}
/// Prune obslete changes tries. Puning happens at the same block, where highest
/// level digest is created. Pruning guarantees to save changes tries for last
/// `min_blocks_to_keep` blocks. We only prune changes tries at `max_digest_iterval`
@@ -268,4 +284,23 @@ mod tests {
assert_eq!(max_digest_intervals_to_keep(1024, 511), 2);
assert_eq!(max_digest_intervals_to_keep(1024, 100), 10);
}
#[test]
fn oldest_non_pruned_trie_works() {
// when digests are not created at all
assert_eq!(oldest_non_pruned_trie(&config(0, 0), 100, 10), 1);
assert_eq!(oldest_non_pruned_trie(&config(0, 0), 100, 110), 11);
// when only l1 digests are created
assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 50), 1);
assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 110), 1);
assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 210), 101);
// when l2 digests are created
assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 50), 1);
assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 110), 1);
assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 210), 1);
assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 10110), 1);
assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 20110), 10001);
}
}
+2 -1
View File
@@ -59,7 +59,8 @@ pub use changes_trie::{
RootsStorage as ChangesTrieRootsStorage,
InMemoryStorage as InMemoryChangesTrieStorage,
key_changes, key_changes_proof, key_changes_proof_check,
prune as prune_changes_tries};
prune as prune_changes_tries,
oldest_non_pruned_trie as oldest_non_pruned_changes_trie};
pub use overlayed_changes::OverlayedChanges;
pub use proving_backend::{create_proof_check_backend, create_proof_check_backend_storage};
pub use trie_backend_essence::{TrieBackendStorage, Storage};