Merge branch 'master' into rh-grandpa-dynamic2

This commit is contained in:
Robert Habermeier
2018-11-14 14:32:30 +01:00
235 changed files with 9413 additions and 3020 deletions
+142 -50
View File
@@ -14,7 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
// tag::description[]
//! Client backend that uses RocksDB database as storage.
//!
//! # Canonicality vs. Finality
@@ -24,8 +23,6 @@
//! having discarded heavy state that will allow a chain reorganization.
//!
//! Finality implies canonicality but not vice-versa.
//!
// end::description[]
extern crate substrate_client as client;
extern crate kvdb_rocksdb;
@@ -67,7 +64,7 @@ use hash_db::Hasher;
use kvdb::{KeyValueDB, DBTransaction};
use trie::MemoryDB;
use parking_lot::RwLock;
use primitives::{H256, AuthorityId, Blake2Hasher, ChangesTrieConfiguration};
use primitives::{H256, AuthorityId, Blake2Hasher, ChangesTrieConfiguration, convert_hash};
use primitives::storage::well_known_keys;
use runtime_primitives::{generic::BlockId, Justification, StorageMap, ChildrenStorageMap};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero, Digest, DigestItem};
@@ -97,17 +94,17 @@ pub struct DatabaseSettings {
}
/// Create an instance of db-backed client.
pub fn new_client<E, S, Block>(
pub fn new_client<E, S, Block, RA>(
settings: DatabaseSettings,
executor: E,
genesis_storage: S,
block_execution_strategy: ExecutionStrategy,
api_execution_strategy: ExecutionStrategy,
) -> Result<client::Client<Backend<Block>, client::LocalCallExecutor<Backend<Block>, E>, Block>, client::error::Error>
where
Block: BlockT<Hash=H256>,
E: CodeExecutor<Blake2Hasher> + RuntimeInfo,
S: BuildStorage,
) -> Result<client::Client<Backend<Block>, client::LocalCallExecutor<Backend<Block>, E>, Block, RA>, client::error::Error>
where
Block: BlockT<Hash=H256>,
E: CodeExecutor<Blake2Hasher> + RuntimeInfo,
S: BuildStorage,
{
let backend = Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?);
let executor = client::LocalCallExecutor::new(backend.clone(), executor);
@@ -148,7 +145,7 @@ impl<'a> state_db::MetaDb for StateMetaDb<'a> {
/// Block database
pub struct BlockchainDb<Block: BlockT> {
db: Arc<KeyValueDB>,
meta: RwLock<Meta<NumberFor<Block>, Block::Hash>>,
meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
}
@@ -159,7 +156,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
Ok(BlockchainDb {
db,
leaves: RwLock::new(leaves),
meta: RwLock::new(meta),
meta: Arc::new(RwLock::new(meta)),
})
}
@@ -376,7 +373,7 @@ struct StorageDb<Block: BlockT> {
impl<Block: BlockT> state_machine::Storage<Blake2Hasher> for StorageDb<Block> {
fn get(&self, key: &H256) -> Result<Option<DBValue>, String> {
self.state_db.get(&key.0.into(), self).map(|r| r.map(|v| DBValue::from_slice(&v)))
self.state_db.get(key, self).map(|r| r.map(|v| DBValue::from_slice(&v)))
.map_err(|e| format!("Database backend error: {:?}", e))
}
}
@@ -386,7 +383,7 @@ impl<Block: BlockT> state_db::HashDb for StorageDb<Block> {
type Hash = H256;
fn get(&self, key: &H256) -> Result<Option<Vec<u8>>, Self::Error> {
self.db.get(columns::STATE, &key[..]).map(|r| r.map(|v| v.to_vec()))
self.db.get(columns::STATE, key.as_bytes()).map(|r| r.map(|v| v.to_vec()))
}
}
@@ -409,6 +406,7 @@ impl state_machine::Storage<Blake2Hasher> for DbGenesisStorage {
pub struct DbChangesTrieStorage<Block: BlockT> {
db: Arc<KeyValueDB>,
meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
min_blocks_to_keep: Option<u64>,
_phantom: ::std::marker::PhantomData<Block>,
}
@@ -422,7 +420,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
}
/// Prune obsolete changes tries.
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block: NumberFor<Block>) {
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) {
// never prune on archive nodes
let min_blocks_to_keep = match self.min_blocks_to_keep {
Some(min_blocks_to_keep) => min_blocks_to_keep,
@@ -440,23 +438,54 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
&config,
&*self,
min_blocks_to_keep,
block.as_(),
&state_machine::ChangesTrieAnchorBlockId {
hash: convert_hash(&block_hash),
number: block_num.as_(),
},
|node| tx.delete(columns::CHANGES_TRIE, node.as_ref()));
}
}
impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
fn root(&self, block: u64) -> Result<Option<H256>, String> {
Ok(read_db::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(block)))
.map_err(|err| format!("{}", err))
.and_then(|header| match header {
Some(header) => Block::Header::decode(&mut &header[..])
.ok_or_else(|| format!("Failed to parse header of block {}", block))
.map(Some),
None => Ok(None)
})?
.and_then(|header| header.digest().log(DigestItem::as_changes_trie_root)
.map(|root| H256::from_slice(root.as_ref()))))
fn root(&self, anchor: &state_machine::ChangesTrieAnchorBlockId<H256>, block: u64) -> Result<Option<H256>, String> {
// check API requirement
assert!(block <= anchor.number, "API requirement");
// we need to get hash of the block to resolve changes trie root
let block_id = if block <= self.meta.read().finalized_number.as_() {
// if block is finalized, we could just read canonical hash
BlockId::Number(As::sa(block))
} else {
// the block is not finalized
let mut current_num = anchor.number;
let mut current_hash: Block::Hash = convert_hash(&anchor.hash);
let maybe_anchor_header: Block::Header = ::utils::require_header::<Block>(
&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(current_num))
).map_err(|e| e.to_string())?;
if maybe_anchor_header.hash() == current_hash {
// if anchor is canonicalized, then the block is also canonicalized
BlockId::Number(As::sa(block))
} else {
// else (block is not finalized + anchor is not canonicalized):
// => we should find the required block hash by traversing
// back from the anchor to the block with given number
while current_num != block {
let current_header: Block::Header = ::utils::require_header::<Block>(
&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Hash(current_hash)
).map_err(|e| e.to_string())?;
current_hash = *current_header.parent_hash();
current_num = current_num - 1;
}
BlockId::Hash(current_hash)
}
};
Ok(::utils::require_header::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, block_id)
.map_err(|e| e.to_string())?
.digest().log(DigestItem::as_changes_trie_root)
.map(|root| H256::from_slice(root.as_ref())))
}
}
@@ -502,6 +531,7 @@ impl<Block: BlockT> Backend<Block> {
fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
let is_archive_pruning = pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?;
let meta = blockchain.meta.clone();
let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<Block::Hash, H256> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?;
let storage_db = StorageDb {
@@ -510,6 +540,7 @@ impl<Block: BlockT> Backend<Block> {
};
let changes_tries_storage = DbChangesTrieStorage {
db,
meta,
min_blocks_to_keep: if is_archive_pruning { None } else { Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR) },
_phantom: Default::default(),
};
@@ -589,7 +620,7 @@ impl<Block: BlockT> Backend<Block> {
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_num);
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
}
Ok(())
@@ -755,9 +786,9 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
let mut changeset: state_db::ChangeSet<H256> = state_db::ChangeSet::default();
for (key, (val, rc)) in operation.updates.drain() {
if rc > 0 {
changeset.inserted.push((key.0.into(), val.to_vec()));
changeset.inserted.push((key, val.to_vec()));
} else if rc < 0 {
changeset.deleted.push(key.0.into());
changeset.deleted.push(key);
}
}
let number_u64 = number.as_();
@@ -1138,7 +1169,7 @@ mod tests {
backend.commit_operation(op).unwrap();
assert_eq!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
assert_eq!(backend.storage.db.get(::columns::STATE, key.as_bytes()).unwrap().unwrap(), &b"hello"[..]);
hash
};
@@ -1172,7 +1203,7 @@ mod tests {
backend.commit_operation(op).unwrap();
assert_eq!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
assert_eq!(backend.storage.db.get(::columns::STATE, key.as_bytes()).unwrap().unwrap(), &b"hello"[..]);
hash
};
@@ -1204,21 +1235,24 @@ mod tests {
backend.commit_operation(op).unwrap();
assert!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
assert!(backend.storage.db.get(::columns::STATE, key.as_bytes()).unwrap().is_none());
}
backend.finalize_block(BlockId::Number(1)).unwrap();
backend.finalize_block(BlockId::Number(2)).unwrap();
assert!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
assert!(backend.storage.db.get(::columns::STATE, key.as_bytes()).unwrap().is_none());
}
#[test]
fn changes_trie_storage_works() {
let backend = Backend::<Block>::new_test(1000, 100);
backend.changes_tries_storage.meta.write().finalized_number = 1000;
let check_changes = |backend: &Backend<Block>, block: u64, changes: Vec<(Vec<u8>, Vec<u8>)>| {
let (changes_root, mut changes_trie_update) = prepare_changes(changes);
assert_eq!(backend.changes_tries_storage.root(block), Ok(Some(changes_root)));
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: Default::default(), number: block };
assert_eq!(backend.changes_tries_storage.root(&anchor, block), Ok(Some(changes_root)));
for (key, (val, _)) in changes_trie_update.drain() {
assert_eq!(backend.changes_trie_storage().unwrap().get(&key), Ok(Some(val)));
@@ -1242,9 +1276,66 @@ mod tests {
check_changes(&backend, 2, changes2);
}
#[test]
fn changes_trie_storage_works_with_forks() {
let backend = Backend::<Block>::new_test(1000, 100);
let changes0 = vec![(b"k0".to_vec(), b"v0".to_vec())];
let changes1 = vec![(b"k1".to_vec(), b"v1".to_vec())];
let changes2 = vec![(b"k2".to_vec(), b"v2".to_vec())];
let block0 = insert_header(&backend, 0, Default::default(), changes0.clone(), Default::default());
let block1 = insert_header(&backend, 1, block0, changes1.clone(), Default::default());
let block2 = insert_header(&backend, 2, block1, changes2.clone(), Default::default());
let changes2_1_0 = vec![(b"k3".to_vec(), b"v3".to_vec())];
let changes2_1_1 = vec![(b"k4".to_vec(), b"v4".to_vec())];
let block2_1_0 = insert_header(&backend, 3, block2, changes2_1_0.clone(), Default::default());
let block2_1_1 = insert_header(&backend, 4, block2_1_0, changes2_1_1.clone(), Default::default());
let changes2_2_0 = vec![(b"k5".to_vec(), b"v5".to_vec())];
let changes2_2_1 = vec![(b"k6".to_vec(), b"v6".to_vec())];
let block2_2_0 = insert_header(&backend, 3, block2, changes2_2_0.clone(), Default::default());
let block2_2_1 = insert_header(&backend, 4, block2_2_0, changes2_2_1.clone(), Default::default());
// finalize block1
backend.changes_tries_storage.meta.write().finalized_number = 1;
// branch1: when asking for finalized block hash
let (changes1_root, _) = prepare_changes(changes1);
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 1), Ok(Some(changes1_root)));
// branch2: when asking for finalized block hash
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_2_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 1), Ok(Some(changes1_root)));
// branch1: when asking for non-finalized block hash (search by traversal)
let (changes2_1_0_root, _) = prepare_changes(changes2_1_0);
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_1_0_root)));
// branch2: when asking for non-finalized block hash (search using canonicalized hint)
let (changes2_2_0_root, _) = prepare_changes(changes2_2_0);
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_2_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));
// finalize first block of branch2 (block2_2_0)
backend.changes_tries_storage.meta.write().finalized_number = 3;
// branch2: when asking for finalized block of this branch
assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));
// branch1: when asking for finalized block of other branch
// => result is incorrect (returned for the block of branch1), but this is expected,
// because the other fork is abandoned (forked before finalized header)
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));
}
#[test]
fn changes_tries_are_pruned_on_finalization() {
let mut backend = Backend::<Block>::new_test(1000, 100);
backend.changes_tries_storage.meta.write().finalized_number = 1000;
backend.changes_tries_storage.min_blocks_to_keep = Some(8);
let config = ChangesTrieConfiguration {
digest_interval: 2,
@@ -1267,26 +1358,27 @@ mod tests {
let _ = insert_header(&backend, 12, block11, vec![(b"key_at_12".to_vec(), b"val_at_12".to_vec())], Default::default());
// check that roots of all tries are in the columns::CHANGES_TRIE
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: Default::default(), number: 100 };
fn read_changes_trie_root(backend: &Backend<Block>, num: u64) -> H256 {
backend.blockchain().header(BlockId::Number(num)).unwrap().unwrap().digest().logs().iter()
.find(|i| i.as_changes_trie_root().is_some()).unwrap().as_changes_trie_root().unwrap().clone()
}
let root1 = read_changes_trie_root(&backend, 1); assert_eq!(backend.changes_tries_storage.root(1).unwrap(), Some(root1));
let root2 = read_changes_trie_root(&backend, 2); assert_eq!(backend.changes_tries_storage.root(2).unwrap(), Some(root2));
let root3 = read_changes_trie_root(&backend, 3); assert_eq!(backend.changes_tries_storage.root(3).unwrap(), Some(root3));
let root4 = read_changes_trie_root(&backend, 4); assert_eq!(backend.changes_tries_storage.root(4).unwrap(), Some(root4));
let root5 = read_changes_trie_root(&backend, 5); assert_eq!(backend.changes_tries_storage.root(5).unwrap(), Some(root5));
let root6 = read_changes_trie_root(&backend, 6); assert_eq!(backend.changes_tries_storage.root(6).unwrap(), Some(root6));
let root7 = read_changes_trie_root(&backend, 7); assert_eq!(backend.changes_tries_storage.root(7).unwrap(), Some(root7));
let root8 = read_changes_trie_root(&backend, 8); assert_eq!(backend.changes_tries_storage.root(8).unwrap(), Some(root8));
let root9 = read_changes_trie_root(&backend, 9); assert_eq!(backend.changes_tries_storage.root(9).unwrap(), Some(root9));
let root10 = read_changes_trie_root(&backend, 10); assert_eq!(backend.changes_tries_storage.root(10).unwrap(), Some(root10));
let root11 = read_changes_trie_root(&backend, 11); assert_eq!(backend.changes_tries_storage.root(11).unwrap(), Some(root11));
let root12 = read_changes_trie_root(&backend, 12); assert_eq!(backend.changes_tries_storage.root(12).unwrap(), Some(root12));
let root1 = read_changes_trie_root(&backend, 1); assert_eq!(backend.changes_tries_storage.root(&anchor, 1).unwrap(), Some(root1));
let root2 = read_changes_trie_root(&backend, 2); assert_eq!(backend.changes_tries_storage.root(&anchor, 2).unwrap(), Some(root2));
let root3 = read_changes_trie_root(&backend, 3); assert_eq!(backend.changes_tries_storage.root(&anchor, 3).unwrap(), Some(root3));
let root4 = read_changes_trie_root(&backend, 4); assert_eq!(backend.changes_tries_storage.root(&anchor, 4).unwrap(), Some(root4));
let root5 = read_changes_trie_root(&backend, 5); assert_eq!(backend.changes_tries_storage.root(&anchor, 5).unwrap(), Some(root5));
let root6 = read_changes_trie_root(&backend, 6); assert_eq!(backend.changes_tries_storage.root(&anchor, 6).unwrap(), Some(root6));
let root7 = read_changes_trie_root(&backend, 7); assert_eq!(backend.changes_tries_storage.root(&anchor, 7).unwrap(), Some(root7));
let root8 = read_changes_trie_root(&backend, 8); assert_eq!(backend.changes_tries_storage.root(&anchor, 8).unwrap(), Some(root8));
let root9 = read_changes_trie_root(&backend, 9); assert_eq!(backend.changes_tries_storage.root(&anchor, 9).unwrap(), Some(root9));
let root10 = read_changes_trie_root(&backend, 10); assert_eq!(backend.changes_tries_storage.root(&anchor, 10).unwrap(), Some(root10));
let root11 = read_changes_trie_root(&backend, 11); assert_eq!(backend.changes_tries_storage.root(&anchor, 11).unwrap(), Some(root11));
let root12 = read_changes_trie_root(&backend, 12); assert_eq!(backend.changes_tries_storage.root(&anchor, 12).unwrap(), Some(root12));
// now simulate finalization of block#12, causing prune of tries at #1..#4
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 12);
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, Default::default(), 12);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root1).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root2).unwrap().is_none());
@@ -1299,7 +1391,7 @@ mod tests {
// now simulate finalization of block#16, causing prune of tries at #5..#8
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 16);
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, Default::default(), 16);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root5).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root6).unwrap().is_none());
@@ -1310,7 +1402,7 @@ mod tests {
// => no changes tries are pruned, because we never prune in archive mode
backend.changes_tries_storage.min_blocks_to_keep = None;
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config), &mut tx, 20);
backend.changes_tries_storage.prune(Some(config), &mut tx, Default::default(), 20);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root9).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root10).unwrap().is_some());
+11
View File
@@ -187,6 +187,17 @@ pub fn read_header<Block: BlockT>(
}
}
/// Required header from the database.
pub fn require_header<Block: BlockT>(
db: &KeyValueDB,
col_index: Option<u32>,
col: Option<u32>,
id: BlockId<Block>,
) -> client::error::Result<Block::Header> {
read_header(db, col_index, col, id)
.and_then(|header| header.ok_or_else(|| client::error::ErrorKind::UnknownBlock(format!("{}", id)).into()))
}
/// Read meta from the database.
pub fn read_meta<Block>(db: &KeyValueDB, col_meta: Option<u32>, col_header: Option<u32>) -> Result<
Meta<<<Block as BlockT>::Header as HeaderT>::Number, Block::Hash>,