diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs index 7937969c1c..0cca2c0680 100644 --- a/substrate/core/client/db/src/lib.rs +++ b/substrate/core/client/db/src/lib.rs @@ -255,6 +255,7 @@ pub struct BlockImportOperation { changes_trie_updates: MemoryDB, pending_block: Option>, aux_ops: Vec<(Vec, Option>)>, + finalized_blocks: Vec<(BlockId, Option)>, } impl BlockImportOperation { @@ -338,10 +339,10 @@ where Block: BlockT, Ok(()) } - fn set_aux(&mut self, ops: I) -> Result<(), client::error::Error> + fn insert_aux(&mut self, ops: I) -> Result<(), client::error::Error> where I: IntoIterator, Option>)> { - self.aux_ops = ops.into_iter().collect(); + self.aux_ops.append(&mut ops.into_iter().collect()); Ok(()) } @@ -349,6 +350,11 @@ where Block: BlockT, self.storage_updates = update; Ok(()) } + + fn mark_finalized(&mut self, block: BlockId, justification: Option) -> Result<(), client::error::Error> { + self.finalized_blocks.push((block, justification)); + Ok(()) + } } struct StorageDb { @@ -405,7 +411,7 @@ impl DbChangesTrieStorage { } /// Prune obsolete changes tries. - pub fn prune(&self, config: Option, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor) { + pub fn prune(&self, config: Option, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor) { // never prune on archive nodes let min_blocks_to_keep = match self.min_blocks_to_keep { Some(min_blocks_to_keep) => min_blocks_to_keep, @@ -508,7 +514,7 @@ pub struct Backend { shared_cache: SharedCache, } -impl Backend { +impl> Backend { /// Create a new instance of database backend. /// /// The pruning window is how old a block must be before the state is pruned. @@ -557,6 +563,27 @@ impl Backend { }) } + fn finalize_block_with_transaction(&self, transaction: &mut DBTransaction, block: BlockId, justification: Option) -> Result<(Block::Hash, ::Number, bool, bool), client::error::Error> { + use runtime_primitives::traits::Header; + + if let Some(header) = ::client::blockchain::HeaderBackend::header(&self.blockchain, block)? { + // TODO: ensure best chain contains this block. + let hash = header.hash(); + self.note_finalized(transaction, &header, hash.clone())?; + if let Some(justification) = justification { + let number = header.number().clone(); + transaction.put( + columns::JUSTIFICATION, + &utils::number_and_hash_to_lookup_key(number, hash.clone()), + &justification.encode(), + ); + } + Ok((hash, header.number().clone(), false, true)) + } else { + Err(client::error::ErrorKind::UnknownBlock(format!("Cannot finalize block {:?}", block)).into()) + } + } + // performs forced canonicaliziation with a delay after importning a non-finalized block. fn force_delayed_canonicalize( &self, @@ -676,24 +703,42 @@ impl client::backend::Backend for Backend whe type State = CachingState; type ChangesTrieStorage = DbChangesTrieStorage; - fn begin_operation(&self, block: BlockId) -> Result { - let state = self.state_at(block)?; + fn begin_operation(&self) -> Result { + let old_state = self.state_at(BlockId::Hash(Default::default()))?; Ok(BlockImportOperation { pending_block: None, - old_state: state, + old_state, db_updates: MemoryDB::default(), storage_updates: Default::default(), changes_trie_updates: MemoryDB::default(), aux_ops: Vec::new(), + finalized_blocks: Vec::new(), }) } + fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId) -> Result<(), client::error::Error> { + operation.old_state = self.state_at(block)?; + Ok(()) + } + fn commit_operation(&self, mut operation: Self::BlockImportOperation) -> Result<(), client::error::Error> { let mut transaction = DBTransaction::new(); operation.apply_aux(&mut transaction); + if operation.finalized_blocks.len() > 0 { + let mut meta_updates = Vec::new(); + + for (block, justification) in operation.finalized_blocks { + meta_updates.push(self.finalize_block_with_transaction(&mut transaction, block, justification)?); + } + + for (hash, number, is_best, is_finalized) in meta_updates { + self.blockchain.update_meta(hash, number, is_best, is_finalized); + } + } + if let Some(pending_block) = operation.pending_block { let hash = pending_block.header.hash(); let parent_hash = *pending_block.header.parent_hash(); @@ -844,27 +889,11 @@ impl client::backend::Backend for Backend whe fn finalize_block(&self, block: BlockId, justification: Option) -> Result<(), client::error::Error> { - use runtime_primitives::traits::Header; - - if let Some(header) = ::client::blockchain::HeaderBackend::header(&self.blockchain, block)? { - let mut transaction = DBTransaction::new(); - // TODO: ensure best chain contains this block. - let hash = header.hash(); - self.note_finalized(&mut transaction, &header, hash.clone())?; - if let Some(justification) = justification { - let number = header.number().clone(); - transaction.put( - columns::JUSTIFICATION, - &utils::number_and_hash_to_lookup_key(number, hash.clone()), - &justification.encode(), - ); - } - self.storage.db.write(transaction).map_err(db_err)?; - self.blockchain.update_meta(hash, header.number().clone(), false, true); - Ok(()) - } else { - Err(client::error::ErrorKind::UnknownBlock(format!("Cannot finalize block {:?}", block)).into()) - } + let mut transaction = DBTransaction::new(); + let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(&mut transaction, block, justification)?; + self.storage.db.write(transaction).map_err(db_err)?; + self.blockchain.update_meta(hash, number, is_best, is_finalized); + Ok(()) } fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> { @@ -1009,7 +1038,8 @@ mod tests { } else { BlockId::Number(number - 1) }; - let mut op = backend.begin_operation(block_id).unwrap(); + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, block_id).unwrap(); op.set_block_data(header, None, None, NewBlockState::Best).unwrap(); op.update_changes_trie(changes_trie_update).unwrap(); backend.commit_operation(op).unwrap(); @@ -1031,7 +1061,8 @@ mod tests { BlockId::Number(i - 1) }; - let mut op = db.begin_operation(id).unwrap(); + let mut op = db.begin_operation().unwrap(); + db.begin_state_operation(&mut op, id).unwrap(); let header = Header { number: i, parent_hash: if i == 0 { @@ -1069,7 +1100,8 @@ mod tests { fn set_state_data() { let db = Backend::::new_test(2, 0); let hash = { - let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap(); + let mut op = db.begin_operation().unwrap(); + db.begin_state_operation(&mut op, BlockId::Hash(Default::default())).unwrap(); let mut header = Header { number: 0, parent_hash: Default::default(), @@ -1110,7 +1142,8 @@ mod tests { }; { - let mut op = db.begin_operation(BlockId::Number(0)).unwrap(); + let mut op = db.begin_operation().unwrap(); + db.begin_state_operation(&mut op, BlockId::Number(0)).unwrap(); let mut header = Header { number: 1, parent_hash: hash, @@ -1151,7 +1184,8 @@ mod tests { let backend = Backend::::new_test(0, 0); let hash = { - let mut op = backend.begin_operation(BlockId::Hash(Default::default())).unwrap(); + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::Hash(Default::default())).unwrap(); let mut header = Header { number: 0, parent_hash: Default::default(), @@ -1186,7 +1220,8 @@ mod tests { }; let hash = { - let mut op = backend.begin_operation(BlockId::Number(0)).unwrap(); + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::Number(0)).unwrap(); let mut header = Header { number: 1, parent_hash: hash, @@ -1220,7 +1255,8 @@ mod tests { }; { - let mut op = backend.begin_operation(BlockId::Number(1)).unwrap(); + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::Number(1)).unwrap(); let mut header = Header { number: 2, parent_hash: hash, @@ -1561,7 +1597,8 @@ mod tests { let backend = Backend::::new_test(0, 0); { - let mut op = backend.begin_operation(BlockId::Hash(Default::default())).unwrap(); + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::Hash(Default::default())).unwrap(); let header = Header { number: 0, parent_hash: Default::default(), diff --git a/substrate/core/client/src/backend.rs b/substrate/core/client/src/backend.rs index b4040f59a9..60a0069db4 100644 --- a/substrate/core/client/src/backend.rs +++ b/substrate/core/client/src/backend.rs @@ -76,9 +76,11 @@ pub trait BlockImportOperation where fn update_storage(&mut self, update: Vec<(Vec, Option>)>) -> error::Result<()>; /// Inject changes trie data into the database. fn update_changes_trie(&mut self, update: MemoryDB) -> error::Result<()>; - /// Update auxiliary keys. Values are `None` if should be deleted. - fn set_aux(&mut self, ops: I) -> error::Result<()> + /// Insert auxiliary keys. Values are `None` if should be deleted. + fn insert_aux(&mut self, ops: I) -> error::Result<()> where I: IntoIterator, Option>)>; + /// Mark a block as finalized. + fn mark_finalized(&mut self, id: BlockId, justification: Option) -> error::Result<()>; } /// Provides access to an auxiliary database. @@ -108,7 +110,7 @@ pub trait Backend: AuxStore + Send + Sync where H: Hasher, { /// Associated block insertion operation type. - type BlockImportOperation: BlockImportOperation; + type BlockImportOperation: BlockImportOperation; /// Associated blockchain backend type. type Blockchain: crate::blockchain::Backend; /// Associated state backend type. @@ -118,7 +120,9 @@ pub trait Backend: AuxStore + Send + Sync where /// Begin a new block insertion transaction with given parent block id. /// When constructing the genesis, this is called with all-zero hash. - fn begin_operation(&self, block: BlockId) -> error::Result; + fn begin_operation(&self) -> error::Result; + /// Note an operation to contain state transition. + fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId) -> error::Result<()>; /// Commit block insertion. fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>; /// Finalize block with given Id. This should only be called if the parent of the given diff --git a/substrate/core/client/src/blockchain.rs b/substrate/core/client/src/blockchain.rs index 94eecf1505..986360764d 100644 --- a/substrate/core/client/src/blockchain.rs +++ b/substrate/core/client/src/blockchain.rs @@ -92,18 +92,6 @@ pub trait Cache: Send + Sync { fn authorities_at(&self, block: BlockId) -> Option>>; } -/// Block import outcome -pub enum ImportResult { - /// Imported successfully. - Imported, - /// Block already exists, skippped. - AlreadyInChain, - /// Unknown parent. - UnknownParent, - /// Other errror. - Err(E), -} - /// Blockchain info #[derive(Debug)] pub struct Info { diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 30cccc214e..86a83ac7ad 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -45,6 +45,7 @@ use state_machine::{ ChangesTrieRootsStorage, ChangesTrieStorage, key_changes, key_changes_proof, OverlayedChanges, }; +use hash_db::Hasher; use crate::backend::{self, BlockImportOperation, PrunableStateChangesTrieStorage}; use crate::blockchain::{ @@ -90,6 +91,13 @@ pub struct Client where Block: BlockT { _phantom: PhantomData, } +/// Client import operation, a wrapper for the backend. +pub struct ClientImportOperation, B: backend::Backend> { + op: B::BlockImportOperation, + notify_imported: Option<(Block::Hash, BlockOrigin, Block::Header, bool, Option, Option>)>>)>, + notify_finalized: Vec, +} + /// A source of blockchain events. pub trait BlockchainEvents { /// Get block import event stream. Not guaranteed to be fired for every @@ -248,7 +256,8 @@ impl Client where ) -> error::Result { if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() { let (genesis_storage, children_genesis_storage) = build_genesis_storage.build_storage()?; - let mut op = backend.begin_operation(BlockId::Hash(Default::default()))?; + let mut op = backend.begin_operation()?; + backend.begin_state_operation(&mut op, BlockId::Hash(Default::default()))?; let state_root = op.reset_storage(genesis_storage, children_genesis_storage)?; let genesis_block = genesis::construct_genesis_block::(state_root.into()); info!("Initialising Genesis block/state (state: {}, header-hash: {})", genesis_block.header().state_root(), genesis_block.header().hash()); @@ -586,8 +595,110 @@ impl Client where block_builder::BlockBuilder::at_block(parent, &self) } + /// Lock the import lock, and run operations inside. + pub fn lock_import_and_run) -> error::Result>( + &self, f: F + ) -> error::Result { + let inner = || { + let _import_lock = self.import_lock.lock(); + + let mut op = ClientImportOperation { + op: self.backend.begin_operation()?, + notify_imported: None, + notify_finalized: Vec::new(), + }; + + let r = f(&mut op)?; + + let ClientImportOperation { op, notify_imported, notify_finalized } = op; + self.backend.commit_operation(op)?; + self.notify_finalized(notify_finalized)?; + + if let Some(notify_imported) = notify_imported { + self.notify_imported(notify_imported)?; + } + + Ok(r) + }; + + let result = inner(); + *self.importing_block.write() = None; + + result + } + + /// Apply a checked and validated block to an operation. If a justification is provided + /// then `finalized` *must* be true. + pub fn apply_block( + &self, + operation: &mut ClientImportOperation, + import_block: ImportBlock, + new_authorities: Option>>, + ) -> error::Result where + E: CallExecutor + Send + Sync + Clone, + { + use runtime_primitives::traits::Digest; + + let ImportBlock { + origin, + header, + justification, + post_digests, + body, + finalized, + auxiliary, + fork_choice, + } = import_block; + + assert!(justification.is_some() && finalized || justification.is_none()); + + let parent_hash = header.parent_hash().clone(); + + match self.backend.blockchain().status(BlockId::Hash(parent_hash))? { + blockchain::BlockStatus::InChain => {}, + blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), + } + + let import_headers = if post_digests.is_empty() { + PrePostHeader::Same(header) + } else { + let mut post_header = header.clone(); + for item in post_digests { + post_header.digest_mut().push(item); + } + PrePostHeader::Different(header, post_header) + }; + + let hash = import_headers.post().hash(); + let height: u64 = import_headers.post().number().as_(); + + *self.importing_block.write() = Some(hash); + + let result = self.execute_and_import_block( + operation, + origin, + hash, + import_headers, + justification, + body, + new_authorities, + finalized, + auxiliary, + fork_choice, + ); + + telemetry!("block.import"; + "height" => height, + "best" => ?hash, + "origin" => ?origin + ); + + result + } + fn execute_and_import_block( &self, + operation: &mut ClientImportOperation, origin: BlockOrigin, hash: Block::Hash, import_headers: PrePostHeader, @@ -619,17 +730,17 @@ impl Client where BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false, }; + self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?; + // ensure parent block is finalized to maintain invariant that // finality is called sequentially. if finalized { - self.apply_finality(parent_hash, None, last_best, make_notifications)?; + self.apply_finality_with_block_hash(operation, parent_hash, None, last_best, make_notifications)?; } - let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?; - // TODO: correct path logic for when to execute this function // https://github.com/paritytech/substrate/issues/1232 - let (storage_update,changes_update,storage_changes) = self.block_execution(&import_headers, origin, hash, body.clone(), &transaction)?; + let (storage_update,changes_update,storage_changes) = self.block_execution(&operation.op, &import_headers, origin, hash, body.clone())?; // TODO: non longest-chain rule. let is_new_best = finalized || match fork_choice { @@ -646,7 +757,7 @@ impl Client where trace!("Imported {}, (#{}), best={}, origin={:?}", hash, import_headers.post().number(), is_new_best, origin); - transaction.set_block_data( + operation.op.set_block_data( import_headers.post().clone(), body, justification, @@ -654,47 +765,26 @@ impl Client where )?; if let Some(authorities) = authorities { - transaction.update_authorities(authorities); + operation.op.update_authorities(authorities); } if let Some(storage_update) = storage_update { - transaction.update_db_storage(storage_update)?; + operation.op.update_db_storage(storage_update)?; } if let Some(storage_changes) = storage_changes.clone() { - transaction.update_storage(storage_changes)?; + operation.op.update_storage(storage_changes)?; } if let Some(Some(changes_update)) = changes_update { - transaction.update_changes_trie(changes_update)?; + operation.op.update_changes_trie(changes_update)?; } - transaction.set_aux(aux)?; - self.backend.commit_operation(transaction)?; + operation.op.insert_aux(aux)?; if make_notifications { - if let Some(storage_changes) = storage_changes { - // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? - self.storage_notifications.lock() - .trigger(&hash, storage_changes.into_iter()); - } - if finalized { - let notification = FinalityNotification:: { - hash, - header: import_headers.post().clone(), - }; - - self.finality_notification_sinks.lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + operation.notify_finalized.push(hash); } - let notification = BlockImportNotification:: { - hash, - origin, - header: import_headers.into_post(), - is_new_best, - }; - - self.import_notification_sinks.lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + operation.notify_imported = Some((hash, origin, import_headers.into_post(), is_new_best, storage_changes)); } Ok(ImportResult::Queued) @@ -702,16 +792,16 @@ impl Client where fn block_execution( &self, + transaction: &B::BlockImportOperation, import_headers: &PrePostHeader, origin: BlockOrigin, hash: Block::Hash, body: Option>, - transaction: &B::BlockImportOperation, ) -> error::Result<( - Option>, - Option>, + Option>, + Option>, Option, Option>)>>, - )> + )> where E: CallExecutor + Send + Sync + Clone, { @@ -752,11 +842,9 @@ impl Client where } } - /// Finalizes all blocks up to given. If a justification is provided it is - /// stored with the given finalized block (any other finalized blocks are - /// left unjustified). - fn apply_finality( + fn apply_finality_with_block_hash( &self, + operation: &mut ClientImportOperation, block: Block::Hash, justification: Option, best_block: Block::Hash, @@ -799,11 +887,11 @@ impl Client where let enacted = route_from_finalized.enacted(); assert!(enacted.len() > 0); for finalize_new in &enacted[..enacted.len() - 1] { - self.backend.finalize_block(BlockId::Hash(finalize_new.hash), None)?; + operation.op.mark_finalized(BlockId::Hash(finalize_new.hash), None)?; } assert_eq!(enacted.last().map(|e| e.hash), Some(block)); - self.backend.finalize_block(BlockId::Hash(block), justification)?; + operation.op.mark_finalized(BlockId::Hash(block), justification)?; if notify { // sometimes when syncing, tons of blocks can be finalized at once. @@ -811,22 +899,95 @@ impl Client where const MAX_TO_NOTIFY: usize = 256; let enacted = route_from_finalized.enacted(); let start = enacted.len() - ::std::cmp::min(enacted.len(), MAX_TO_NOTIFY); - let mut sinks = self.finality_notification_sinks.lock(); for finalized in &enacted[start..] { - let header = self.header(&BlockId::Hash(finalized.hash))? - .expect("header already known to exist in DB because it is indicated in the tree route; qed"); - let notification = FinalityNotification { - header, - hash: finalized.hash, - }; - - sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + operation.notify_finalized.push(finalized.hash); } } Ok(()) } + fn notify_finalized( + &self, + notify_finalized: Vec, + ) -> error::Result<()> { + let mut sinks = self.finality_notification_sinks.lock(); + + for finalized_hash in notify_finalized { + let header = self.header(&BlockId::Hash(finalized_hash))? + .expect("header already known to exist in DB because it is indicated in the tree route; qed"); + + let notification = FinalityNotification { + header, + hash: finalized_hash, + }; + + sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + } + + Ok(()) + } + + fn notify_imported( + &self, + notify_import: (Block::Hash, BlockOrigin, Block::Header, bool, Option, Option>)>>), + ) -> error::Result<()> { + let (hash, origin, header, is_new_best, storage_changes) = notify_import; + + if let Some(storage_changes) = storage_changes { + // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? + self.storage_notifications.lock() + .trigger(&hash, storage_changes.into_iter()); + } + + let notification = BlockImportNotification:: { + hash, + origin, + header, + is_new_best, + }; + + self.import_notification_sinks.lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + Ok(()) + } + + /// Apply auxiliary data insertion into an operation. + pub fn apply_aux< + 'a, + 'b: 'a, + 'c: 'a, + I: IntoIterator, + D: IntoIterator, + >( + &self, + operation: &mut ClientImportOperation, + insert: I, + delete: D + ) -> error::Result<()> { + operation.op.insert_aux( + insert.into_iter() + .map(|(k, v)| (k.to_vec(), Some(v.to_vec()))) + .chain(delete.into_iter().map(|k| (k.to_vec(), None))) + ) + } + + /// Mark all blocks up to given as finalized in operation. If a + /// justification is provided it is stored with the given finalized + /// block (any other finalized blocks are left unjustified). + pub fn apply_finality( + &self, + operation: &mut ClientImportOperation, + id: BlockId, + justification: Option, + notify: bool, + ) -> error::Result<()> { + let last_best = self.backend.blockchain().info()?.best_hash; + let to_finalize_hash = self.backend.blockchain().expect_block_hash_from_id(&id)?; + self.apply_finality_with_block_hash(operation, to_finalize_hash, justification, last_best, notify) + } + /// Finalize a block. This will implicitly finalize all blocks up to it and /// fire finality notifications. /// @@ -834,9 +995,11 @@ impl Client where /// This is usually tied to some synchronization state, where we don't send notifications /// while performing major synchronization work. pub fn finalize_block(&self, id: BlockId, justification: Option, notify: bool) -> error::Result<()> { - let last_best = self.backend.blockchain().info()?.best_hash; - let to_finalize_hash = self.backend.blockchain().expect_block_hash_from_id(&id)?; - self.apply_finality(to_finalize_hash, justification, last_best, notify) + self.lock_import_and_run(|operation| { + let last_best = self.backend.blockchain().info()?.best_hash; + let to_finalize_hash = self.backend.blockchain().expect_block_hash_from_id(&id)?; + self.apply_finality_with_block_hash(operation, to_finalize_hash, justification, last_best, notify) + }) } /// Attempts to revert the chain by `n` blocks. Returns the number of blocks that were @@ -1120,63 +1283,9 @@ impl consensus::BlockImport for Client import_block: ImportBlock, new_authorities: Option>>, ) -> Result { - use runtime_primitives::traits::Digest; - - let ImportBlock { - origin, - header, - justification, - post_digests, - body, - finalized, - auxiliary, - fork_choice, - } = import_block; - - assert!(justification.is_some() && finalized || justification.is_none()); - - let parent_hash = header.parent_hash().clone(); - - match self.backend.blockchain().status(BlockId::Hash(parent_hash)) { - Ok(blockchain::BlockStatus::InChain) => {}, - Ok(blockchain::BlockStatus::Unknown) => return Ok(ImportResult::UnknownParent), - Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), - } - - let import_headers = if post_digests.is_empty() { - PrePostHeader::Same(header) - } else { - let mut post_header = header.clone(); - for item in post_digests { - post_header.digest_mut().push(item); - } - PrePostHeader::Different(header, post_header) - }; - - let hash = import_headers.post().hash(); - let _import_lock = self.import_lock.lock(); - let height: u64 = import_headers.post().number().as_(); - *self.importing_block.write() = Some(hash); - - let result = self.execute_and_import_block( - origin, - hash, - import_headers, - justification, - body, - new_authorities, - finalized, - auxiliary, - fork_choice, - ); - - *self.importing_block.write() = None; - telemetry!("block.import"; - "height" => height, - "best" => ?hash, - "origin" => ?origin - ); - result.map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into()) + self.lock_import_and_run(|operation| { + self.apply_block(operation, import_block, new_authorities) + }).map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()).into()) } } @@ -1279,7 +1388,13 @@ impl backend::AuxStore for Client I: IntoIterator, D: IntoIterator, >(&self, insert: I, delete: D) -> error::Result<()> { - crate::backend::AuxStore::insert_aux(&*self.backend, insert, delete) + // Import is locked here because we may have other block import + // operations that tries to set aux data. Note that for consensus + // layer, one can always use atomic operations to make sure + // import is only locked once. + self.lock_import_and_run(|operation| { + self.apply_aux(operation, insert, delete) + }) } /// Query auxiliary data from key-value store. fn get_aux(&self, key: &[u8]) -> error::Result>> { diff --git a/substrate/core/client/src/in_mem.rs b/substrate/core/client/src/in_mem.rs index d5065d7641..ae6703a78d 100644 --- a/substrate/core/client/src/in_mem.rs +++ b/substrate/core/client/src/in_mem.rs @@ -413,7 +413,8 @@ pub struct BlockImportOperation { old_state: InMemory, new_state: Option>, changes_trie_update: Option>, - aux: Option, Option>)>>, + aux: Vec<(Vec, Option>)>, + finalized_blocks: Vec<(BlockId, Option)>, } impl backend::BlockImportOperation for BlockImportOperation @@ -485,16 +486,21 @@ where Ok(root) } - fn set_aux(&mut self, ops: I) -> error::Result<()> + fn insert_aux(&mut self, ops: I) -> error::Result<()> where I: IntoIterator, Option>)> { - self.aux = Some(ops.into_iter().collect()); + self.aux.append(&mut ops.into_iter().collect()); Ok(()) } fn update_storage(&mut self, _update: Vec<(Vec, Option>)>) -> error::Result<()> { Ok(()) } + + fn mark_finalized(&mut self, block: BlockId, justification: Option) -> error::Result<()> { + self.finalized_blocks.push((block, justification)); + Ok(()) + } } /// In-memory backend. Keeps all states and blocks in memory. Useful for testing. @@ -557,23 +563,31 @@ where type State = InMemory; type ChangesTrieStorage = ChangesTrieStorage; - fn begin_operation(&self, block: BlockId) -> error::Result { - let state = match block { - BlockId::Hash(ref h) if h.clone() == Default::default() => Self::State::default(), - _ => self.state_at(block)?, - }; - + fn begin_operation(&self) -> error::Result { + let old_state = self.state_at(BlockId::Hash(Default::default()))?; Ok(BlockImportOperation { pending_block: None, pending_authorities: None, - old_state: state, + old_state, new_state: None, changes_trie_update: None, - aux: None, + aux: Default::default(), + finalized_blocks: Default::default(), }) } + fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId) -> error::Result<()> { + operation.old_state = self.state_at(block)?; + Ok(()) + } + fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> { + if !operation.finalized_blocks.is_empty() { + for (block, justification) in operation.finalized_blocks { + self.blockchain.finalize_header(block, justification)?; + } + } + if let Some(pending_block) = operation.pending_block { let old_state = &operation.old_state; let (header, body, justification) = pending_block.block.into_inner(); @@ -598,9 +612,10 @@ where } } - if let Some(ops) = operation.aux { - self.blockchain.write_aux(ops); + if !operation.aux.is_empty() { + self.blockchain.write_aux(operation.aux); } + Ok(()) } @@ -617,6 +632,13 @@ where } fn state_at(&self, block: BlockId) -> error::Result { + match block { + BlockId::Hash(h) if h == Default::default() => { + return Ok(Self::State::default()); + }, + _ => {}, + } + match self.blockchain.id(block).and_then(|id| self.states.read().get(&id).cloned()) { Some(state) => Ok(state), None => Err(error::ErrorKind::UnknownBlock(format!("{}", block)).into()), diff --git a/substrate/core/client/src/light/backend.rs b/substrate/core/client/src/light/backend.rs index c144ffa520..fcc58e0ad7 100644 --- a/substrate/core/client/src/light/backend.rs +++ b/substrate/core/client/src/light/backend.rs @@ -45,6 +45,7 @@ pub struct ImportOperation { authorities: Option>>, leaf_state: NewBlockState, aux_ops: Vec<(Vec, Option>)>, + finalized_blocks: Vec>, _phantom: ::std::marker::PhantomData<(S, F)>, } @@ -96,24 +97,42 @@ impl ClientBackend for Backend where type State = OnDemandState; type ChangesTrieStorage = in_mem::ChangesTrieStorage; - fn begin_operation(&self, _block: BlockId) -> ClientResult { + fn begin_operation(&self) -> ClientResult { Ok(ImportOperation { header: None, authorities: None, leaf_state: NewBlockState::Normal, aux_ops: Vec::new(), + finalized_blocks: Vec::new(), _phantom: Default::default(), }) } + fn begin_state_operation( + &self, + _operation: &mut Self::BlockImportOperation, + _block: BlockId + ) -> ClientResult<()> { + Ok(()) + } + fn commit_operation(&self, operation: Self::BlockImportOperation) -> ClientResult<()> { - let header = operation.header.expect("commit is called after set_block_data; set_block_data sets header; qed"); - self.blockchain.storage().import_header( - header, - operation.authorities, - operation.leaf_state, - operation.aux_ops, - ) + if !operation.finalized_blocks.is_empty() { + for block in operation.finalized_blocks { + self.blockchain.storage().finalize_header(block)?; + } + } + + if let Some(header) = operation.header { + self.blockchain.storage().import_header( + header, + operation.authorities, + operation.leaf_state, + operation.aux_ops, + )?; + } + + Ok(()) } fn finalize_block(&self, block: BlockId, _justification: Option) -> ClientResult<()> { @@ -199,14 +218,14 @@ where fn reset_storage(&mut self, top: StorageMap, children: ChildrenStorageMap) -> ClientResult { let in_mem = in_mem::Backend::::new(); - let mut op = in_mem.begin_operation(BlockId::Hash(Default::default()))?; + let mut op = in_mem.begin_operation()?; op.reset_storage(top, children) } - fn set_aux(&mut self, ops: I) -> ClientResult<()> + fn insert_aux(&mut self, ops: I) -> ClientResult<()> where I: IntoIterator, Option>)> { - self.aux_ops = ops.into_iter().collect(); + self.aux_ops.append(&mut ops.into_iter().collect()); Ok(()) } @@ -214,6 +233,11 @@ where // we're not storing anything locally => ignore changes Ok(()) } + + fn mark_finalized(&mut self, block: BlockId, _justification: Option) -> ClientResult<()> { + self.finalized_blocks.push(block); + Ok(()) + } } impl StateBackend for OnDemandState