Atomic operation and locking when importing a block (#1489)

* Add ClientImportOperation and remove an unused enum

* set_aux to insert_aux so that it can be called multiple times

* [WIP] All basic designs of lock_import_and_run

* [WIP] `apply_block` and `apply_aux` implementation

* Update client db with the new interface

* Always make sure we reset importing_block back to None

* Address grumbles

`apply_block` should be pub

* Add comments on insert_aux

* Fix compile
This commit is contained in:
Wei Tang
2019-01-24 16:51:53 +01:00
committed by Gav Wood
parent 7f05bd959f
commit 997c8b4020
6 changed files with 380 additions and 190 deletions
+229 -114
View File
@@ -45,6 +45,7 @@ use state_machine::{
ChangesTrieRootsStorage, ChangesTrieStorage,
key_changes, key_changes_proof, OverlayedChanges,
};
use hash_db::Hasher;
use crate::backend::{self, BlockImportOperation, PrunableStateChangesTrieStorage};
use crate::blockchain::{
@@ -90,6 +91,13 @@ pub struct Client<B, E, Block, RA> where Block: BlockT {
_phantom: PhantomData<RA>,
}
/// Client import operation, a wrapper for the backend.
pub struct ClientImportOperation<Block: BlockT, H: Hasher<Out=Block::Hash>, B: backend::Backend<Block, H>> {
// The underlying backend block-import operation; all changes are staged
// here and committed as one unit in `lock_import_and_run`.
op: B::BlockImportOperation,
// Pending import notification, fired only after a successful commit:
// (hash, origin, post-digest header, is_new_best, optional storage changes).
notify_imported: Option<(Block::Hash, BlockOrigin, Block::Header, bool, Option<Vec<(Vec<u8>, Option<Vec<u8>>)>>)>,
// Hashes of blocks finalized by this operation; finality notifications
// for these are sent after the commit succeeds.
notify_finalized: Vec<Block::Hash>,
}
/// A source of blockchain events.
pub trait BlockchainEvents<Block: BlockT> {
/// Get block import event stream. Not guaranteed to be fired for every
@@ -248,7 +256,8 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
) -> error::Result<Self> {
if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() {
let (genesis_storage, children_genesis_storage) = build_genesis_storage.build_storage()?;
let mut op = backend.begin_operation(BlockId::Hash(Default::default()))?;
let mut op = backend.begin_operation()?;
backend.begin_state_operation(&mut op, BlockId::Hash(Default::default()))?;
let state_root = op.reset_storage(genesis_storage, children_genesis_storage)?;
let genesis_block = genesis::construct_genesis_block::<Block>(state_root.into());
info!("Initialising Genesis block/state (state: {}, header-hash: {})", genesis_block.header().state_root(), genesis_block.header().hash());
@@ -586,8 +595,110 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
block_builder::BlockBuilder::at_block(parent, &self)
}
/// Lock the import lock, and run operations inside.
pub fn lock_import_and_run<R, F: FnOnce(&mut ClientImportOperation<Block, Blake2Hasher, B>) -> error::Result<R>>(
&self, f: F
) -> error::Result<R> {
let inner = || {
// Hold the import lock for the entire closure so concurrent imports
// cannot interleave with this operation.
let _import_lock = self.import_lock.lock();
// Fresh operation with no pending notifications; `f` populates it.
let mut op = ClientImportOperation {
op: self.backend.begin_operation()?,
notify_imported: None,
notify_finalized: Vec::new(),
};
let r = f(&mut op)?;
let ClientImportOperation { op, notify_imported, notify_finalized } = op;
// Commit first: notifications below are only sent for changes that
// actually made it into the backend.
self.backend.commit_operation(op)?;
self.notify_finalized(notify_finalized)?;
if let Some(notify_imported) = notify_imported {
self.notify_imported(notify_imported)?;
}
Ok(r)
};
let result = inner();
// Always reset `importing_block`, even when `inner` failed, so a failed
// import cannot leave a stale "currently importing" marker behind.
*self.importing_block.write() = None;
result
}
/// Apply a checked and validated block to an operation. If a justification is provided
/// then `finalized` *must* be true.
pub fn apply_block(
&self,
operation: &mut ClientImportOperation<Block, Blake2Hasher, B>,
import_block: ImportBlock<Block>,
new_authorities: Option<Vec<AuthorityIdFor<Block>>>,
) -> error::Result<ImportResult> where
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone,
{
use runtime_primitives::traits::Digest;
let ImportBlock {
origin,
header,
justification,
post_digests,
body,
finalized,
auxiliary,
fork_choice,
} = import_block;
// Invariant from the doc comment: a justification implies `finalized`.
assert!(justification.is_some() && finalized || justification.is_none());
let parent_hash = header.parent_hash().clone();
// Refuse to import a block whose parent is not already in the chain.
match self.backend.blockchain().status(BlockId::Hash(parent_hash))? {
blockchain::BlockStatus::InChain => {},
blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
}
// Keep both pre- and post-digest views of the header when post-runtime
// digest items must be appended; skip the clone when there are none.
let import_headers = if post_digests.is_empty() {
PrePostHeader::Same(header)
} else {
let mut post_header = header.clone();
for item in post_digests {
post_header.digest_mut().push(item);
}
PrePostHeader::Different(header, post_header)
};
// Hash and height are taken from the post-digest header.
let hash = import_headers.post().hash();
let height: u64 = import_headers.post().number().as_();
// Mark this block as currently being imported; `lock_import_and_run`
// resets this marker to `None` when the operation finishes.
*self.importing_block.write() = Some(hash);
let result = self.execute_and_import_block(
operation,
origin,
hash,
import_headers,
justification,
body,
new_authorities,
finalized,
auxiliary,
fork_choice,
);
// Telemetry is emitted regardless of the import outcome.
telemetry!("block.import";
"height" => height,
"best" => ?hash,
"origin" => ?origin
);
result
}
fn execute_and_import_block(
&self,
operation: &mut ClientImportOperation<Block, Blake2Hasher, B>,
origin: BlockOrigin,
hash: Block::Hash,
import_headers: PrePostHeader<Block::Header>,
@@ -619,17 +730,17 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false,
};
self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?;
// ensure parent block is finalized to maintain invariant that
// finality is called sequentially.
if finalized {
self.apply_finality(parent_hash, None, last_best, make_notifications)?;
self.apply_finality_with_block_hash(operation, parent_hash, None, last_best, make_notifications)?;
}
let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?;
// TODO: correct path logic for when to execute this function
// https://github.com/paritytech/substrate/issues/1232
let (storage_update,changes_update,storage_changes) = self.block_execution(&import_headers, origin, hash, body.clone(), &transaction)?;
let (storage_update,changes_update,storage_changes) = self.block_execution(&operation.op, &import_headers, origin, hash, body.clone())?;
// TODO: non longest-chain rule.
let is_new_best = finalized || match fork_choice {
@@ -646,7 +757,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
trace!("Imported {}, (#{}), best={}, origin={:?}", hash, import_headers.post().number(), is_new_best, origin);
transaction.set_block_data(
operation.op.set_block_data(
import_headers.post().clone(),
body,
justification,
@@ -654,47 +765,26 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
)?;
if let Some(authorities) = authorities {
transaction.update_authorities(authorities);
operation.op.update_authorities(authorities);
}
if let Some(storage_update) = storage_update {
transaction.update_db_storage(storage_update)?;
operation.op.update_db_storage(storage_update)?;
}
if let Some(storage_changes) = storage_changes.clone() {
transaction.update_storage(storage_changes)?;
operation.op.update_storage(storage_changes)?;
}
if let Some(Some(changes_update)) = changes_update {
transaction.update_changes_trie(changes_update)?;
operation.op.update_changes_trie(changes_update)?;
}
transaction.set_aux(aux)?;
self.backend.commit_operation(transaction)?;
operation.op.insert_aux(aux)?;
if make_notifications {
if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(&hash, storage_changes.into_iter());
}
if finalized {
let notification = FinalityNotification::<Block> {
hash,
header: import_headers.post().clone(),
};
self.finality_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
operation.notify_finalized.push(hash);
}
let notification = BlockImportNotification::<Block> {
hash,
origin,
header: import_headers.into_post(),
is_new_best,
};
self.import_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
operation.notify_imported = Some((hash, origin, import_headers.into_post(), is_new_best, storage_changes));
}
Ok(ImportResult::Queued)
@@ -702,16 +792,16 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
fn block_execution(
&self,
transaction: &B::BlockImportOperation,
import_headers: &PrePostHeader<Block::Header>,
origin: BlockOrigin,
hash: Block::Hash,
body: Option<Vec<Block::Extrinsic>>,
transaction: &B::BlockImportOperation,
) -> error::Result<(
Option<StorageUpdate<B, Block>>,
Option<Option<ChangesUpdate>>,
Option<StorageUpdate<B, Block>>,
Option<Option<ChangesUpdate>>,
Option<Vec<(Vec<u8>, Option<Vec<u8>>)>>,
)>
)>
where
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone,
{
@@ -752,11 +842,9 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
}
}
/// Finalizes all blocks up to the given one. If a justification is provided it is
/// stored with the given finalized block (any other finalized blocks are
/// left unjustified).
fn apply_finality(
fn apply_finality_with_block_hash(
&self,
operation: &mut ClientImportOperation<Block, Blake2Hasher, B>,
block: Block::Hash,
justification: Option<Justification>,
best_block: Block::Hash,
@@ -799,11 +887,11 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let enacted = route_from_finalized.enacted();
assert!(enacted.len() > 0);
for finalize_new in &enacted[..enacted.len() - 1] {
self.backend.finalize_block(BlockId::Hash(finalize_new.hash), None)?;
operation.op.mark_finalized(BlockId::Hash(finalize_new.hash), None)?;
}
assert_eq!(enacted.last().map(|e| e.hash), Some(block));
self.backend.finalize_block(BlockId::Hash(block), justification)?;
operation.op.mark_finalized(BlockId::Hash(block), justification)?;
if notify {
// sometimes when syncing, tons of blocks can be finalized at once.
@@ -811,22 +899,95 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
const MAX_TO_NOTIFY: usize = 256;
let enacted = route_from_finalized.enacted();
let start = enacted.len() - ::std::cmp::min(enacted.len(), MAX_TO_NOTIFY);
let mut sinks = self.finality_notification_sinks.lock();
for finalized in &enacted[start..] {
let header = self.header(&BlockId::Hash(finalized.hash))?
.expect("header already known to exist in DB because it is indicated in the tree route; qed");
let notification = FinalityNotification {
header,
hash: finalized.hash,
};
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
operation.notify_finalized.push(finalized.hash);
}
}
Ok(())
}
/// Deliver finality notifications for the given block hashes to every
/// registered sink, dropping sinks whose receiving end has gone away.
fn notify_finalized(
&self,
notify_finalized: Vec<Block::Hash>,
) -> error::Result<()> {
let mut sinks = self.finality_notification_sinks.lock();
for hash in notify_finalized {
let header = self.header(&BlockId::Hash(hash))?
.expect("header already known to exist in DB because it is indicated in the tree route; qed");
let notification = FinalityNotification { hash, header };
// Keep only sinks that are still able to receive.
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
Ok(())
}
/// Fire storage-change and block-import notifications for a freshly
/// imported block.
fn notify_imported(
&self,
notify_import: (Block::Hash, BlockOrigin, Block::Header, bool, Option<Vec<(Vec<u8>, Option<Vec<u8>>)>>),
) -> error::Result<()> {
let (hash, origin, header, is_new_best, storage_changes) = notify_import;
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
if let Some(changes) = storage_changes {
self.storage_notifications.lock().trigger(&hash, changes.into_iter());
}
let notification = BlockImportNotification::<Block> { hash, origin, header, is_new_best };
// Prune sinks whose receiving end has been dropped.
self.import_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
Ok(())
}
/// Apply auxiliary data insertion into an operation.
///
/// Insertions are staged as `(key, Some(value))` pairs and deletions as
/// `(key, None)` pairs on the operation's aux store.
pub fn apply_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(
&self,
operation: &mut ClientImportOperation<Block, Blake2Hasher, B>,
insert: I,
delete: D
) -> error::Result<()> {
let insertions = insert.into_iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())));
let deletions = delete.into_iter().map(|k| (k.to_vec(), None));
operation.op.insert_aux(insertions.chain(deletions))
}
/// Mark all blocks up to the given one as finalized in the operation. If a
/// justification is provided it is stored with the given finalized block
/// (any other finalized blocks are left unjustified).
pub fn apply_finality(
&self,
operation: &mut ClientImportOperation<Block, Blake2Hasher, B>,
id: BlockId<Block>,
justification: Option<Justification>,
notify: bool,
) -> error::Result<()> {
// Resolve the current best block and the target hash, then delegate to
// the hash-based implementation.
let best = self.backend.blockchain().info()?.best_hash;
let target = self.backend.blockchain().expect_block_hash_from_id(&id)?;
self.apply_finality_with_block_hash(operation, target, justification, best, notify)
}
/// Finalize a block. This will implicitly finalize all blocks up to it and
/// fire finality notifications.
///
@@ -834,9 +995,11 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
/// This is usually tied to some synchronization state, where we don't send notifications
/// while performing major synchronization work.
pub fn finalize_block(&self, id: BlockId<Block>, justification: Option<Justification>, notify: bool) -> error::Result<()> {
let last_best = self.backend.blockchain().info()?.best_hash;
let to_finalize_hash = self.backend.blockchain().expect_block_hash_from_id(&id)?;
self.apply_finality(to_finalize_hash, justification, last_best, notify)
self.lock_import_and_run(|operation| {
let last_best = self.backend.blockchain().info()?.best_hash;
let to_finalize_hash = self.backend.blockchain().expect_block_hash_from_id(&id)?;
self.apply_finality_with_block_hash(operation, to_finalize_hash, justification, last_best, notify)
})
}
/// Attempts to revert the chain by `n` blocks. Returns the number of blocks that were
@@ -1120,63 +1283,9 @@ impl<B, E, Block, RA> consensus::BlockImport<Block> for Client<B, E, Block, RA>
import_block: ImportBlock<Block>,
new_authorities: Option<Vec<AuthorityIdFor<Block>>>,
) -> Result<ImportResult, Self::Error> {
use runtime_primitives::traits::Digest;
let ImportBlock {
origin,
header,
justification,
post_digests,
body,
finalized,
auxiliary,
fork_choice,
} = import_block;
assert!(justification.is_some() && finalized || justification.is_none());
let parent_hash = header.parent_hash().clone();
match self.backend.blockchain().status(BlockId::Hash(parent_hash)) {
Ok(blockchain::BlockStatus::InChain) => {},
Ok(blockchain::BlockStatus::Unknown) => return Ok(ImportResult::UnknownParent),
Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
}
let import_headers = if post_digests.is_empty() {
PrePostHeader::Same(header)
} else {
let mut post_header = header.clone();
for item in post_digests {
post_header.digest_mut().push(item);
}
PrePostHeader::Different(header, post_header)
};
let hash = import_headers.post().hash();
let _import_lock = self.import_lock.lock();
let height: u64 = import_headers.post().number().as_();
*self.importing_block.write() = Some(hash);
let result = self.execute_and_import_block(
origin,
hash,
import_headers,
justification,
body,
new_authorities,
finalized,
auxiliary,
fork_choice,
);
*self.importing_block.write() = None;
telemetry!("block.import";
"height" => height,
"best" => ?hash,
"origin" => ?origin
);
result.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into())
self.lock_import_and_run(|operation| {
self.apply_block(operation, import_block, new_authorities)
}).map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into())
}
}
@@ -1279,7 +1388,13 @@ impl<B, E, Block, RA> backend::AuxStore for Client<B, E, Block, RA>
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(&self, insert: I, delete: D) -> error::Result<()> {
crate::backend::AuxStore::insert_aux(&*self.backend, insert, delete)
// Import is locked here because we may have other block import
// operations that try to set aux data. Note that for consensus
// layer, one can always use atomic operations to make sure
// import is only locked once.
self.lock_import_and_run(|operation| {
self.apply_aux(operation, insert, delete)
})
}
/// Query auxiliary data from key-value store.
fn get_aux(&self, key: &[u8]) -> error::Result<Option<Vec<u8>>> {