Finality notification streams (#791)

* finalization for in_mem

* fetch last finalized block

* pruning: use canonical term instead of final

* finalize blocks in full node

* begin to port light client DB

* add tree-route

* keep number index consistent in full nodes

* fix tests

* disable cache and finish porting light client

* add AsMut to system module

* final leaf is always best

* fix all tests

* Fix comment and trace

* removed unused Into call

* add comment on behavior of `finalize_block`

* move `tree_route` to client common

* tree_route tests

* return slices in TreeRoute

* apply finality up to parent

* add `finalize_block` call

* adjust formatting

* finality notifications and add last finalized block to chain info

* exhaustive match and comments

* fix sync tests by using non-instant finality
This commit is contained in:
Robert Habermeier
2018-09-24 17:45:37 +01:00
committed by Arkadiy Paronyan
parent ef97973178
commit b02c274374
14 changed files with 665 additions and 225 deletions
+2 -5
View File
@@ -108,11 +108,8 @@ where
fn begin_operation(&self, block: BlockId<Block>) -> error::Result<Self::BlockImportOperation>;
/// Commit block insertion.
fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>;
/// Finalize block with given Id. This should also implicitly finalize all ancestors.
///
/// If the finalized block is not an ancestor of the current "best block", then
/// the chain will be implicitly reorganized to the best chain containing the newly
/// finalized block.
/// Finalize block with given Id. This should only be called if the parent of the given
/// block has been finalized.
fn finalize_block(&self, block: BlockId<Block>) -> error::Result<()>;
/// Returns reference to blockchain backend.
fn blockchain(&self) -> &Self::Blockchain;
+130 -2
View File
@@ -77,11 +77,13 @@ pub enum ImportResult<E> {
#[derive(Debug)]
pub struct Info<Block: BlockT> {
/// Best block hash.
pub best_hash: <<Block as BlockT>::Header as HeaderT>::Hash,
pub best_hash: Block::Hash,
/// Best block number.
pub best_number: <<Block as BlockT>::Header as HeaderT>::Number,
/// Genesis block hash.
pub genesis_hash: <<Block as BlockT>::Header as HeaderT>::Hash,
pub genesis_hash: Block::Hash,
/// The head of the finalized chain.
pub finalized_hash: Block::Hash,
}
/// Block status.
@@ -92,3 +94,129 @@ pub enum BlockStatus {
/// Not in the queue or the blockchain.
Unknown,
}
/// An entry in a tree route.
#[derive(Debug)]
pub struct RouteEntry<Block: BlockT> {
/// The number of the block.
pub number: <Block::Header as HeaderT>::Number,
/// The hash of the block.
pub hash: Block::Hash,
}
/// A tree-route from one block to another in the chain.
///
/// All blocks prior to the pivot in the deque is the reverse-order unique ancestry
/// of the first block, the block at the pivot index is the common ancestor,
/// and all blocks after the pivot is the ancestry of the second block, in
/// order.
///
/// The ancestry sets will include the given blocks, and thus the tree-route is
/// never empty.
///
/// ```ignore
/// Tree route from R1 to E2. Retracted is [R1, R2, R3], Common is C, enacted [E1, E2]
/// <- R3 <- R2 <- R1
/// /
/// C
/// \-> E1 -> E2
/// ```
///
/// ```ignore
/// Tree route from C to E2. Retracted empty. Common is C, enacted [E1, E2]
/// C -> E1 -> E2
/// ```
#[derive(Debug)]
pub struct TreeRoute<Block: BlockT> {
route: Vec<RouteEntry<Block>>,
pivot: usize,
}
impl<Block: BlockT> TreeRoute<Block> {
/// Get a slice of all retracted blocks in reverse order (towards common ancestor)
pub fn retracted(&self) -> &[RouteEntry<Block>] {
&self.route[..self.pivot]
}
/// Get the common ancestor block. This might be one of the two blocks of the
/// route.
pub fn common_block(&self) -> &RouteEntry<Block> {
self.route.get(self.pivot).expect("tree-routes are computed between blocks; \
which are included in the route; \
thus it is never empty; qed")
}
/// Get a slice of enacted blocks (descendents of the common ancestor)
pub fn enacted(&self) -> &[RouteEntry<Block>] {
&self.route[self.pivot + 1 ..]
}
}
/// Compute a tree-route between two blocks. See tree-route docs for more details.
pub fn tree_route<Block: BlockT, Backend: HeaderBackend<Block>>(
backend: &Backend,
from: BlockId<Block>,
to: BlockId<Block>,
) -> Result<TreeRoute<Block>> {
use runtime_primitives::traits::Header;
let load_header = |id: BlockId<Block>| {
match backend.header(id) {
Ok(Some(hdr)) => Ok(hdr),
Ok(None) => Err(ErrorKind::UnknownBlock(format!("Unknown block {:?}", id)).into()),
Err(e) => Err(e),
}
};
let mut from = load_header(from)?;
let mut to = load_header(to)?;
let mut from_branch = Vec::new();
let mut to_branch = Vec::new();
while to.number() > from.number() {
to_branch.push(RouteEntry {
number: to.number().clone(),
hash: to.hash(),
});
to = load_header(BlockId::Hash(*to.parent_hash()))?;
}
while from.number() > to.number() {
from_branch.push(RouteEntry {
number: from.number().clone(),
hash: from.hash(),
});
from = load_header(BlockId::Hash(*from.parent_hash()))?;
}
// numbers are equal now. walk backwards until the block is the same
while to != from {
to_branch.push(RouteEntry {
number: to.number().clone(),
hash: to.hash(),
});
to = load_header(BlockId::Hash(*to.parent_hash()))?;
from_branch.push(RouteEntry {
number: from.number().clone(),
hash: from.hash(),
});
from = load_header(BlockId::Hash(*from.parent_hash()))?;
}
// add the pivot block. and append the reversed to-branch (note that it's reverse order originalls)
let pivot = from_branch.len();
from_branch.push(RouteEntry {
number: to.number().clone(),
hash: to.hash(),
});
from_branch.extend(to_branch.into_iter().rev());
Ok(TreeRoute {
route: from_branch,
pivot,
})
}
+143 -12
View File
@@ -21,7 +21,7 @@ use futures::sync::mpsc;
use parking_lot::{Mutex, RwLock};
use primitives::AuthorityId;
use runtime_primitives::{bft::Justification, generic::{BlockId, SignedBlock, Block as RuntimeBlock}};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, One, As, NumberFor, CurrentHeight, BlockNumberToHash};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, As, NumberFor, CurrentHeight, BlockNumberToHash};
use runtime_primitives::BuildStorage;
use primitives::{Blake2Hasher, RlpCodec, H256};
use primitives::storage::{StorageKey, StorageData};
@@ -40,7 +40,10 @@ use notifications::{StorageNotifications, StorageEventStream};
use {cht, error, in_mem, block_builder, bft, genesis};
/// Type that implements `futures::Stream` of block import events.
pub type BlockchainEventStream<Block> = mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
pub type ImportNotifications<Block> = mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
/// A stream of block finality notifications.
pub type FinalityNotifications<Block> = mpsc::UnboundedReceiver<FinalityNotification<Block>>;
/// Substrate Client
pub struct Client<B, E, Block> where Block: BlockT {
@@ -48,6 +51,7 @@ pub struct Client<B, E, Block> where Block: BlockT {
executor: E,
storage_notifications: Mutex<StorageNotifications<Block>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification<Block>>>>,
finality_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<FinalityNotification<Block>>>>,
import_lock: Mutex<()>,
importing_block: RwLock<Option<Block::Hash>>, // holds the block hash currently being imported. TODO: replace this with block queue
execution_strategy: ExecutionStrategy,
@@ -55,8 +59,13 @@ pub struct Client<B, E, Block> where Block: BlockT {
/// A source of blockchain evenets.
pub trait BlockchainEvents<Block: BlockT> {
/// Get block import event stream.
fn import_notification_stream(&self) -> BlockchainEventStream<Block>;
/// Get block import event stream. Not guaranteed to be fired for every
/// imported block.
fn import_notification_stream(&self) -> ImportNotifications<Block>;
/// Get a stream of finality notifications. Not guaranteed to be fired for every
/// finalized block.
fn finality_notification_stream(&self) -> FinalityNotifications<Block>;
/// Get storage changes event stream.
///
@@ -146,6 +155,15 @@ pub struct BlockImportNotification<Block: BlockT> {
pub is_new_best: bool,
}
/// Summary of a finalized block.
#[derive(Clone, Debug)]
pub struct FinalityNotification<Block: BlockT> {
/// Imported block header hash.
pub hash: Block::Hash,
/// Imported block header.
pub header: Block::Header,
}
/// A header paired with a justification which has already been checked.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct JustifiedHeader<Block: BlockT> {
@@ -208,6 +226,7 @@ impl<B, E, Block> Client<B, E, Block> where
executor,
storage_notifications: Default::default(),
import_notification_sinks: Default::default(),
finality_notification_sinks: Default::default(),
import_lock: Default::default(),
importing_block: Default::default(),
execution_strategy,
@@ -436,6 +455,25 @@ impl<B, E, Block> Client<B, E, Block> where
blockchain::BlockStatus::Unknown => {},
}
let (last_best, last_best_number) = {
let info = self.backend.blockchain().info()?;
(info.best_hash, info.best_number)
};
// this is a fairly arbitrary choice of where to draw the line on making notifications,
// but the general goal is to only make notifications when we are already fully synced
// and get a new chain head.
let make_notifications = match origin {
BlockOrigin::NetworkBroadcast | BlockOrigin::Own | BlockOrigin::ConsensusBroadcast => true,
BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false,
};
// ensure parent block is finalized to maintain invariant that
// finality is called sequentially.
if finalized {
self.apply_finality(parent_hash, last_best, make_notifications)?;
}
let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?;
let (storage_update, changes_update, storage_changes) = match transaction.state()? {
Some(transaction_state) => {
@@ -470,7 +508,8 @@ impl<B, E, Block> Client<B, E, Block> where
None => (None, None, None)
};
let is_new_best = header.number() == &(self.backend.blockchain().info()?.best_number + One::one());
// TODO: non longest-chain rule.
let is_new_best = finalized || header.number() > &last_best_number;
let leaf_state = if finalized {
::backend::NewBlockState::Final
} else if is_new_best {
@@ -498,26 +537,112 @@ impl<B, E, Block> Client<B, E, Block> where
}
self.backend.commit_operation(transaction)?;
if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {
if make_notifications {
if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(&hash, storage_changes);
}
if finalized {
let notification = FinalityNotification::<Block> {
hash,
header: header.clone(),
};
self.finality_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
let notification = BlockImportNotification::<Block> {
hash: hash,
origin: origin,
header: header,
is_new_best: is_new_best,
hash,
origin,
header,
is_new_best,
};
self.import_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
Ok(ImportResult::Queued)
}
/// Finalizes all blocks up to given.
fn apply_finality(&self, block: Block::Hash, best_block: Block::Hash, notify: bool) -> error::Result<()> {
// find tree route from last finalized to given block.
let last_finalized = self.backend.blockchain().last_finalized()?;
if block == last_finalized { return Ok(()) }
let route_from_finalized = ::blockchain::tree_route(
self.backend.blockchain(),
BlockId::Hash(last_finalized),
BlockId::Hash(block),
)?;
if let Some(retracted) = route_from_finalized.retracted().get(0) {
warn!("Safety violation: attempted to revert finalized block {:?} which is not in the \
same chain as last finalized {:?}", retracted, last_finalized);
bail!(error::ErrorKind::NotInFinalizedChain);
}
let route_from_best = ::blockchain::tree_route(
self.backend.blockchain(),
BlockId::Hash(best_block),
BlockId::Hash(block),
)?;
// if the block is not a direct ancestor of the current best chain,
// then some other block is the common ancestor.
if route_from_best.common_block().hash != block {
// TODO: reorganize best block to be the best chain containing
// `block`.
}
for finalize_new in route_from_finalized.enacted() {
self.backend.finalize_block(BlockId::Hash(finalize_new.hash))?;
}
if notify {
// sometimes when syncing, tons of blocks can be finalized at once.
// we'll send notifications spuriously in that case.
const MAX_TO_NOTIFY: usize = 256;
let enacted = route_from_finalized.enacted();
let start = enacted.len() - ::std::cmp::min(enacted.len(), MAX_TO_NOTIFY);
let mut sinks = self.finality_notification_sinks.lock();
for finalized in &enacted[start..] {
let header = self.header(&BlockId::Hash(finalized.hash))?
.expect("header already known to exist in DB because it is indicated in the tree route; qed");
let notification = FinalityNotification {
header,
hash: finalized.hash,
};
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
}
Ok(())
}
/// Finalize a block. This will implicitly finalize all blocks up to it and
/// fire finality notifications.
///
/// Pass a flag to indicate whether finality notifications should be propagated.
/// This is usually tied to some synchronization state, where we don't send notifications
/// while performing major synchronization work.
pub fn finalize_block(&self, id: BlockId<Block>, notify: bool) -> error::Result<()> {
let last_best = self.backend.blockchain().info()?.best_hash;
let to_finalize_hash = match id {
BlockId::Hash(h) => h,
BlockId::Number(n) => self.backend.blockchain().hash(n)?
.ok_or_else(|| error::ErrorKind::UnknownBlock(format!("No block with number {:?}", n)))?,
};
self.apply_finality(to_finalize_hash, last_best, notify)
}
/// Attempts to revert the chain by `n` blocks. Returns the number of blocks that were
/// successfully reverted.
pub fn revert(&self, n: NumberFor<Block>) -> error::Result<NumberFor<Block>> {
@@ -681,12 +806,18 @@ where
Block: BlockT,
{
/// Get block import event stream.
fn import_notification_stream(&self) -> BlockchainEventStream<Block> {
fn import_notification_stream(&self) -> ImportNotifications<Block> {
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
}
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
let (sink, stream) = mpsc::unbounded();
self.finality_notification_sinks.lock().push(sink);
stream
}
/// Get storage changes event stream.
fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result<StorageEventStream<Block::Hash>> {
Ok(self.storage_notifications.lock().listen(filter_keys))
+12
View File
@@ -99,6 +99,18 @@ error_chain! {
description("Error decoding call result")
display("Error decoding call result of {}", method)
}
/// Last finalized block not parent of current.
NonSequentialFinalization(s: String) {
description("Did not finalize blocks in sequential order."),
display("Did not finalize blocks in sequential order."),
}
/// Safety violation: new best block not descendent of last finalized.
NotInFinalizedChain {
description("Potential long-range attack: block not in finalized chain."),
display("Potential long-range attack: block not in finalized chain."),
}
}
}
+1
View File
@@ -221,6 +221,7 @@ impl<Block: BlockT> HeaderBackend<Block> for Blockchain<Block> {
best_hash: storage.best_hash,
best_number: storage.best_number,
genesis_hash: storage.genesis_hash,
finalized_hash: storage.finalized_hash,
})
}
+1 -1
View File
@@ -61,7 +61,7 @@ pub use blockchain::Info as ChainInfo;
pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor};
pub use client::{
new_in_mem,
BlockBody, BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents,
BlockBody, BlockStatus, BlockOrigin, ImportNotifications, FinalityNotifications, BlockchainEvents,
Client, ClientInfo, ChainHead,
ImportResult, JustifiedHeader,
};