Move authorities interface from Core to consensus (#1412)

* Move authorities interface from Core to consensus

f

* notify all caches of block insert + create with up-to-date best_fin

* merged authorities_are_cached from light_grandpa_import2

* Add ProvideCache trait

* Create helper function for 'get_cache'

* Fix some formatting

* Bump impl version

* Resolve wasm conflicts

* Apply review comments

* Use try_for_each

* Move authorities interface from Core to consensus

f

* notify all caches of block insert + create with up-to-date best_fin

* merged authorities_are_cached from light_grandpa_import2

* Add ProvideCache trait

* Create helper function for 'get_cache'

* Fix some formatting

* Bump impl version

* Resolve wasm conflicts

* Apply review comments

* Use try_for_each

* Move authorities interface from Core to consensus

f

* notify all caches of block insert + create with up-to-date best_fin

* merged authorities_are_cached from light_grandpa_import2

* Add ProvideCache trait

* Create helper function for 'get_cache'

* Fix some formatting

* Bump impl version

* Resolve wasm conflicts

* Apply review comments

* Use try_for_each

* Increment impl_version

* Update lib.rs
This commit is contained in:
Stanislav Tkach
2019-03-29 18:41:22 +02:00
committed by Gav Wood
parent 55788fdf77
commit cbfc36b39f
44 changed files with 650 additions and 409 deletions
+123 -47
View File
@@ -16,7 +16,7 @@
//! DB-backed cache of blockchain data.
use std::sync::Arc;
use std::{sync::Arc, collections::HashMap};
use parking_lot::RwLock;
use kvdb::{KeyValueDB, DBTransaction};
@@ -25,7 +25,7 @@ use client::blockchain::Cache as BlockchainCache;
use client::error::Result as ClientResult;
use parity_codec::{Encode, Decode};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, As, AuthorityIdFor};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, As};
use crate::utils::{self, COLUMN_META};
use self::list_cache::ListCache;
@@ -64,7 +64,12 @@ impl<T> CacheItemT for T where T: Clone + Decode + Encode + PartialEq {}
/// Database-backed blockchain data cache.
pub struct DbCache<Block: BlockT> {
authorities_at: ListCache<Block, Vec<AuthorityIdFor<Block>>, self::list_storage::DbStorage>,
cache_at: HashMap<Vec<u8>, ListCache<Block, Vec<u8>, self::list_storage::DbStorage>>,
db: Arc<KeyValueDB>,
key_lookup_column: Option<u32>,
header_column: Option<u32>,
authorities_column: Option<u32>,
best_finalized_block: ComplexBlockId<Block>,
}
impl<Block: BlockT> DbCache<Block> {
@@ -76,19 +81,13 @@ impl<Block: BlockT> DbCache<Block> {
authorities_column: Option<u32>,
best_finalized_block: ComplexBlockId<Block>,
) -> Self {
DbCache {
authorities_at: ListCache::new(
self::list_storage::DbStorage::new(b"auth".to_vec(), db,
self::list_storage::DbColumns {
meta: COLUMN_META,
key_lookup: key_lookup_column,
header: header_column,
cache: authorities_column,
},
),
As::sa(PRUNE_DEPTH),
best_finalized_block,
),
Self {
cache_at: HashMap::new(),
db,
key_lookup_column,
header_column,
authorities_column,
best_finalized_block,
}
}
@@ -97,35 +96,82 @@ impl<Block: BlockT> DbCache<Block> {
DbCacheTransaction {
cache: self,
tx,
authorities_at_op: None,
cache_at_op: HashMap::new(),
best_finalized_block: None,
}
}
/// Run post-commit cache operations.
pub fn commit(&mut self, ops: DbCacheTransactionOps<Block>) {
if let Some(authorities_at_op) = ops.authorities_at_op {
self.authorities_at.on_transaction_commit(authorities_at_op);
for (name, op) in ops.cache_at_op.into_iter() {
self.get_cache(name).on_transaction_commit(op);
}
if let Some(best_finalized_block) = ops.best_finalized_block {
self.best_finalized_block = best_finalized_block;
}
}
/// Creates `ListCache` with the given name or returns a reference to the existing.
fn get_cache(&mut self, name: Vec<u8>) -> &mut ListCache<Block, Vec<u8>, self::list_storage::DbStorage> {
get_cache_helper(
&mut self.cache_at,
name,
&self.db,
self.key_lookup_column,
self.header_column,
self.authorities_column,
&self.best_finalized_block
)
}
}
// This helper is needed because otherwise the borrow checker will require to
// clone all parameters outside of the closure.
fn get_cache_helper<'a, Block: BlockT>(
cache_at: &'a mut HashMap<Vec<u8>, ListCache<Block, Vec<u8>, self::list_storage::DbStorage>>,
name: Vec<u8>,
db: &Arc<KeyValueDB>,
key_lookup: Option<u32>,
header: Option<u32>,
cache: Option<u32>,
best_finalized_block: &ComplexBlockId<Block>,
) -> &'a mut ListCache<Block, Vec<u8>, self::list_storage::DbStorage> {
cache_at.entry(name.clone()).or_insert_with(|| {
ListCache::new(
self::list_storage::DbStorage::new(name, db.clone(),
self::list_storage::DbColumns {
meta: COLUMN_META,
key_lookup,
header,
cache,
},
),
As::sa(PRUNE_DEPTH),
best_finalized_block.clone(),
)
})
}
/// Cache operations that are to be committed after database transaction is committed.
pub struct DbCacheTransactionOps<Block: BlockT> {
authorities_at_op: Option<self::list_cache::CommitOperation<Block, Vec<AuthorityIdFor<Block>>>>,
cache_at_op: HashMap<Vec<u8>, self::list_cache::CommitOperation<Block, Vec<u8>>>,
best_finalized_block: Option<ComplexBlockId<Block>>,
}
/// Database-backed blockchain data cache transaction valid for single block import.
pub struct DbCacheTransaction<'a, Block: BlockT> {
cache: &'a mut DbCache<Block>,
tx: &'a mut DBTransaction,
authorities_at_op: Option<self::list_cache::CommitOperation<Block, Vec<AuthorityIdFor<Block>>>>,
cache_at_op: HashMap<Vec<u8>, self::list_cache::CommitOperation<Block, Vec<u8>>>,
best_finalized_block: Option<ComplexBlockId<Block>>,
}
impl<'a, Block: BlockT> DbCacheTransaction<'a, Block> {
/// Convert transaction into post-commit operations set.
pub fn into_ops(self) -> DbCacheTransactionOps<Block> {
DbCacheTransactionOps {
authorities_at_op: self.authorities_at_op,
cache_at_op: self.cache_at_op,
best_finalized_block: self.best_finalized_block,
}
}
@@ -134,21 +180,42 @@ impl<'a, Block: BlockT> DbCacheTransaction<'a, Block> {
mut self,
parent: ComplexBlockId<Block>,
block: ComplexBlockId<Block>,
authorities_at: Option<Vec<AuthorityIdFor<Block>>>,
data_at: HashMap<Vec<u8>, Vec<u8>>,
is_final: bool,
) -> ClientResult<Self> {
assert!(self.authorities_at_op.is_none());
assert!(self.cache_at_op.is_empty());
self.authorities_at_op = self.cache.authorities_at.on_block_insert(
&mut self::list_storage::DbStorageTransaction::new(
self.cache.authorities_at.storage(),
&mut self.tx
),
parent,
block,
authorities_at,
is_final,
)?;
// prepare list of caches that are not update
// (we might still need to do some cache maintenance in this case)
let missed_caches = self.cache.cache_at.keys()
.filter(|cache| !data_at.contains_key(cache.clone()))
.cloned()
.collect::<Vec<_>>();
let mut insert_op = |name: Vec<u8>, value: Option<Vec<u8>>| -> Result<(), client::error::Error> {
let cache = self.cache.get_cache(name.clone());
let op = cache.on_block_insert(
&mut self::list_storage::DbStorageTransaction::new(
cache.storage(),
&mut self.tx,
),
parent.clone(),
block.clone(),
value.or(cache.value_at_block(&parent)?),
is_final,
)?;
if let Some(op) = op {
self.cache_at_op.insert(name, op);
}
Ok(())
};
data_at.into_iter().try_for_each(|(name, data)| insert_op(name, Some(data)))?;
missed_caches.into_iter().try_for_each(|name| insert_op(name, None))?;
if is_final {
self.best_finalized_block = Some(block);
}
Ok(self)
}
@@ -159,16 +226,24 @@ impl<'a, Block: BlockT> DbCacheTransaction<'a, Block> {
parent: ComplexBlockId<Block>,
block: ComplexBlockId<Block>
) -> ClientResult<Self> {
assert!(self.authorities_at_op.is_none());
assert!(self.cache_at_op.is_empty());
self.authorities_at_op = self.cache.authorities_at.on_block_finalize(
&mut self::list_storage::DbStorageTransaction::new(
self.cache.authorities_at.storage(),
&mut self.tx
),
parent,
block,
)?;
for (name, cache_at) in self.cache.cache_at.iter() {
let op = cache_at.on_block_finalize(
&mut self::list_storage::DbStorageTransaction::new(
cache_at.storage(),
&mut self.tx
),
parent.clone(),
block.clone(),
)?;
if let Some(op) = op {
self.cache_at_op.insert(name.to_owned(), op);
}
}
self.best_finalized_block = Some(block);
Ok(self)
}
@@ -178,12 +253,12 @@ impl<'a, Block: BlockT> DbCacheTransaction<'a, Block> {
pub struct DbCacheSync<Block: BlockT>(pub RwLock<DbCache<Block>>);
impl<Block: BlockT> BlockchainCache<Block> for DbCacheSync<Block> {
fn authorities_at(&self, at: BlockId<Block>) -> Option<Vec<AuthorityIdFor<Block>>> {
fn get_at(&self, key: &[u8], at: &BlockId<Block>) -> Option<Vec<u8>> {
let cache = self.0.read();
let storage = cache.authorities_at.storage();
let storage = cache.cache_at.get(key)?.storage();
let db = storage.db();
let columns = storage.columns();
let at = match at {
let at = match *at {
BlockId::Hash(hash) => {
let header = utils::read_header::<Block>(
&**db,
@@ -202,6 +277,7 @@ impl<Block: BlockT> BlockchainCache<Block> for DbCacheSync<Block> {
},
};
cache.authorities_at.value_at_block(&at).ok()?
cache.cache_at.get(key)?.value_at_block(&at).ok()?
}
}