Archive Mode and groundwork for state-preserving handles (#166)

* less eager deletion of DB values

* archive mode
This commit is contained in:
Robert Habermeier
2018-05-17 13:47:53 +02:00
committed by Gav Wood
parent 37029eb42f
commit 491f74ad00
6 changed files with 202 additions and 65 deletions
@@ -198,7 +198,6 @@ fn start_bft<F, C>(
} }
}; };
let input = Messages { let input = Messages {
network_stream: network.bft_messages(parent_hash), network_stream: network.bft_messages(parent_hash),
local_id: bft_service.local_id(), local_id: bft_service.local_id(),
+127 -26
View File
@@ -96,12 +96,6 @@ struct PendingBlock {
is_best: bool, is_best: bool,
} }
/// Database transaction
pub struct BlockImportOperation {
pending_state: DbState,
pending_block: Option<PendingBlock>,
}
#[derive(Clone)] #[derive(Clone)]
struct Meta { struct Meta {
best_hash: HeaderHash, best_hash: HeaderHash,
@@ -261,11 +255,18 @@ impl client::blockchain::Backend for BlockchainDb {
} }
} }
/// Database transaction
///
/// Accumulates the data for one block import until it is handed to
/// `commit_operation`.
pub struct BlockImportOperation {
// State the operation was begun on (obtained via `state_at`); this is what
// `state()` hands out.
old_state: DbState,
// Trie changes recorded by `update_storage` / `reset_storage`. Each call
// replaces the previous value; the entries are drained on commit.
updates: MemoryDB,
// Block to write on commit; `None` when the operation is created.
pending_block: Option<PendingBlock>,
}
impl client::backend::BlockImportOperation for BlockImportOperation { impl client::backend::BlockImportOperation for BlockImportOperation {
type State = DbState; type State = DbState;
fn state(&self) -> Result<&Self::State, client::error::Error> { fn state(&self) -> Result<&Self::State, client::error::Error> {
Ok(&self.pending_state) Ok(&self.old_state)
} }
fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_best: bool) -> Result<(), client::error::Error> { fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_best: bool) -> Result<(), client::error::Error> {
@@ -280,14 +281,14 @@ impl client::backend::BlockImportOperation for BlockImportOperation {
} }
fn update_storage(&mut self, update: MemoryDB) -> Result<(), client::error::Error> { fn update_storage(&mut self, update: MemoryDB) -> Result<(), client::error::Error> {
self.pending_state.commit(update); self.updates = update;
Ok(()) Ok(())
} }
fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> Result<(), client::error::Error> { fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> Result<(), client::error::Error> {
// TODO: wipe out existing trie. // TODO: wipe out existing trie.
let (_, update) = self.pending_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v)))); let (_, update) = self.old_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v))));
self.pending_state.commit(update); self.updates = update;
Ok(()) Ok(())
} }
} }
@@ -341,10 +342,10 @@ impl<'a> HashDB for Ephemeral<'a> {
} }
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit. /// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
#[derive(Clone)]
pub struct DbState { pub struct DbState {
db: Arc<KeyValueDB>, db: Arc<KeyValueDB>,
root: TrieH256, root: TrieH256,
updates: MemoryDB,
} }
impl state_machine::Backend for DbState { impl state_machine::Backend for DbState {
@@ -364,10 +365,6 @@ impl state_machine::Backend for DbState {
.get(key).map(|x| x.map(|val| val.to_vec())).map_err(map_e) .get(key).map(|x| x.map(|val| val.to_vec())).map_err(map_e)
} }
fn commit(&mut self, transaction: MemoryDB) {
self.updates = transaction;
}
fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> { fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
let mut read_overlay = MemoryDB::default(); let mut read_overlay = MemoryDB::default();
let eph = Ephemeral { let eph = Ephemeral {
@@ -423,10 +420,12 @@ impl state_machine::Backend for DbState {
} }
} }
/// In-memory backend. Keeps all states and blocks in memory. Useful for testing. /// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks.
/// Otherwise, trie nodes are kept only from the most recent block.
pub struct Backend { pub struct Backend {
db: Arc<KeyValueDB>, db: Arc<KeyValueDB>,
blockchain: BlockchainDb, blockchain: BlockchainDb,
archive: bool,
} }
impl Backend { impl Backend {
@@ -438,22 +437,23 @@ impl Backend {
let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?; let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?;
let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?); let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?);
Backend::from_kvdb(db as Arc<_>) Backend::from_kvdb(db as Arc<_>, true)
} }
#[cfg(test)] #[cfg(test)]
fn new_test() -> Backend { fn new_test() -> Backend {
let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS)); let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS));
Backend::from_kvdb(db as Arc<_>).expect("failed to create test-db") Backend::from_kvdb(db as Arc<_>, false).expect("failed to create test-db")
} }
fn from_kvdb(db: Arc<KeyValueDB>) -> Result<Backend, client::error::Error> { fn from_kvdb(db: Arc<KeyValueDB>, archive: bool) -> Result<Backend, client::error::Error> {
let blockchain = BlockchainDb::new(db.clone())?; let blockchain = BlockchainDb::new(db.clone())?;
Ok(Backend { Ok(Backend {
db, db,
blockchain, blockchain,
archive
}) })
} }
} }
@@ -467,7 +467,8 @@ impl client::backend::Backend for Backend {
let state = self.state_at(block)?; let state = self.state_at(block)?;
Ok(BlockImportOperation { Ok(BlockImportOperation {
pending_block: None, pending_block: None,
pending_state: state, old_state: state,
updates: MemoryDB::default(),
}) })
} }
@@ -488,10 +489,10 @@ impl client::backend::Backend for Backend {
if pending_block.is_best { if pending_block.is_best {
transaction.put(columns::META, meta::BEST_BLOCK, &key); transaction.put(columns::META, meta::BEST_BLOCK, &key);
} }
for (key, (val, rc)) in operation.pending_state.updates.drain() { for (key, (val, rc)) in operation.updates.drain() {
if rc > 0 { if rc > 0 {
transaction.put(columns::STATE, &key.0[..], &val); transaction.put(columns::STATE, &key.0[..], &val);
} else { } else if rc < 0 && !self.archive {
transaction.delete(columns::STATE, &key.0[..]); transaction.delete(columns::STATE, &key.0[..]);
} }
} }
@@ -518,7 +519,6 @@ impl client::backend::Backend for Backend {
return Ok(DbState { return Ok(DbState {
db: self.db.clone(), db: self.db.clone(),
updates: Default::default(),
root, root,
}) })
} }
@@ -528,7 +528,6 @@ impl client::backend::Backend for Backend {
self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| { self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| {
DbState { DbState {
db: self.db.clone(), db: self.db.clone(),
updates: Default::default(),
root: hdr.state_root.0.into(), root: hdr.state_root.0.into(),
} }
}).ok_or_else(|| client::error::ErrorKind::UnknownBlock(block).into())) }).ok_or_else(|| client::error::ErrorKind::UnknownBlock(block).into()))
@@ -595,7 +594,7 @@ mod tests {
(vec![1, 2, 3], vec![9, 9, 9]), (vec![1, 2, 3], vec![9, 9, 9]),
]; ];
header.state_root = op.pending_state.storage_root(storage header.state_root = op.old_state.storage_root(storage
.iter() .iter()
.cloned() .cloned()
.map(|(x, y)| (x, Some(y))) .map(|(x, y)| (x, Some(y)))
@@ -634,7 +633,7 @@ mod tests {
(vec![5, 5, 5], Some(vec![4, 5, 6])), (vec![5, 5, 5], Some(vec![4, 5, 6])),
]; ];
let (root, overlay) = op.pending_state.storage_root(storage.iter().cloned()); let (root, overlay) = op.old_state.storage_root(storage.iter().cloned());
op.update_storage(overlay).unwrap(); op.update_storage(overlay).unwrap();
header.state_root = root.into(); header.state_root = root.into();
@@ -654,4 +653,106 @@ mod tests {
assert_eq!(state.storage(&[5, 5, 5]).unwrap(), Some(vec![4, 5, 6])); assert_eq!(state.storage(&[5, 5, 5]).unwrap(), Some(vec![4, 5, 6]));
} }
} }
#[test]
fn delete_only_when_negative_rc() {
	// Exercises the commit rule for state entries on a non-archive backend
	// (`new_test` passes `archive = false`): an entry is written when its
	// reference count in the update set is positive, left alone when the count
	// is zero, and deleted only when the count is negative.
	let key;
	let db = Backend::new_test();

	// Block 0: insert the value once (rc > 0), so it must be written.
	{
		let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap();
		let mut header = block::Header {
			number: 0,
			parent_hash: Default::default(),
			state_root: Default::default(),
			digest: Default::default(),
			extrinsics_root: Default::default(),
		};

		let storage: Vec<(_, _)> = vec![];

		header.state_root = op.old_state.storage_root(storage
			.iter()
			.cloned()
			.map(|(x, y)| (x, Some(y)))
		).0.into();

		op.reset_storage(storage.iter().cloned()).unwrap();

		// Must come after `reset_storage`, which replaces `op.updates` wholesale.
		key = op.updates.insert(b"hello");
		op.set_block_data(
			header,
			Some(vec![]),
			None,
			true
		).unwrap();

		db.commit_operation(op).unwrap();

		assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
	}

	// Block 1: one insert and one remove cancel out (rc == 0), so the value
	// stored by block 0 must still be present.
	{
		let mut op = db.begin_operation(BlockId::Number(0)).unwrap();
		let mut header = block::Header {
			number: 1,
			parent_hash: Default::default(),
			state_root: Default::default(),
			digest: Default::default(),
			extrinsics_root: Default::default(),
		};

		let storage: Vec<(_, _)> = vec![];

		header.state_root = op.old_state.storage_root(storage
			.iter()
			.cloned()
			.map(|(x, y)| (x, Some(y)))
		).0.into();

		op.updates.insert(b"hello");
		op.updates.remove(&key);
		op.set_block_data(
			header,
			Some(vec![]),
			None,
			true
		).unwrap();

		db.commit_operation(op).unwrap();

		assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
	}

	// Block 2: a lone remove drives the count negative (rc < 0), so the value
	// must be deleted.
	{
		let mut op = db.begin_operation(BlockId::Number(1)).unwrap();
		let mut header = block::Header {
			// Builds on block 1, so this is block 2. Reusing `1` here would make
			// the header identical to the previous one and overwrite it.
			number: 2,
			parent_hash: Default::default(),
			state_root: Default::default(),
			digest: Default::default(),
			extrinsics_root: Default::default(),
		};

		let storage: Vec<(_, _)> = vec![];

		header.state_root = op.old_state.storage_root(storage
			.iter()
			.cloned()
			.map(|(x, y)| (x, Some(y)))
		).0.into();

		op.updates.remove(&key);
		op.set_block_data(
			header,
			Some(vec![]),
			None,
			true
		).unwrap();

		db.commit_operation(op).unwrap();

		assert!(db.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
	}
}
} }
+8 -1
View File
@@ -37,6 +37,13 @@ pub trait BlockImportOperation {
} }
/// Client backend. Manages the data layer. /// Client backend. Manages the data layer.
///
/// Note on state pruning: while an object from `state_at` is alive, the state
/// should not be pruned. The backend should internally reference-count
/// its state objects.
///
/// The same applies for live `BlockImportOperation`s: while an import operation building on a parent `P`
/// is alive, the state for `P` should not be pruned.
pub trait Backend { pub trait Backend {
/// Associated block insertion operation type. /// Associated block insertion operation type.
type BlockImportOperation: BlockImportOperation; type BlockImportOperation: BlockImportOperation;
@@ -52,6 +59,6 @@ pub trait Backend {
fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>; fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>;
/// Returns reference to blockchain backend. /// Returns reference to blockchain backend.
fn blockchain(&self) -> &Self::Blockchain; fn blockchain(&self) -> &Self::Blockchain;
/// Returns state backend for specified block. /// Returns state backend with post-state of given block.
fn state_at(&self, block: BlockId) -> error::Result<Self::State>; fn state_at(&self, block: BlockId) -> error::Result<Self::State>;
} }
+19 -17
View File
@@ -18,14 +18,13 @@
use std::collections::HashMap; use std::collections::HashMap;
use parking_lot::RwLock; use parking_lot::RwLock;
use state_machine;
use error; use error;
use backend; use backend;
use runtime_support::Hashable; use runtime_support::Hashable;
use primitives; use primitives;
use primitives::block::{self, Id as BlockId, HeaderHash}; use primitives::block::{self, Id as BlockId, HeaderHash};
use blockchain::{self, BlockStatus}; use blockchain::{self, BlockStatus};
use state_machine::backend::Backend as StateBackend; use state_machine::backend::{Backend as StateBackend, InMemory};
fn header_hash(header: &block::Header) -> block::HeaderHash { fn header_hash(header: &block::Header) -> block::HeaderHash {
header.blake2_256().into() header.blake2_256().into()
@@ -43,12 +42,6 @@ struct Block {
body: Option<block::Body>, body: Option<block::Body>,
} }
/// In-memory operation.
pub struct BlockImportOperation {
pending_block: Option<PendingBlock>,
pending_state: state_machine::backend::InMemory,
}
#[derive(Clone)] #[derive(Clone)]
struct BlockchainStorage { struct BlockchainStorage {
blocks: HashMap<HeaderHash, Block>, blocks: HashMap<HeaderHash, Block>,
@@ -160,11 +153,18 @@ impl blockchain::Backend for Blockchain {
} }
} }
/// In-memory operation.
pub struct BlockImportOperation {
// Block to insert on commit; `None` when the operation is created.
pending_block: Option<PendingBlock>,
// State the operation was begun on. Returned by `state()`, and stored as the
// block's state on commit when no new state was recorded.
old_state: InMemory,
// State produced by `update_storage` / `reset_storage`; `None` until one of
// them is called.
new_state: Option<InMemory>,
}
impl backend::BlockImportOperation for BlockImportOperation { impl backend::BlockImportOperation for BlockImportOperation {
type State = state_machine::backend::InMemory; type State = InMemory;
fn state(&self) -> error::Result<&Self::State> { fn state(&self) -> error::Result<&Self::State> {
Ok(&self.pending_state) Ok(&self.old_state)
} }
fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_new_best: bool) -> error::Result<()> { fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_new_best: bool) -> error::Result<()> {
@@ -180,20 +180,20 @@ impl backend::BlockImportOperation for BlockImportOperation {
Ok(()) Ok(())
} }
fn update_storage(&mut self, update: <Self::State as StateBackend>::Transaction) -> error::Result<()> { fn update_storage(&mut self, update: <InMemory as StateBackend>::Transaction) -> error::Result<()> {
self.pending_state.commit(update); self.new_state = Some(self.old_state.update(update));
Ok(()) Ok(())
} }
fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> error::Result<()> { fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> error::Result<()> {
self.pending_state = state_machine::backend::InMemory::from(iter.collect()); self.new_state = Some(InMemory::from(iter.collect::<HashMap<_, _>>()));
Ok(()) Ok(())
} }
} }
/// In-memory backend. Keeps all states and blocks in memory. Useful for testing. /// In-memory backend. Keeps all states and blocks in memory. Useful for testing.
pub struct Backend { pub struct Backend {
states: RwLock<HashMap<block::HeaderHash, state_machine::backend::InMemory>>, states: RwLock<HashMap<block::HeaderHash, InMemory>>,
blockchain: Blockchain, blockchain: Blockchain,
} }
@@ -210,7 +210,7 @@ impl Backend {
impl backend::Backend for Backend { impl backend::Backend for Backend {
type BlockImportOperation = BlockImportOperation; type BlockImportOperation = BlockImportOperation;
type Blockchain = Blockchain; type Blockchain = Blockchain;
type State = state_machine::backend::InMemory; type State = InMemory;
fn begin_operation(&self, block: BlockId) -> error::Result<Self::BlockImportOperation> { fn begin_operation(&self, block: BlockId) -> error::Result<Self::BlockImportOperation> {
let state = match block { let state = match block {
@@ -220,14 +220,16 @@ impl backend::Backend for Backend {
Ok(BlockImportOperation { Ok(BlockImportOperation {
pending_block: None, pending_block: None,
pending_state: state, old_state: state,
new_state: None,
}) })
} }
fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> { fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> {
if let Some(pending_block) = operation.pending_block { if let Some(pending_block) = operation.pending_block {
let hash = header_hash(&pending_block.block.header); let hash = header_hash(&pending_block.block.header);
self.states.write().insert(hash, operation.pending_state); let old_state = &operation.old_state;
self.states.write().insert(hash, operation.new_state.unwrap_or_else(|| old_state.clone()));
self.blockchain.insert(hash, pending_block.block.header, pending_block.block.justification, pending_block.block.body, pending_block.is_best); self.blockchain.insert(hash, pending_block.block.header, pending_block.block.justification, pending_block.block.body, pending_block.is_best);
} }
Ok(()) Ok(())
@@ -18,10 +18,13 @@
use std::{error, fmt}; use std::{error, fmt};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
/// A state backend is used to read state data and can have changes committed /// A state backend is used to read state data and can have changes committed
/// to it. /// to it.
pub trait Backend { ///
/// The clone operation should be cheap.
pub trait Backend: Clone {
/// An error type when fetching data is not possible. /// An error type when fetching data is not possible.
type Error: super::Error; type Error: super::Error;
@@ -36,9 +39,6 @@ pub trait Backend {
fn storage_root<I>(&self, delta: I) -> ([u8; 32], Self::Transaction) fn storage_root<I>(&self, delta: I) -> ([u8; 32], Self::Transaction)
where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>; where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>;
/// Commit updates to the backend and get new state.
fn commit(&mut self, tx: Self::Transaction);
/// Get all key/value pairs into a Vec. /// Get all key/value pairs into a Vec.
fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)>; fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)>;
} }
@@ -60,20 +60,54 @@ impl error::Error for Void {
/// In-memory backend. Fully recomputes tries on each commit but useful for /// In-memory backend. Fully recomputes tries on each commit but useful for
/// tests. /// tests.
pub type InMemory = HashMap<Vec<u8>, Vec<u8>>; #[derive(Clone, PartialEq, Eq)]
pub struct InMemory {
// Key/value pairs behind an `Arc`, so copies of this handle share one map
// instead of duplicating it.
inner: Arc<HashMap<Vec<u8>, Vec<u8>>>,
}
impl Default for InMemory {
fn default() -> Self {
InMemory {
inner: Arc::new(Default::default()),
}
}
}
impl InMemory {
/// Copy the state, with applied updates
pub fn update(&self, changes: <Self as Backend>::Transaction) -> Self {
let mut inner: HashMap<_, _> = (&*self.inner).clone();
for (key, val) in changes {
match val {
Some(v) => { inner.insert(key, v); },
None => { inner.remove(&key); },
}
}
inner.into()
}
}
impl From<HashMap<Vec<u8>, Vec<u8>>> for InMemory {
	/// Take ownership of `map` and wrap it as an in-memory state.
	fn from(map: HashMap<Vec<u8>, Vec<u8>>) -> Self {
		let inner = Arc::new(map);
		InMemory { inner }
	}
}
impl Backend for InMemory { impl Backend for InMemory {
type Error = Void; type Error = Void;
type Transaction = Vec<(Vec<u8>, Option<Vec<u8>>)>; type Transaction = Vec<(Vec<u8>, Option<Vec<u8>>)>;
fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> { fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self.get(key).map(Clone::clone)) Ok(self.inner.get(key).map(Clone::clone))
} }
fn storage_root<I>(&self, delta: I) -> ([u8; 32], Self::Transaction) fn storage_root<I>(&self, delta: I) -> ([u8; 32], Self::Transaction)
where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)> where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>
{ {
let existing_pairs = self.iter().map(|(k, v)| (k.clone(), Some(v.clone()))); let existing_pairs = self.inner.iter().map(|(k, v)| (k.clone(), Some(v.clone())));
let transaction: Vec<_> = delta.into_iter().collect(); let transaction: Vec<_> = delta.into_iter().collect();
let root = ::triehash::trie_root(existing_pairs.chain(transaction.iter().cloned()) let root = ::triehash::trie_root(existing_pairs.chain(transaction.iter().cloned())
@@ -85,17 +119,8 @@ impl Backend for InMemory {
(root, transaction) (root, transaction)
} }
fn commit(&mut self, changes: Self::Transaction) {
for (key, val) in changes {
match val {
Some(v) => { self.insert(key, v); },
None => { self.remove(&key); },
}
}
}
fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> { fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
self.iter().map(|(k, v)| (k.clone(), v.clone())).collect() self.inner.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
} }
} }
+6 -3
View File
@@ -145,11 +145,13 @@ pub trait CodeExecutor: Sized + Send + Sync {
} }
/// Execute a call using the given state backend, overlayed changes, and call executor. /// Execute a call using the given state backend, overlayed changes, and call executor.
/// Produces a state-backend-specific "transaction" which can be used to apply the changes
/// to the backing store, such as the disk.
/// ///
/// On an error, no prospective changes are written to the overlay. /// On an error, no prospective changes are written to the overlay.
/// ///
/// Note: changes to code will be in place if this call is made again. For running partial /// Note: changes to code will be in place if this call is made again. For running partial
/// blocks (e.g. a transaction at a time), ensure a differrent method is used. /// blocks (e.g. a transaction at a time), ensure a different method is used.
pub fn execute<B: backend::Backend, Exec: CodeExecutor>( pub fn execute<B: backend::Backend, Exec: CodeExecutor>(
backend: &B, backend: &B,
overlay: &mut OverlayedChanges, overlay: &mut OverlayedChanges,
@@ -228,12 +230,13 @@ mod tests {
#[test] #[test]
fn overlayed_storage_root_works() { fn overlayed_storage_root_works() {
let backend = InMemory::from(map![ let initial: HashMap<_, _> = map![
b"doe".to_vec() => b"reindeer".to_vec(), b"doe".to_vec() => b"reindeer".to_vec(),
b"dog".to_vec() => b"puppyXXX".to_vec(), b"dog".to_vec() => b"puppyXXX".to_vec(),
b"dogglesworth".to_vec() => b"catXXX".to_vec(), b"dogglesworth".to_vec() => b"catXXX".to_vec(),
b"doug".to_vec() => b"notadog".to_vec() b"doug".to_vec() => b"notadog".to_vec()
]); ];
let backend = InMemory::from(initial);
let mut overlay = OverlayedChanges { let mut overlay = OverlayedChanges {
committed: map![ committed: map![
b"dog".to_vec() => Some(b"puppy".to_vec()), b"dog".to_vec() => Some(b"puppy".to_vec()),