// Copyright 2017-2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! In memory client backend use std::collections::{HashMap, HashSet}; use std::sync::Arc; use parking_lot::{RwLock, Mutex}; use primitives::{ChangesTrieConfiguration, storage::well_known_keys}; use sr_primitives::generic::{BlockId, DigestItem}; use sr_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, NumberFor}; use sr_primitives::{Justification, StorageOverlay, ChildrenStorageOverlay}; use state_machine::backend::{Backend as StateBackend, InMemory}; use state_machine::{self, InMemoryChangesTrieStorage, ChangesTrieAnchorBlockId, ChangesTrieTransaction}; use hash_db::{Hasher, Prefix}; use trie::MemoryDB; use header_metadata::{CachedHeaderMetadata, HeaderMetadata}; use crate::error; use crate::backend::{self, NewBlockState, StorageCollection, ChildStorageCollection}; use crate::light; use crate::leaves::LeafSet; use crate::blockchain::{ self, BlockStatus, HeaderBackend, well_known_cache_keys::Id as CacheKeyId }; struct PendingBlock { block: StoredBlock, state: NewBlockState, } #[derive(PartialEq, Eq, Clone)] enum StoredBlock { Header(B::Header, Option), Full(B, Option), } impl StoredBlock { fn new(header: B::Header, body: Option>, just: Option) -> Self { match body { Some(body) => StoredBlock::Full(B::new(header, body), just), None => StoredBlock::Header(header, just), } } fn header(&self) -> &B::Header { match *self { StoredBlock::Header(ref h, _) => h, StoredBlock::Full(ref b, _) => b.header(), } } fn justification(&self) -> Option<&Justification> { match *self { StoredBlock::Header(_, ref j) | StoredBlock::Full(_, ref j) => j.as_ref() } } fn extrinsics(&self) -> Option<&[B::Extrinsic]> { match *self { StoredBlock::Header(_, _) => None, StoredBlock::Full(ref b, _) => Some(b.extrinsics()), } } fn into_inner(self) -> (B::Header, Option>, Option) { match self { StoredBlock::Header(header, just) => (header, None, just), StoredBlock::Full(block, just) => { let (header, body) = block.deconstruct(); (header, Some(body), just) } } } } #[derive(Clone)] struct BlockchainStorage { blocks: HashMap>, hashes: HashMap, Block::Hash>, best_hash: Block::Hash, best_number: NumberFor, finalized_hash: Block::Hash, finalized_number: NumberFor, genesis_hash: Block::Hash, header_cht_roots: HashMap, Block::Hash>, changes_trie_cht_roots: HashMap, Block::Hash>, leaves: LeafSet>, aux: HashMap, Vec>, } /// In-memory blockchain. Supports concurrent reads. pub struct Blockchain { storage: Arc>>, } impl Clone for Blockchain { fn clone(&self) -> Self { let storage = Arc::new(RwLock::new(self.storage.read().clone())); Blockchain { storage: storage.clone(), } } } impl Blockchain { /// Get header hash of given block. pub fn id(&self, id: BlockId) -> Option { match id { BlockId::Hash(h) => Some(h), BlockId::Number(n) => self.storage.read().hashes.get(&n).cloned(), } } /// Create new in-memory blockchain storage. pub fn new() -> Blockchain { let storage = Arc::new(RwLock::new( BlockchainStorage { blocks: HashMap::new(), hashes: HashMap::new(), best_hash: Default::default(), best_number: Zero::zero(), finalized_hash: Default::default(), finalized_number: Zero::zero(), genesis_hash: Default::default(), header_cht_roots: HashMap::new(), changes_trie_cht_roots: HashMap::new(), leaves: LeafSet::new(), aux: HashMap::new(), })); Blockchain { storage: storage.clone(), } } /// Insert a block header and associated data. pub fn insert( &self, hash: Block::Hash, header: ::Header, justification: Option, body: Option::Extrinsic>>, new_state: NewBlockState, ) -> crate::error::Result<()> { let number = header.number().clone(); if new_state.is_best() { self.apply_head(&header)?; } { let mut storage = self.storage.write(); storage.leaves.import(hash.clone(), number.clone(), header.parent_hash().clone()); storage.blocks.insert(hash.clone(), StoredBlock::new(header, body, justification)); if let NewBlockState::Final = new_state { storage.finalized_hash = hash; storage.finalized_number = number.clone(); } if number == Zero::zero() { storage.genesis_hash = hash; } } Ok(()) } /// Get total number of blocks. pub fn blocks_count(&self) -> usize { self.storage.read().blocks.len() } /// Compare this blockchain with another in-mem blockchain pub fn equals_to(&self, other: &Self) -> bool { self.canon_equals_to(other) && self.storage.read().blocks == other.storage.read().blocks } /// Compare canonical chain to other canonical chain. pub fn canon_equals_to(&self, other: &Self) -> bool { let this = self.storage.read(); let other = other.storage.read(); this.hashes == other.hashes && this.best_hash == other.best_hash && this.best_number == other.best_number && this.genesis_hash == other.genesis_hash } /// Insert header CHT root. pub fn insert_cht_root(&self, block: NumberFor, cht_root: Block::Hash) { self.storage.write().header_cht_roots.insert(block, cht_root); } /// Set an existing block as head. pub fn set_head(&self, id: BlockId) -> error::Result<()> { let header = match self.header(id)? { Some(h) => h, None => return Err(error::Error::UnknownBlock(format!("{}", id))), }; self.apply_head(&header) } fn apply_head(&self, header: &::Header) -> error::Result<()> { let hash = header.hash(); let number = header.number(); // Note: this may lock storage, so it must happen before obtaining storage // write lock. let best_tree_route = { let best_hash = self.storage.read().best_hash; if &best_hash == header.parent_hash() { None } else { let route = header_metadata::tree_route(self, best_hash, *header.parent_hash())?; Some(route) } }; let mut storage = self.storage.write(); if let Some(tree_route) = best_tree_route { // apply retraction and enaction when reorganizing up to parent hash let enacted = tree_route.enacted(); for entry in enacted { storage.hashes.insert(entry.number, entry.hash); } for entry in tree_route.retracted().iter().skip(enacted.len()) { storage.hashes.remove(&entry.number); } } storage.best_hash = hash.clone(); storage.best_number = number.clone(); storage.hashes.insert(number.clone(), hash.clone()); Ok(()) } fn finalize_header(&self, id: BlockId, justification: Option) -> error::Result<()> { let hash = match self.header(id)? { Some(h) => h.hash(), None => return Err(error::Error::UnknownBlock(format!("{}", id))), }; let mut storage = self.storage.write(); storage.finalized_hash = hash; if justification.is_some() { let block = storage.blocks.get_mut(&hash) .expect("hash was fetched from a block in the db; qed"); let block_justification = match block { StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j }; *block_justification = justification; } Ok(()) } fn write_aux(&self, ops: Vec<(Vec, Option>)>) { let mut storage = self.storage.write(); for (k, v) in ops { match v { Some(v) => storage.aux.insert(k, v), None => storage.aux.remove(&k), }; } } } impl HeaderBackend for Blockchain { fn header(&self, id: BlockId) -> error::Result::Header>> { Ok(self.id(id).and_then(|hash| { self.storage.read().blocks.get(&hash).map(|b| b.header().clone()) })) } fn info(&self) -> blockchain::Info { let storage = self.storage.read(); blockchain::Info { best_hash: storage.best_hash, best_number: storage.best_number, genesis_hash: storage.genesis_hash, finalized_hash: storage.finalized_hash, finalized_number: storage.finalized_number, } } fn status(&self, id: BlockId) -> error::Result { match self.id(id).map_or(false, |hash| self.storage.read().blocks.contains_key(&hash)) { true => Ok(BlockStatus::InChain), false => Ok(BlockStatus::Unknown), } } fn number(&self, hash: Block::Hash) -> error::Result>> { Ok(self.storage.read().blocks.get(&hash).map(|b| *b.header().number())) } fn hash(&self, number: <::Header as HeaderT>::Number) -> error::Result> { Ok(self.id(BlockId::Number(number))) } } impl HeaderMetadata for Blockchain { type Error = error::Error; fn header_metadata(&self, hash: Block::Hash) -> Result, Self::Error> { self.header(BlockId::hash(hash))?.map(|header| CachedHeaderMetadata::from(&header)) .ok_or(error::Error::UnknownBlock(format!("header not found: {}", hash))) } fn insert_header_metadata(&self, _hash: Block::Hash, _metadata: CachedHeaderMetadata) { // No need to implement. } fn remove_header_metadata(&self, _hash: Block::Hash) { // No need to implement. } } impl blockchain::Backend for Blockchain { fn body(&self, id: BlockId) -> error::Result::Extrinsic>>> { Ok(self.id(id).and_then(|hash| { self.storage.read().blocks.get(&hash) .and_then(|b| b.extrinsics().map(|x| x.to_vec())) })) } fn justification(&self, id: BlockId) -> error::Result> { Ok(self.id(id).and_then(|hash| self.storage.read().blocks.get(&hash).and_then(|b| b.justification().map(|x| x.clone())) )) } fn last_finalized(&self) -> error::Result { Ok(self.storage.read().finalized_hash.clone()) } fn cache(&self) -> Option>> { None } fn leaves(&self) -> error::Result> { Ok(self.storage.read().leaves.hashes()) } fn children(&self, _parent_hash: Block::Hash) -> error::Result> { unimplemented!() } } impl blockchain::ProvideCache for Blockchain { fn cache(&self) -> Option>> { None } } impl backend::AuxStore for Blockchain { fn insert_aux< 'a, 'b: 'a, 'c: 'a, I: IntoIterator, D: IntoIterator, >(&self, insert: I, delete: D) -> error::Result<()> { let mut storage = self.storage.write(); for (k, v) in insert { storage.aux.insert(k.to_vec(), v.to_vec()); } for k in delete { storage.aux.remove(*k); } Ok(()) } fn get_aux(&self, key: &[u8]) -> error::Result>> { Ok(self.storage.read().aux.get(key).cloned()) } } impl light::blockchain::Storage for Blockchain where Block::Hash: From<[u8; 32]>, { fn import_header( &self, header: Block::Header, _cache: HashMap>, state: NewBlockState, aux_ops: Vec<(Vec, Option>)>, ) -> error::Result<()> { let hash = header.hash(); self.insert(hash, header, None, None, state)?; self.write_aux(aux_ops); Ok(()) } fn set_head(&self, id: BlockId) -> error::Result<()> { Blockchain::set_head(self, id) } fn last_finalized(&self) -> error::Result { Ok(self.storage.read().finalized_hash.clone()) } fn finalize_header(&self, id: BlockId) -> error::Result<()> { Blockchain::finalize_header(self, id, None) } fn header_cht_root( &self, _cht_size: NumberFor, block: NumberFor, ) -> error::Result { self.storage.read().header_cht_roots.get(&block).cloned() .ok_or_else(|| error::Error::Backend(format!("Header CHT for block {} not exists", block))) } fn changes_trie_cht_root( &self, _cht_size: NumberFor, block: NumberFor, ) -> error::Result { self.storage.read().changes_trie_cht_roots.get(&block).cloned() .ok_or_else(|| error::Error::Backend(format!("Changes trie CHT for block {} not exists", block))) } fn cache(&self) -> Option>> { None } } /// In-memory operation. pub struct BlockImportOperation { pending_block: Option>, pending_cache: HashMap>, old_state: InMemory, new_state: Option>, changes_trie_update: Option>, aux: Vec<(Vec, Option>)>, finalized_blocks: Vec<(BlockId, Option)>, set_head: Option>, } impl backend::BlockImportOperation for BlockImportOperation where Block: BlockT, H: Hasher, H::Out: Ord, { type State = InMemory; fn state(&self) -> error::Result> { Ok(Some(&self.old_state)) } fn set_block_data( &mut self, header: ::Header, body: Option::Extrinsic>>, justification: Option, state: NewBlockState, ) -> error::Result<()> { assert!(self.pending_block.is_none(), "Only one block per operation is allowed"); self.pending_block = Some(PendingBlock { block: StoredBlock::new(header, body, justification), state, }); Ok(()) } fn update_cache(&mut self, cache: HashMap>) { self.pending_cache = cache; } fn update_db_storage(&mut self, update: as StateBackend>::Transaction) -> error::Result<()> { self.new_state = Some(self.old_state.update(update)); Ok(()) } fn update_changes_trie(&mut self, update: ChangesTrieTransaction>) -> error::Result<()> { self.changes_trie_update = Some(update.0); Ok(()) } fn reset_storage(&mut self, top: StorageOverlay, children: ChildrenStorageOverlay) -> error::Result { check_genesis_storage(&top, &children)?; let child_delta = children.into_iter() .map(|(storage_key, child_overlay)| (storage_key, child_overlay.into_iter().map(|(k, v)| (k, Some(v))))); let (root, transaction) = self.old_state.full_storage_root( top.into_iter().map(|(k, v)| (k, Some(v))), child_delta ); self.new_state = Some(InMemory::from(transaction)); Ok(root) } fn insert_aux(&mut self, ops: I) -> error::Result<()> where I: IntoIterator, Option>)> { self.aux.append(&mut ops.into_iter().collect()); Ok(()) } fn update_storage( &mut self, _update: StorageCollection, _child_update: ChildStorageCollection, ) -> error::Result<()> { Ok(()) } fn mark_finalized(&mut self, block: BlockId, justification: Option) -> error::Result<()> { self.finalized_blocks.push((block, justification)); Ok(()) } fn mark_head(&mut self, block: BlockId) -> error::Result<()> { assert!(self.pending_block.is_none(), "Only one set block per operation is allowed"); self.set_head = Some(block); Ok(()) } } /// In-memory backend. Keeps all states and blocks in memory. /// /// > **Warning**: Doesn't support all the features necessary for a proper database. Only use this /// > struct for testing purposes. Do **NOT** use in production. pub struct Backend where Block: BlockT, H: Hasher, H::Out: Ord, { states: RwLock>>, changes_trie_storage: ChangesTrieStorage, blockchain: Blockchain, import_lock: Mutex<()>, } impl Backend where Block: BlockT, H: Hasher, H::Out: Ord, { /// Create a new instance of in-mem backend. pub fn new() -> Backend { Backend { states: RwLock::new(HashMap::new()), changes_trie_storage: ChangesTrieStorage(InMemoryChangesTrieStorage::new()), blockchain: Blockchain::new(), import_lock: Default::default(), } } } impl backend::AuxStore for Backend where Block: BlockT, H: Hasher, H::Out: Ord, { fn insert_aux< 'a, 'b: 'a, 'c: 'a, I: IntoIterator, D: IntoIterator, >(&self, insert: I, delete: D) -> error::Result<()> { self.blockchain.insert_aux(insert, delete) } fn get_aux(&self, key: &[u8]) -> error::Result>> { self.blockchain.get_aux(key) } } impl backend::Backend for Backend where Block: BlockT, H: Hasher, H::Out: Ord, { type BlockImportOperation = BlockImportOperation; type Blockchain = Blockchain; type State = InMemory; type ChangesTrieStorage = ChangesTrieStorage; type OffchainStorage = OffchainStorage; fn begin_operation(&self) -> error::Result { let old_state = self.state_at(BlockId::Hash(Default::default()))?; Ok(BlockImportOperation { pending_block: None, pending_cache: Default::default(), old_state, new_state: None, changes_trie_update: None, aux: Default::default(), finalized_blocks: Default::default(), set_head: None, }) } fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId) -> error::Result<()> { operation.old_state = self.state_at(block)?; Ok(()) } fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> { if !operation.finalized_blocks.is_empty() { for (block, justification) in operation.finalized_blocks { self.blockchain.finalize_header(block, justification)?; } } if let Some(pending_block) = operation.pending_block { let old_state = &operation.old_state; let (header, body, justification) = pending_block.block.into_inner(); let hash = header.hash(); self.states.write().insert(hash, operation.new_state.unwrap_or_else(|| old_state.clone())); let maybe_changes_trie_root = header.digest().log(DigestItem::as_changes_trie_root).cloned(); if let Some(changes_trie_root) = maybe_changes_trie_root { if let Some(changes_trie_update) = operation.changes_trie_update { self.changes_trie_storage.0.insert( *header.number(), changes_trie_root, changes_trie_update ); } } self.blockchain.insert(hash, header, justification, body, pending_block.state)?; } if !operation.aux.is_empty() { self.blockchain.write_aux(operation.aux); } if let Some(set_head) = operation.set_head { self.blockchain.set_head(set_head)?; } Ok(()) } fn finalize_block(&self, block: BlockId, justification: Option) -> error::Result<()> { self.blockchain.finalize_header(block, justification) } fn blockchain(&self) -> &Self::Blockchain { &self.blockchain } fn used_state_cache_size(&self) -> Option { None } fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> { Some(&self.changes_trie_storage) } fn offchain_storage(&self) -> Option { None } fn state_at(&self, block: BlockId) -> error::Result { match block { BlockId::Hash(h) if h == Default::default() => { return Ok(Self::State::default()); }, _ => {}, } match self.blockchain.id(block).and_then(|id| self.states.read().get(&id).cloned()) { Some(state) => Ok(state), None => Err(error::Error::UnknownBlock(format!("{}", block))), } } fn revert(&self, _n: NumberFor) -> error::Result> { Ok(Zero::zero()) } fn get_import_lock(&self) -> &Mutex<()> { &self.import_lock } } impl backend::LocalBackend for Backend where Block: BlockT, H: Hasher, H::Out: Ord, {} impl backend::RemoteBackend for Backend where Block: BlockT, H: Hasher, H::Out: Ord, { fn is_local_state_available(&self, block: &BlockId) -> bool { self.blockchain.expect_block_number_from_id(block) .map(|num| num.is_zero()) .unwrap_or(false) } fn remote_blockchain(&self) -> Arc> { unimplemented!() } } /// Prunable in-memory changes trie storage. pub struct ChangesTrieStorage(InMemoryChangesTrieStorage>); impl backend::PrunableStateChangesTrieStorage for ChangesTrieStorage { fn oldest_changes_trie_block( &self, _config: &ChangesTrieConfiguration, _best_finalized: NumberFor, ) -> NumberFor { Zero::zero() } } impl state_machine::ChangesTrieRootsStorage> for ChangesTrieStorage where Block: BlockT, H: Hasher, { fn build_anchor( &self, _hash: H::Out, ) -> Result>, String> { Err("Dummy implementation".into()) } fn root( &self, _anchor: &ChangesTrieAnchorBlockId>, _block: NumberFor, ) -> Result, String> { Err("Dummy implementation".into()) } } impl state_machine::ChangesTrieStorage> for ChangesTrieStorage where Block: BlockT, H: Hasher, { fn as_roots_storage(&self) -> &dyn state_machine::ChangesTrieRootsStorage> { self } fn with_cached_changed_keys( &self, _root: &H::Out, _functor: &mut dyn FnMut(&HashMap>, HashSet>>), ) -> bool { false } fn get(&self, key: &H::Out, prefix: Prefix) -> Result, String> { self.0.get(key, prefix) } } /// Check that genesis storage is valid. pub fn check_genesis_storage(top: &StorageOverlay, children: &ChildrenStorageOverlay) -> error::Result<()> { if top.iter().any(|(k, _)| well_known_keys::is_child_storage_key(k)) { return Err(error::Error::GenesisInvalid.into()); } if children.keys().any(|child_key| !well_known_keys::is_child_storage_key(&child_key)) { return Err(error::Error::GenesisInvalid.into()); } Ok(()) } /// In-memory storage for offchain workers. #[derive(Debug, Clone, Default)] pub struct OffchainStorage { storage: HashMap, Vec>, } impl backend::OffchainStorage for OffchainStorage { fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) { let key = prefix.iter().chain(key).cloned().collect(); self.storage.insert(key, value.to_vec()); } fn get(&self, prefix: &[u8], key: &[u8]) -> Option> { let key: Vec = prefix.iter().chain(key).cloned().collect(); self.storage.get(&key).cloned() } fn compare_and_set( &mut self, prefix: &[u8], key: &[u8], old_value: Option<&[u8]>, new_value: &[u8], ) -> bool { use std::collections::hash_map::Entry; let key = prefix.iter().chain(key).cloned().collect(); match self.storage.entry(key) { Entry::Vacant(entry) => if old_value.is_none() { entry.insert(new_value.to_vec()); true } else { false }, Entry::Occupied(ref mut entry) if Some(entry.get().as_slice()) == old_value => { entry.insert(new_value.to_vec()); true }, _ => false, } } } #[cfg(test)] mod tests { use super::*; use std::sync::Arc; use test_client; use primitives::Blake2Hasher; type TestBackend = test_client::client::in_mem::Backend; #[test] fn test_leaves_with_complex_block_tree() { let backend = Arc::new(TestBackend::new()); test_client::trait_tests::test_leaves_for_backend(backend); } #[test] fn test_blockchain_query_by_number_gets_canonical() { let backend = Arc::new(TestBackend::new()); test_client::trait_tests::test_blockchain_query_by_number_gets_canonical(backend); } #[test] fn in_memory_offchain_storage() { use crate::backend::OffchainStorage as _; let mut storage = OffchainStorage::default(); assert_eq!(storage.get(b"A", b"B"), None); assert_eq!(storage.get(b"B", b"A"), None); storage.set(b"A", b"B", b"C"); assert_eq!(storage.get(b"A", b"B"), Some(b"C".to_vec())); assert_eq!(storage.get(b"B", b"A"), None); storage.compare_and_set(b"A", b"B", Some(b"X"), b"D"); assert_eq!(storage.get(b"A", b"B"), Some(b"C".to_vec())); storage.compare_and_set(b"A", b"B", Some(b"C"), b"D"); assert_eq!(storage.get(b"A", b"B"), Some(b"D".to_vec())); assert!(!storage.compare_and_set(b"B", b"A", Some(b""), b"Y")); assert!(storage.compare_and_set(b"B", b"A", None, b"X")); assert_eq!(storage.get(b"B", b"A"), Some(b"X".to_vec())); } }