Introduce notion of finality to substrate (#760)

* finalization for in_mem

* fetch last finalized block

* pruning: use canonical term instead of final

* finalize blocks in full node

* begin to port light client DB

* add tree-route

* keep number index consistent in full nodes

* fix tests

* disable cache and finish porting light client

* add AsMut to system module

* final leaf is always best

* fix all tests

* fix comment and trace

* remove unused `Into` call

* add comment on behavior of `finalize_block`
This commit is contained in:
Robert Habermeier
2018-09-21 15:56:21 +02:00
committed by Gav Wood
parent 28cc4d0fd6
commit b7d095a2e0
19 changed files with 976 additions and 370 deletions
+205 -74
View File
@@ -21,6 +21,7 @@ use parking_lot::RwLock;
use kvdb::{KeyValueDB, DBTransaction};
use client::backend::NewBlockState;
use client::blockchain::{BlockStatus, Cache as BlockchainCache,
HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo};
use client::cht;
@@ -29,29 +30,30 @@ use client::light::blockchain::Storage as LightBlockchainStorage;
use codec::{Decode, Encode};
use primitives::{AuthorityId, H256, Blake2Hasher};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor,
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT,
Zero, One, As, NumberFor};
use cache::DbCache;
use utils::{meta_keys, Meta, db_err, number_to_db_key, db_key_to_number, open_database,
use utils::{meta_keys, Meta, db_err, number_to_lookup_key, open_database,
read_db, read_id, read_meta};
use DatabaseSettings;
pub(crate) mod columns {
pub const META: Option<u32> = ::utils::COLUMN_META;
pub const BLOCK_INDEX: Option<u32> = Some(1);
pub const HASH_LOOKUP: Option<u32> = Some(1);
pub const HEADER: Option<u32> = Some(2);
pub const AUTHORITIES: Option<u32> = Some(3);
pub const CHT: Option<u32> = Some(4);
}
/// Keep authorities for last 'AUTHORITIES_ENTRIES_TO_KEEP' blocks.
#[allow(unused)]
pub(crate) const AUTHORITIES_ENTRIES_TO_KEEP: u64 = cht::SIZE;
/// Light blockchain storage. Stores most recent headers + CHTs for older headers.
pub struct LightStorage<Block: BlockT> {
db: Arc<KeyValueDB>,
meta: RwLock<Meta<<<Block as BlockT>::Header as HeaderT>::Number, Block::Hash>>,
cache: DbCache<Block>,
_cache: DbCache<Block>,
}
#[derive(Clone, PartialEq, Debug)]
@@ -83,13 +85,18 @@ impl<Block> LightStorage<Block>
}
fn from_kvdb(db: Arc<KeyValueDB>) -> ClientResult<Self> {
let cache = DbCache::new(db.clone(), columns::BLOCK_INDEX, columns::AUTHORITIES)?;
let cache = DbCache::new(
db.clone(),
columns::HASH_LOOKUP,
columns::HEADER,
columns::AUTHORITIES
)?;
let meta = RwLock::new(read_meta::<Block>(&*db, columns::HEADER)?);
Ok(LightStorage {
db,
meta,
cache,
_cache: cache,
})
}
@@ -100,12 +107,18 @@ impl<Block> LightStorage<Block>
#[cfg(test)]
pub(crate) fn cache(&self) -> &DbCache<Block> {
&self.cache
&self._cache
}
fn update_meta(&self, hash: Block::Hash, number: <<Block as BlockT>::Header as HeaderT>::Number, is_best: bool) {
fn update_meta(
&self,
hash: Block::Hash,
number: <<Block as BlockT>::Header as HeaderT>::Number,
is_best: bool,
is_finalized: bool,
) {
let mut meta = self.meta.write();
if is_best {
let mut meta = self.meta.write();
if number == <<Block as BlockT>::Header as HeaderT>::Number::zero() {
meta.genesis_hash = hash;
}
@@ -113,6 +126,15 @@ impl<Block> LightStorage<Block>
meta.best_number = number;
meta.best_hash = hash;
}
if is_finalized {
if number == <<Block as BlockT>::Header as HeaderT>::Number::zero() {
meta.genesis_hash = hash;
}
meta.finalized_number = number;
meta.finalized_hash = hash;
}
}
}
@@ -121,13 +143,7 @@ impl<Block> BlockchainHeaderBackend<Block> for LightStorage<Block>
Block: BlockT,
{
fn header(&self, id: BlockId<Block>) -> ClientResult<Option<Block::Header>> {
match read_db(&*self.db, columns::BLOCK_INDEX, columns::HEADER, id)? {
Some(header) => match Block::Header::decode(&mut &header[..]) {
Some(header) => Ok(Some(header)),
None => return Err(ClientErrorKind::Backend("Error decoding header".into()).into()),
}
None => Ok(None),
}
::utils::read_header(&*self.db, columns::HASH_LOOKUP, columns::HEADER, id)
}
fn info(&self) -> ClientResult<BlockchainInfo<Block>> {
@@ -141,7 +157,12 @@ impl<Block> BlockchainHeaderBackend<Block> for LightStorage<Block>
fn status(&self, id: BlockId<Block>) -> ClientResult<BlockStatus> {
let exists = match id {
BlockId::Hash(_) => read_id(&*self.db, columns::BLOCK_INDEX, id)?.is_some(),
BlockId::Hash(_) => read_db(
&*self.db,
columns::HASH_LOOKUP,
columns::HEADER,
id
)?.is_some(),
BlockId::Number(n) => n <= self.meta.read().best_number,
};
match exists {
@@ -151,17 +172,90 @@ impl<Block> BlockchainHeaderBackend<Block> for LightStorage<Block>
}
fn number(&self, hash: Block::Hash) -> ClientResult<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
read_id::<Block>(&*self.db, columns::BLOCK_INDEX, BlockId::Hash(hash))
.and_then(|key| match key {
Some(key) => Ok(Some(db_key_to_number(&key)?)),
None => Ok(None),
})
self.header(BlockId::Hash(hash)).and_then(|key| match key {
Some(hdr) => Ok(Some(hdr.number().clone())),
None => Ok(None),
})
}
fn hash(&self, number: <<Block as BlockT>::Header as HeaderT>::Number) -> ClientResult<Option<Block::Hash>> {
read_db::<Block>(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(number)).map(|x|
x.map(|raw| HashFor::<Block>::hash(&raw[..])).map(Into::into)
)
read_id::<Block>(&*self.db, columns::HASH_LOOKUP, BlockId::Number(number))
}
}
impl<Block: BlockT> LightStorage<Block> where Block::Hash: From<H256> {
// note that a block is finalized. ensure that best chain contains the finalized
// block number first.
fn note_finalized(&self, transaction: &mut DBTransaction, header: &Block::Header, hash: Block::Hash) -> ClientResult<()> {
const NOTEWORTHY_FINALIZATION_GAP: u64 = 32;
// TODO: ensure this doesn't conflict with old finalized block.
let meta = self.meta.read();
let f_num = header.number().clone();
let number_u64: u64 = f_num.as_();
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, hash.as_ref());
let (last_finalized_hash, last_finalized_number)
= (meta.finalized_hash.clone(), meta.finalized_number);
let finalized_gap = f_num - last_finalized_number;
if finalized_gap.as_() >= NOTEWORTHY_FINALIZATION_GAP {
info!(target: "db", "Finalizing large run of blocks from {:?} to {:?}",
(&last_finalized_hash, last_finalized_number), (&hash, f_num));
} else {
debug!(target: "db", "Finalizing blocks from {:?} to {:?}",
(&last_finalized_hash, last_finalized_number), (&hash, f_num));
}
// build new CHT if required
let mut build_cht = |header: &Block::Header| -> ClientResult<()> {
if let Some(new_cht_number) = cht::is_build_required(cht::SIZE, *header.number()) {
let new_cht_start: NumberFor<Block> = cht::start_number(cht::SIZE, new_cht_number);
let new_cht_root: Option<Block::Hash> = cht::compute_root::<Block::Header, Blake2Hasher, _>(
cht::SIZE, new_cht_number, (new_cht_start.as_()..)
.map(|num| self.hash(As::sa(num)).unwrap_or_default())
);
if let Some(new_cht_root) = new_cht_root {
transaction.put(columns::CHT, &number_to_lookup_key(new_cht_start), new_cht_root.as_ref());
let mut prune_block = new_cht_start;
let new_cht_end = cht::end_number(cht::SIZE, new_cht_number);
trace!(target: "db", "Replacing blocks [{}..{}] with CHT#{}", new_cht_start, new_cht_end, new_cht_number);
while prune_block <= new_cht_end {
let id = read_id::<Block>(&*self.db, columns::HASH_LOOKUP, BlockId::Number(prune_block))?;
if let Some(hash) = id {
let lookup_key = number_to_lookup_key(prune_block);
transaction.delete(columns::HASH_LOOKUP, &lookup_key);
transaction.delete(columns::HEADER, hash.as_ref());
}
prune_block += <<Block as BlockT>::Header as HeaderT>::Number::one();
}
}
}
Ok(())
};
// attempt to build CHT for all newly finalized blocks.
let last_finalized_u64 = last_finalized_number.as_();
for num in (last_finalized_u64..number_u64).map(|x| x + 1) {
let num = As::sa(num);
if num == f_num {
build_cht(header)?;
} else {
let old_header = match self.header(BlockId::Number(num))? {
Some(x) => x,
None => panic!("finalizing block {} implies existence of block {}; qed", f_num, num),
};
build_cht(&old_header)?;
}
}
Ok(())
}
}
@@ -170,63 +264,75 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
Block: BlockT,
Block::Hash: From<H256>,
{
fn import_header(&self, is_new_best: bool, header: Block::Header, authorities: Option<Vec<AuthorityId>>) -> ClientResult<()> {
fn import_header(
&self,
header: Block::Header,
_authorities: Option<Vec<AuthorityId>>,
leaf_state: NewBlockState,
) -> ClientResult<()> {
let mut transaction = DBTransaction::new();
let hash = header.hash();
let number = *header.number();
let key = number_to_db_key(number);
transaction.put(columns::HEADER, &key, &header.encode());
transaction.put(columns::BLOCK_INDEX, hash.as_ref(), &key);
transaction.put(columns::HEADER, hash.as_ref(), &header.encode());
transaction.put(columns::HASH_LOOKUP, &number_to_lookup_key(number), hash.as_ref());
let best_authorities = if is_new_best {
transaction.put(columns::META, meta_keys::BEST_BLOCK, &key);
if leaf_state.is_best() {
transaction.put(columns::META, meta_keys::BEST_BLOCK, hash.as_ref());
// cache authorities for previous block
let number: u64 = number.as_();
let previous_number = number.checked_sub(1);
let best_authorities = previous_number
.and_then(|previous_number| self.cache.authorities_at_cache()
.commit_best_entry(&mut transaction, As::sa(previous_number), authorities));
// handle reorg.
{
let meta = self.meta.read();
if meta.best_hash != Default::default() {
let parent_hash = *header.parent_hash();
let tree_route = ::utils::tree_route::<Block>(
&*self.db,
columns::HEADER,
meta.best_hash,
parent_hash,
)?;
// prune authorities from 'ancient' blocks
if let Some(ancient_number) = number.checked_sub(AUTHORITIES_ENTRIES_TO_KEEP) {
self.cache.authorities_at_cache().prune_entries(&mut transaction, As::sa(ancient_number))?;
}
// update block number to hash lookup entries.
for retracted in tree_route.retracted() {
if retracted.hash == meta.finalized_hash {
// TODO: can we recover here?
warn!("Safety failure: reverting finalized block {:?}",
(&retracted.number, &retracted.hash));
}
best_authorities
} else {
None
};
transaction.delete(
columns::HASH_LOOKUP,
&::utils::number_to_lookup_key(retracted.number)
);
}
// build new CHT if required
if let Some(new_cht_number) = cht::is_build_required(cht::SIZE, *header.number()) {
let new_cht_start: NumberFor<Block> = cht::start_number(cht::SIZE, new_cht_number);
let new_cht_root: Option<Block::Hash> = cht::compute_root::<Block::Header, Blake2Hasher, _>(
cht::SIZE, new_cht_number, (new_cht_start.as_()..)
.map(|num| self.hash(As::sa(num)).unwrap_or_default()));
if let Some(new_cht_root) = new_cht_root {
transaction.put(columns::CHT, &number_to_db_key(new_cht_start), new_cht_root.as_ref());
let mut prune_block = new_cht_start;
let new_cht_end = cht::end_number(cht::SIZE, new_cht_number);
trace!(target: "db", "Replacing blocks [{}..{}] with CHT#{}", new_cht_start, new_cht_end, new_cht_number);
while prune_block <= new_cht_end {
transaction.delete(columns::HEADER, &number_to_db_key(prune_block));
prune_block += <<Block as BlockT>::Header as HeaderT>::Number::one();
for enacted in tree_route.enacted() {
let hash: &Block::Hash = &enacted.hash;
transaction.put(
columns::HASH_LOOKUP,
&::utils::number_to_lookup_key(enacted.number),
hash.as_ref(),
)
}
}
}
// TODO: cache authorities for previous block, accounting for reorgs.
}
let finalized = match leaf_state {
NewBlockState::Final => true,
_ => false,
};
if finalized {
self.note_finalized(&mut transaction, &header, hash)?;
}
debug!("Light DB Commit {:?} ({})", hash, number);
self.db.write(transaction).map_err(db_err)?;
self.update_meta(hash, number, is_new_best);
if let Some(best_authorities) = best_authorities {
self.cache.authorities_at_cache().update_best_entry(Some(best_authorities));
}
self.update_meta(hash, number, leaf_state.is_best(), finalized);
Ok(())
}
@@ -236,13 +342,31 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
let cht_number = cht::block_to_cht_number(cht_size, block).ok_or_else(no_cht_for_block)?;
let cht_start = cht::start_number(cht_size, cht_number);
self.db.get(columns::CHT, &number_to_db_key(cht_start)).map_err(db_err)?
self.db.get(columns::CHT, &number_to_lookup_key(cht_start)).map_err(db_err)?
.ok_or_else(no_cht_for_block)
.and_then(|hash| Block::Hash::decode(&mut &*hash).ok_or_else(no_cht_for_block))
}
fn finalize_header(&self, id: BlockId<Block>) -> ClientResult<()> {
if let Some(header) = self.header(id)? {
let mut transaction = DBTransaction::new();
// TODO: ensure best chain contains this block.
let hash = header.hash();
self.note_finalized(&mut transaction, &header, hash.clone())?;
self.db.write(transaction).map_err(db_err)?;
self.update_meta(hash, header.number().clone(), false, true);
Ok(())
} else {
Err(ClientErrorKind::UnknownBlock(format!("Cannot finalize block {:?}", id)).into())
}
}
fn last_finalized(&self) -> ClientResult<Block::Hash> {
Ok(self.meta.read().finalized_hash.clone())
}
fn cache(&self) -> Option<&BlockchainCache<Block>> {
Some(&self.cache)
None
}
}
@@ -269,7 +393,7 @@ pub(crate) mod tests {
};
let hash = header.hash();
db.import_header(true, header, authorities).unwrap();
db.import_header(header, authorities, NewBlockState::Best).unwrap();
hash
}
@@ -328,15 +452,15 @@ pub(crate) mod tests {
let genesis_hash = insert_block(&db, &Default::default(), 0, None);
assert_eq!(db.db.iter(columns::HEADER).count(), 1);
assert_eq!(db.db.iter(columns::BLOCK_INDEX).count(), 1);
assert_eq!(db.db.iter(columns::HASH_LOOKUP).count(), 1);
let _ = insert_block(&db, &genesis_hash, 1, None);
assert_eq!(db.db.iter(columns::HEADER).count(), 2);
assert_eq!(db.db.iter(columns::BLOCK_INDEX).count(), 2);
assert_eq!(db.db.iter(columns::HASH_LOOKUP).count(), 2);
}
#[test]
fn ancient_headers_are_replaced_with_cht() {
fn finalized_ancient_headers_are_replaced_with_cht() {
let db = LightStorage::new_test();
// insert genesis block header (never pruned)
@@ -357,10 +481,16 @@ pub(crate) mod tests {
assert_eq!(db.db.iter(columns::CHT).count(), 0);
// insert block #{2 * cht::SIZE + 1} && check that new CHT is created + headers of this CHT are pruned
insert_block(&db, &prev_hash, 1 + cht::SIZE + cht::SIZE, None);
// nothing is yet finalized, so nothing is pruned.
prev_hash = insert_block(&db, &prev_hash, 1 + cht::SIZE + cht::SIZE, None);
assert_eq!(db.db.iter(columns::HEADER).count(), (2 + cht::SIZE + cht::SIZE) as usize);
assert_eq!(db.db.iter(columns::CHT).count(), 0);
// now finalize the block.
db.finalize_header(BlockId::Hash(prev_hash)).unwrap();
assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht::SIZE + 1) as usize);
assert_eq!(db.db.iter(columns::CHT).count(), 1);
assert!((0..cht::SIZE).all(|i| db.db.get(columns::HEADER, &number_to_db_key(1 + i)).unwrap().is_none()));
assert!((0..cht::SIZE).all(|i| db.db.get(columns::HEADER, &number_to_lookup_key(1 + i)).unwrap().is_none()));
}
#[test]
@@ -383,6 +513,7 @@ pub(crate) mod tests {
prev_hash = insert_block(&db, &prev_hash, i as u64, None);
}
db.finalize_header(BlockId::Hash(prev_hash)).unwrap();
let cht_root_1 = db.cht_root(cht::SIZE, cht::start_number(cht::SIZE, 0)).unwrap();
let cht_root_2 = db.cht_root(cht::SIZE, (cht::start_number(cht::SIZE, 0) + cht::SIZE / 2) as u64).unwrap();
let cht_root_3 = db.cht_root(cht::SIZE, cht::end_number(cht::SIZE, 0)).unwrap();