State pruning (#216)

* Started work on state db

* Updated for a new hash type

* Pruning and tests

* Generalize on the block hash/key type

* Integrate with the client backend

* Merge w master

* CLI options

* Updated for light client refactoring

* Used IntoIterator

* Fixed invalid input hadling
This commit is contained in:
Arkadiy Paronyan
2018-06-28 17:30:24 +02:00
committed by Gav Wood
parent e263c8cf5d
commit b4a0140c6d
22 changed files with 1402 additions and 69 deletions
+12 -1
View File
@@ -2135,6 +2135,7 @@ dependencies = [
"substrate-primitives 0.1.0",
"substrate-runtime-primitives 0.1.0",
"substrate-runtime-support 0.1.0",
"substrate-state-db 0.1.0",
"substrate-state-machine 0.1.0",
]
@@ -2547,6 +2548,17 @@ dependencies = [
"serde_json 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "substrate-state-db"
version = "0.1.0"
dependencies = [
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-codec 0.1.0",
"substrate-primitives 0.1.0",
]
[[package]]
name = "substrate-state-machine"
version = "0.1.0"
@@ -2555,7 +2567,6 @@ dependencies = [
"ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hashdb 0.1.1 (git+https://github.com/paritytech/parity.git)",
"hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb 0.1.0 (git+https://github.com/paritytech/parity.git)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"memorydb 0.1.1 (git+https://github.com/paritytech/parity.git)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+1
View File
@@ -52,6 +52,7 @@ members = [
"substrate/runtime/system",
"substrate/runtime/timestamp",
"substrate/serializer",
"substrate/state-db",
"substrate/state-machine",
"substrate/test-runtime",
"substrate/telemetry",
+5
View File
@@ -75,6 +75,11 @@ args:
value_name: CHAIN_SPEC
help: Specify the chain specification (one of dev, local or poc-2)
takes_value: true
- pruning:
long: pruning
value_name: PRUNING_MODE
help: Specify the pruning mode. (a number of blocks to keep or "archive"). Default is 256.
takes_value: true
- name:
long: name
value_name: NAME
+5
View File
@@ -28,5 +28,10 @@ error_chain! {
Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"];
}
errors {
/// Input error.
Input(m: String) {
description("Invalid input"),
display("{}", m),
}
}
}
+7
View File
@@ -83,6 +83,7 @@ use polkadot_primitives::Block;
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use tokio_core::reactor;
use service::PruningMode;
const DEFAULT_TELEMETRY_URL: &str = "ws://telemetry.polkadot.io:1024";
@@ -189,6 +190,12 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
.into();
config.database_path = db_path(&base_path).to_string_lossy().into();
config.pruning = match matches.value_of("pruning") {
Some("archive") => PruningMode::ArchiveAll,
None => PruningMode::keep_blocks(256),
Some(s) => PruningMode::keep_blocks(s.parse()
.map_err(|_| error::ErrorKind::Input("Invalid pruning mode specified".to_owned()))?),
};
let (mut genesis_storage, boot_nodes) = PresetConfig::from_spec(chain_spec)
.map(PresetConfig::deconstruct)
+4
View File
@@ -20,6 +20,7 @@ use transaction_pool;
use runtime_primitives::MakeStorage;
pub use network::Role;
pub use network::NetworkConfiguration;
pub use client_db::PruningMode;
/// Service configuration.
pub struct Configuration {
@@ -33,6 +34,8 @@ pub struct Configuration {
pub keystore_path: String,
/// Path to the database.
pub database_path: String,
/// Pruning settings.
pub pruning: PruningMode,
/// Additional key seeds.
pub keys: Vec<String>,
/// The name of the chain.
@@ -58,6 +61,7 @@ impl Default for Configuration {
genesis_storage: Box::new(Default::default),
telemetry: Default::default(),
name: "Anonymous".into(),
pruning: PruningMode::ArchiveAll,
}
}
}
+2 -1
View File
@@ -68,7 +68,7 @@ use exit_future::Signal;
pub use self::error::{ErrorKind, Error};
pub use self::components::{Components, FullComponents, LightComponents};
pub use config::{Configuration, Role};
pub use config::{Configuration, Role, PruningMode};
/// Polkadot service.
pub struct Service<Components: components::Components> {
@@ -118,6 +118,7 @@ impl<Components> Service<Components>
let db_settings = client_db::DatabaseSettings {
cache_size: None,
path: config.database_path.into(),
pruning: config.pruning,
};
let (client, on_demand) = components.build_client(db_settings, executor, config.genesis_storage)?;
+1
View File
@@ -18,6 +18,7 @@ substrate-client = { path = "../../../substrate/client" }
substrate-state-machine = { path = "../../../substrate/state-machine" }
substrate-runtime-support = { path = "../../../substrate/runtime-support" }
substrate-codec = { path = "../../../substrate/codec" }
substrate-state-db = { path = "../../../substrate/state-db" }
[dev-dependencies]
kvdb-memorydb = { git = "https://github.com/paritytech/parity.git" }
+131 -41
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Client backend that uses RocksDB database as storage. State is still kept in memory.
//! Client backend that uses RocksDB database as storage.
extern crate substrate_client as client;
extern crate kvdb_rocksdb;
@@ -27,6 +27,7 @@ extern crate substrate_primitives as primitives;
extern crate substrate_runtime_support as runtime_support;
extern crate substrate_runtime_primitives as runtime_primitives;
extern crate substrate_codec as codec;
extern crate substrate_state_db as state_db;
#[macro_use]
extern crate log;
@@ -38,17 +39,21 @@ use std::sync::Arc;
use std::path::PathBuf;
use codec::Slicable;
use hashdb::DBValue;
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};
use memorydb::MemoryDB;
use parking_lot::RwLock;
use primitives::H256;
use runtime_primitives::generic::BlockId;
use runtime_primitives::bft::Justification;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hashing, HashingFor, Zero};
use runtime_primitives::BuildStorage;
use state_machine::backend::Backend as StateBackend;
use state_machine::CodeExecutor;
use state_machine::{CodeExecutor, TrieH256, DBValue};
use state_db::StateDb;
pub use state_db::PruningMode;
const FINALIZATION_WINDOW: u64 = 32;
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend;
@@ -59,6 +64,8 @@ pub struct DatabaseSettings {
pub cache_size: Option<usize>,
/// Path to the database.
pub path: PathBuf,
/// Pruning mode.
pub pruning: PruningMode,
}
/// Create an instance of db-backed client.
@@ -74,7 +81,7 @@ pub fn new_client<E, S, Block>(
E: CodeExecutor,
S: BuildStorage,
{
let backend = Arc::new(Backend::new(&settings)?);
let backend = Arc::new(Backend::new(settings, FINALIZATION_WINDOW)?);
let executor = client::LocalCallExecutor::new(backend.clone(), executor);
Ok(client::Client::new(backend, executor, genesis_storage)?)
}
@@ -82,11 +89,12 @@ pub fn new_client<E, S, Block>(
mod columns {
pub const META: Option<u32> = Some(0);
pub const STATE: Option<u32> = Some(1);
pub const BLOCK_INDEX: Option<u32> = Some(2);
pub const HEADER: Option<u32> = Some(3);
pub const BODY: Option<u32> = Some(4);
pub const JUSTIFICATION: Option<u32> = Some(5);
pub const NUM_COLUMNS: u32 = 6;
pub const STATE_META: Option<u32> = Some(2);
pub const BLOCK_INDEX: Option<u32> = Some(3);
pub const HEADER: Option<u32> = Some(4);
pub const BODY: Option<u32> = Some(5);
pub const JUSTIFICATION: Option<u32> = Some(6);
pub const NUM_COLUMNS: u32 = 7;
}
mod meta {
@@ -131,6 +139,17 @@ fn db_err(err: kvdb::Error) -> client::error::Error {
}
}
// wrapper that implements trait required for state_db
struct StateMetaDb<'a>(&'a KeyValueDB);
impl<'a> state_db::MetaDb for StateMetaDb<'a> {
type Error = kvdb::Error;
fn get_meta(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.0.get(columns::STATE_META, key).map(|r| r.map(|v| v.to_vec()))
}
}
/// Block database
pub struct BlockchainDb<Block: BlockT> {
db: Arc<KeyValueDB>,
@@ -299,44 +318,87 @@ impl<Block: BlockT> client::backend::BlockImportOperation<Block> for BlockImport
}
}
struct StorageDb<Block: BlockT> {
pub db: Arc<KeyValueDB>,
pub state_db: StateDb<Block::Hash, H256>,
}
impl<Block: BlockT> state_machine::Storage for StorageDb<Block> {
fn get(&self, key: &TrieH256) -> Result<Option<DBValue>, String> {
self.state_db.get(&key.0.into(), self).map(|r| r.map(|v| DBValue::from_slice(&v)))
.map_err(|e| format!("Database backend error: {:?}", e))
}
}
impl<Block: BlockT> state_db::HashDb for StorageDb<Block> {
type Error = kvdb::Error;
type Hash = H256;
fn get(&self, key: &H256) -> Result<Option<Vec<u8>>, Self::Error> {
self.db.get(columns::STATE, &key[..]).map(|r| r.map(|v| v.to_vec()))
}
}
/// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks.
/// Otherwise, trie nodes are kept only from the most recent block.
pub struct Backend<Block: BlockT> {
db: Arc<KeyValueDB>,
storage: Arc<StorageDb<Block>>,
blockchain: BlockchainDb<Block>,
archive: bool,
finalization_window: u64,
}
impl<Block: BlockT> Backend<Block> where <Block::Header as HeaderT>::Number: As<u32> {
/// Create a new instance of database backend.
pub fn new(config: &DatabaseSettings) -> Result<Self, client::error::Error> {
pub fn new(config: DatabaseSettings, finalization_window: u64) -> Result<Self, client::error::Error> {
let mut db_config = DatabaseConfig::with_columns(Some(columns::NUM_COLUMNS));
db_config.memory_budget = config.cache_size;
db_config.wal = true;
let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?;
let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?);
Backend::from_kvdb(db as Arc<_>, true)
Backend::from_kvdb(db as Arc<_>, config.pruning, finalization_window)
}
#[cfg(test)]
fn new_test() -> Self {
let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS));
Backend::from_kvdb(db as Arc<_>, false).expect("failed to create test-db")
Backend::from_kvdb(db as Arc<_>, PruningMode::keep_blocks(0), 0).expect("failed to create test-db")
}
fn from_kvdb(db: Arc<KeyValueDB>, archive: bool) -> Result<Self, client::error::Error> {
fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, finalization_window: u64) -> Result<Self, client::error::Error> {
let blockchain = BlockchainDb::new(db.clone())?;
let map_e = |e: state_db::Error<kvdb::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<Block::Hash, H256> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?;
let storage_db = StorageDb {
db,
state_db,
};
Ok(Backend {
db,
storage: Arc::new(storage_db),
blockchain,
archive
finalization_window,
})
}
}
fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.put(columns::STATE, &key[..], &val);
}
for key in commit.data.deleted.into_iter() {
transaction.delete(columns::STATE, &key[..]);
}
for (key, val) in commit.meta.inserted.into_iter() {
transaction.put(columns::STATE_META, &key[..], &val);
}
for key in commit.meta.deleted.into_iter() {
transaction.delete(columns::STATE_META, &key[..]);
}
}
impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> where
<Block::Header as HeaderT>::Number: As<u32>,
Block::Hash: Into<[u8; 32]>, // TODO: remove when patricia_trie generic.
@@ -355,6 +417,7 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> where
}
fn commit_operation(&self, mut operation: Self::BlockImportOperation) -> Result<(), client::error::Error> {
use client::blockchain::Backend;
let mut transaction = DBTransaction::new();
if let Some(pending_block) = operation.pending_block {
let hash = pending_block.header.hash();
@@ -371,15 +434,34 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> where
if pending_block.is_best {
transaction.put(columns::META, meta::BEST_BLOCK, &key);
}
let mut changeset: state_db::ChangeSet<H256> = state_db::ChangeSet::default();
for (key, (val, rc)) in operation.updates.drain() {
if rc > 0 {
transaction.put(columns::STATE, &key.0[..], &val);
} else if rc < 0 && !self.archive {
transaction.delete(columns::STATE, &key.0[..]);
changeset.inserted.push((key.0.into(), val.to_vec()));
} else if rc < 0 {
changeset.deleted.push(key.0.into());
}
}
let number_u64 = number.as_().into();
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset);
apply_state_commit(&mut transaction, commit);
//finalize an older block
if number_u64 > self.finalization_window {
let finalizing_hash = if self.finalization_window == 0 {
Some(hash)
} else {
self.blockchain.hash(As::sa((number_u64 - self.finalization_window) as u32))?
};
if let Some(finalizing_hash) = finalizing_hash {
trace!("Finalizing block #{} ({:?})", number_u64 - self.finalization_window, finalizing_hash);
let commit = self.storage.state_db.finalize_block(&finalizing_hash);
apply_state_commit(&mut transaction, commit);
}
}
debug!("DB Commit {:?} ({})", hash, number);
self.db.write(transaction).map_err(db_err)?;
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, pending_block.is_best);
}
Ok(())
@@ -395,13 +477,13 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> where
// special case for genesis initialization
match block {
BlockId::Hash(h) if h == Default::default() =>
return Ok(DbState::with_kvdb_for_genesis(self.db.clone(), ::columns::STATE)),
return Ok(DbState::with_storage_for_genesis(self.storage.clone())),
_ => {}
}
self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| {
let root: [u8; 32] = hdr.state_root().clone().into();
DbState::with_kvdb(self.db.clone(), ::columns::STATE, root.into())
DbState::with_storage(self.storage.clone(), root.into())
}).ok_or_else(|| client::error::ErrorKind::UnknownBlock(format!("{:?}", block)).into()))
}
}
@@ -438,7 +520,11 @@ mod tests {
let mut op = db.begin_operation(id).unwrap();
let header = Header {
number: i,
parent_hash: Default::default(),
parent_hash: if i == 0 {
Default::default()
} else {
db.blockchain.hash(i - 1).unwrap().unwrap()
},
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
@@ -538,10 +624,10 @@ mod tests {
#[test]
fn delete_only_when_negative_rc() {
let key;
let db = Backend::<Block>::new_test();
let backend = Backend::<Block>::new_test();
{
let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap();
let hash = {
let mut op = backend.begin_operation(BlockId::Hash(Default::default())).unwrap();
let mut header = Header {
number: 0,
parent_hash: Default::default(),
@@ -557,6 +643,7 @@ mod tests {
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
let hash = header.hash();
op.reset_storage(storage.iter().cloned()).unwrap();
@@ -568,16 +655,17 @@ mod tests {
true
).unwrap();
db.commit_operation(op).unwrap();
backend.commit_operation(op).unwrap();
assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
}
assert_eq!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
hash
};
{
let mut op = db.begin_operation(BlockId::Number(0)).unwrap();
let hash = {
let mut op = backend.begin_operation(BlockId::Number(0)).unwrap();
let mut header = Header {
number: 1,
parent_hash: Default::default(),
parent_hash: hash,
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
@@ -590,6 +678,7 @@ mod tests {
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
let hash = header.hash();
op.updates.insert(b"hello");
op.updates.remove(&key);
@@ -600,16 +689,17 @@ mod tests {
true
).unwrap();
db.commit_operation(op).unwrap();
backend.commit_operation(op).unwrap();
assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
}
assert_eq!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
hash
};
{
let mut op = db.begin_operation(BlockId::Number(1)).unwrap();
let mut op = backend.begin_operation(BlockId::Number(1)).unwrap();
let mut header = Header {
number: 1,
parent_hash: Default::default(),
number: 2,
parent_hash: hash,
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
@@ -631,9 +721,9 @@ mod tests {
true
).unwrap();
db.commit_operation(op).unwrap();
backend.commit_operation(op).unwrap();
assert!(db.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
assert!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
}
}
}
-1
View File
@@ -486,7 +486,6 @@ mod tests {
#[test]
fn client_initialises_from_genesis_ok() {
let client = test_client::new();
let _genesis_hash = client.block_hash(0).unwrap().unwrap();
assert_eq!(client.using_environment(|| test_runtime::system::balance_of(Keyring::Alice.to_raw_public().into())).unwrap(), 1000);
assert_eq!(client.using_environment(|| test_runtime::system::balance_of(Keyring::Ferdie.to_raw_public().into())).unwrap(), 0);
@@ -361,7 +361,7 @@ impl<T: Trait> Module<T> {
let (_, _, expiring) = Self::next_finalise().ok_or("cannot present outside of presentation period")?;
let stakes = Self::snapshoted_stakes();
let voters = Self::voters();
let bad_presentation_punishment = Self::present_slash_per_voter() * T::Balance::sa(voters.len());
let bad_presentation_punishment = Self::present_slash_per_voter() * T::Balance::sa(voters.len() as u64);
ensure!(<staking::Module<T>>::can_slash(aux.ref_into(), bad_presentation_punishment), "presenter must have sufficient slashable funds");
let mut leaderboard = Self::leaderboard().ok_or("leaderboard must exist while present phase active")?;
@@ -117,7 +117,7 @@ impl<T: Trait> Module<T> {
/// Get the amount locked in support of `proposal`; `None` if proposal isn't a valid proposal
/// index.
pub fn locked_for(proposal: PropIndex) -> Option<T::Balance> {
Self::deposit_of(proposal).map(|(d, l)| d * T::Balance::sa(l.len()))
Self::deposit_of(proposal).map(|(d, l)| d * T::Balance::sa(l.len() as u64))
}
/// Return true if `ref_index` is an on-going referendum.
@@ -121,7 +121,7 @@ impl<T> RefInto<T> for T {
}
pub trait SimpleArithmetic:
Zero + One + IntegerSquareRoot + As<usize> +
Zero + One + IntegerSquareRoot + As<u64> +
Add<Self, Output = Self> + AddAssign<Self> +
Sub<Self, Output = Self> + SubAssign<Self> +
Mul<Self, Output = Self> + MulAssign<Self> +
@@ -130,7 +130,7 @@ pub trait SimpleArithmetic:
PartialOrd<Self> + Ord
{}
impl<T:
Zero + One + IntegerSquareRoot + As<usize> +
Zero + One + IntegerSquareRoot + As<u64> +
Add<Self, Output = Self> + AddAssign<Self> +
Sub<Self, Output = Self> + SubAssign<Self> +
Mul<Self, Output = Self> + MulAssign<Self> +
@@ -314,7 +314,7 @@ pub trait Digest {
/// `parent_hash`, as well as a `digest` and a block `number`.
///
/// You can also create a `new` one from those fields.
pub trait Header: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug {
pub trait Header: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug + 'static {
type Number: Member + ::rstd::hash::Hash + Copy + MaybeDisplay + SimpleArithmetic + Slicable;
type Hash: Member + ::rstd::hash::Hash + Copy + MaybeDisplay + Default + SimpleBitOps + Slicable + AsRef<[u8]>;
type Hashing: Hashing<Output = Self::Hash>;
@@ -352,7 +352,7 @@ pub trait Header: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug {
/// `Extrinsic` piece of information as well as a `Header`.
///
/// You can get an iterator over each of the `extrinsics` and retrieve the `header`.
pub trait Block: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug {
pub trait Block: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug + 'static {
type Extrinsic: Member + Slicable;
type Header: Header<Hash=Self::Hash>;
type Hash: Member + ::rstd::hash::Hash + Copy + MaybeDisplay + Default + SimpleBitOps + Slicable + AsRef<[u8]>;
@@ -120,12 +120,12 @@ impl<Hashing, AccountId> ContractAddressFor<AccountId> for Hashing where
pub trait Trait: system::Trait + session::Trait {
/// The balance of an account.
type Balance: Parameter + SimpleArithmetic + Slicable + Default + Copy + As<Self::AccountIndex> + As<usize>;
type Balance: Parameter + SimpleArithmetic + Slicable + Default + Copy + As<Self::AccountIndex> + As<usize> + As<u64>;
/// Function type to get the contract address given the creator.
type DetermineContractAddress: ContractAddressFor<Self::AccountId>;
/// Type used for storing an account's index; implies the maximum number of accounts the system
/// can hold.
type AccountIndex: Parameter + Member + Slicable + SimpleArithmetic + As<u8> + As<u16> + As<u32> + As<usize> + Copy;
type AccountIndex: Parameter + Member + Slicable + SimpleArithmetic + As<u8> + As<u16> + As<u32> + As<u64> + As<usize> + Copy;
}
decl_module! {
@@ -872,7 +872,7 @@ impl<T: Trait> AuxLookup for Module<T> {
impl<T: Trait> MakePayment<T::AccountId> for Module<T> {
fn make_payment(transactor: &T::AccountId, encoded_len: usize) -> Result {
let b = Self::free_balance(transactor);
let transaction_fee = Self::transaction_base_fee() + Self::transaction_byte_fee() * <T::Balance as As<usize>>::sa(encoded_len);
let transaction_fee = Self::transaction_base_fee() + Self::transaction_byte_fee() * <T::Balance as As<u64>>::sa(encoded_len as u64);
if b < transaction_fee {
return Err("not enough funds for transaction fee");
}
+13
View File
@@ -0,0 +1,13 @@
[package]
name = "substrate-state-db"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
parking_lot = "0.5"
log = "0.4"
substrate-primitives = { path = "../../substrate/primitives" }
substrate-codec = { path = "../../substrate/codec" }
[dev-dependencies]
env_logger = "0.4"
+354
View File
@@ -0,0 +1,354 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! State database maintenance. Handles finalization and pruning in the database. The input to
//! this module is a `ChangeSet` which is basicall a list of key-value pairs (trie nodes) that
//! were added or deleted during block execution.
//!
//! # Finalization.
//! Finalization window tracks a tree of blocks identified by header hash. The in-memory
//! overlay allows to get any node that was was inserted in any any of the blocks within the window.
//! The tree is journaled to the backing database and rebuilt on startup.
//! Finalization function select one root from the top of the tree and discards all other roots and
//! their subtrees.
//!
//! # Pruning.
//! See `RefWindow` for pruning algorithm details. `StateDb` prunes on each finalization until pruning
//! constraints are satisfied.
//!
#[macro_use] extern crate log;
extern crate parking_lot;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
mod unfinalized;
mod pruning;
#[cfg(test)] mod test;
use std::fmt;
use parking_lot::RwLock;
use codec::Slicable;
use std::collections::HashSet;
use unfinalized::UnfinalizedOverlay;
use pruning::RefWindow;
/// Database value type.
pub type DBValue = Vec<u8>;
/// Basic set of requirements for the Block hash and node key types.
pub trait Hash: Send + Sync + Sized + Eq + PartialEq + Clone + Default + fmt::Debug + Slicable + std::hash::Hash + 'static {}
impl<T: Send + Sync + Sized + Eq + PartialEq + Clone + Default + fmt::Debug + Slicable + std::hash::Hash + 'static> Hash for T {}
/// Backend database trait. Read-only.
pub trait MetaDb {
type Error: fmt::Debug;
/// Get meta value, such as the journal.
fn get_meta(&self, key: &[u8]) -> Result<Option<DBValue>, Self::Error>;
}
/// Backend database trait. Read-only.
pub trait HashDb {
type Hash: Hash;
type Error: fmt::Debug;
/// Get state trie node.
fn get(&self, key: &Self::Hash) -> Result<Option<DBValue>, Self::Error>;
}
/// Error type.
/// Error type.
pub enum Error<E: fmt::Debug> {
/// Database backend error.
Db(E),
/// `Slicable` decoding error.
Decoding,
}
impl<E: fmt::Debug> fmt::Debug for Error<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match self {
Error::Db(e) => e.fmt(f),
Error::Decoding => write!(f, "Error decoding slicable value"),
}
}
}
/// A set of state node changes.
#[derive(Default, Debug, Clone)]
pub struct ChangeSet<H: Hash> {
/// Inserted nodes.
pub inserted: Vec<(H, DBValue)>,
/// Delted nodes.
pub deleted: Vec<H>,
}
/// A set of changes to the backing database.
#[derive(Default, Debug, Clone)]
pub struct CommitSet<H: Hash> {
/// State node changes.
pub data: ChangeSet<H>,
/// Metadata changes.
pub meta: ChangeSet<Vec<u8>>,
}
/// Pruning contraints. If none are specified pruning is
#[derive(Default, Debug, Clone)]
pub struct Constraints {
/// Maximum blocks. Defaults to 0 when unspecified, effectively keeping only unfinalized states.
pub max_blocks: Option<u32>,
/// Maximum memory in the pruning overlay.
pub max_mem: Option<usize>,
}
/// Pruning mode.
#[derive(Debug, Clone)]
pub enum PruningMode {
/// Maintain a pruning window.
Constrained(Constraints),
/// No pruning. Finalization is a no-op.
ArchiveAll,
/// Finalization discards unfinalized nodes. All the finalized nodes are kept in the DB.
ArchiveCanonical,
}
impl PruningMode {
/// Create a mode that keeps given number of blocks.
pub fn keep_blocks(n: u32) -> PruningMode {
PruningMode::Constrained(Constraints {
max_blocks: Some(n),
max_mem: None,
})
}
}
fn to_meta_key<S: Slicable>(suffix: &[u8], data: &S) -> Vec<u8> {
let mut buffer = data.encode();
buffer.extend(suffix);
buffer
}
struct StateDbSync<BlockHash: Hash, Key: Hash> {
mode: PruningMode,
unfinalized: UnfinalizedOverlay<BlockHash, Key>,
pruning: Option<RefWindow<BlockHash, Key>>,
pinned: HashSet<BlockHash>,
}
impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
pub fn new<D: MetaDb>(mode: PruningMode, db: &D) -> Result<StateDbSync<BlockHash, Key>, Error<D::Error>> {
trace!("StateDb settings: {:?}", mode);
let unfinalized: UnfinalizedOverlay<BlockHash, Key> = UnfinalizedOverlay::new(db)?;
let pruning: Option<RefWindow<BlockHash, Key>> = match mode {
PruningMode::Constrained(Constraints {
max_mem: Some(_),
..
}) => unimplemented!(),
PruningMode::Constrained(_) => Some(RefWindow::new(db)?),
PruningMode::ArchiveAll | PruningMode::ArchiveCanonical => None,
};
Ok(StateDbSync {
mode,
unfinalized,
pruning: pruning,
pinned: Default::default(),
})
}
pub fn insert_block(&mut self, hash: &BlockHash, number: u64, parent_hash: &BlockHash, mut changeset: ChangeSet<Key>) -> CommitSet<Key> {
if number == 0 {
return CommitSet {
data: changeset,
meta: Default::default(),
}
}
match self.mode {
PruningMode::ArchiveAll => {
changeset.deleted.clear();
// write changes immediatelly
CommitSet {
data: changeset,
meta: Default::default(),
}
},
PruningMode::Constrained(_) | PruningMode::ArchiveCanonical => {
self.unfinalized.insert(hash, number, parent_hash, changeset)
}
}
}
pub fn finalize_block(&mut self, hash: &BlockHash) -> CommitSet<Key> {
let mut commit = match self.mode {
PruningMode::ArchiveAll => {
CommitSet::default()
},
PruningMode::ArchiveCanonical => {
let mut commit = self.unfinalized.finalize(hash);
commit.data.deleted.clear();
commit
},
PruningMode::Constrained(_) => {
self.unfinalized.finalize(hash)
},
};
if let Some(ref mut pruning) = self.pruning {
pruning.note_finalized(hash, &mut commit);
}
self.prune(&mut commit);
commit
}
fn prune(&mut self, commit: &mut CommitSet<Key>) {
if let (&mut Some(ref mut pruning), &PruningMode::Constrained(ref constraints)) = (&mut self.pruning, &self.mode) {
loop {
if pruning.window_size() <= constraints.max_blocks.unwrap_or(0) as u64 {
break;
}
if constraints.max_mem.map_or(false, |m| pruning.mem_used() > m) {
break;
}
let pinned = &self.pinned;
if pruning.next_hash().map_or(false, |h| pinned.contains(&h)) {
break;
}
pruning.prune_one(commit);
}
}
}
pub fn pin(&mut self, hash: &BlockHash) {
self.pinned.insert(hash.clone());
}
pub fn unpin(&mut self, hash: &BlockHash) {
self.pinned.remove(hash);
}
pub fn get<D: HashDb<Hash=Key>>(&self, key: &Key, db: &D) -> Result<Option<DBValue>, Error<D::Error>> {
if let Some(value) = self.unfinalized.get(key) {
return Ok(Some(value));
}
db.get(key).map_err(|e| Error::Db(e))
}
}
/// State DB maintenance. See module description.
/// Can be shared across threads.
pub struct StateDb<BlockHash: Hash, Key: Hash> {
db: RwLock<StateDbSync<BlockHash, Key>>,
}
impl<BlockHash: Hash, Key: Hash> StateDb<BlockHash, Key> {
/// Creates a new instance. Does not expect any metadata in the database.
pub fn new<D: MetaDb>(mode: PruningMode, db: &D) -> Result<StateDb<BlockHash, Key>, Error<D::Error>> {
Ok(StateDb {
db: RwLock::new(StateDbSync::new(mode, db)?)
})
}
/// Add a new unfinalized block.
pub fn insert_block(&self, hash: &BlockHash, number: u64, parent_hash: &BlockHash, changeset: ChangeSet<Key>) -> CommitSet<Key> {
self.db.write().insert_block(hash, number, parent_hash, changeset)
}
/// Finalize a previously inserted block.
pub fn finalize_block(&self, hash: &BlockHash) -> CommitSet<Key> {
self.db.write().finalize_block(hash)
}
/// Prevents pruning of specified block and its descendants.
pub fn pin(&self, hash: &BlockHash) {
self.db.write().pin(hash)
}
/// Allows pruning of specified block.
pub fn unpin(&self, hash: &BlockHash) {
self.db.write().unpin(hash)
}
/// Get a value from unfinalized/pruning overlay or the backing DB.
pub fn get<D: HashDb<Hash=Key>>(&self, key: &Key, db: &D) -> Result<Option<DBValue>, Error<D::Error>> {
self.db.read().get(key, db)
}
}
#[cfg(test)]
mod tests {
use primitives::H256;
use {StateDb, PruningMode, Constraints};
use test::{make_db, make_changeset, TestDb};
fn make_test_db(settings: PruningMode) -> (TestDb, StateDb<H256, H256>) {
let mut db = make_db(&[91, 921, 922, 93, 94]);
let state_db = StateDb::new(settings, &db).unwrap();
db.commit(&state_db.insert_block(&H256::from(1), 1, &H256::from(0), make_changeset(&[1], &[91])));
db.commit(&state_db.insert_block(&H256::from(21), 2, &H256::from(1), make_changeset(&[21], &[921, 1])));
db.commit(&state_db.insert_block(&H256::from(22), 2, &H256::from(1), make_changeset(&[22], &[922])));
db.commit(&state_db.insert_block(&H256::from(3), 3, &H256::from(21), make_changeset(&[3], &[93])));
db.commit(&state_db.finalize_block(&H256::from(1)));
db.commit(&state_db.insert_block(&H256::from(4), 4, &H256::from(3), make_changeset(&[4], &[94])));
db.commit(&state_db.finalize_block(&H256::from(21)));
db.commit(&state_db.finalize_block(&H256::from(3)));
(db, state_db)
}
#[test]
fn full_archive_keeps_everything() {
let (db, _) = make_test_db(PruningMode::ArchiveAll);
assert!(db.data_eq(&make_db(&[1, 21, 22, 3, 4, 91, 921, 922, 93, 94])));
}
#[test]
fn canonical_archive_keeps_canonical() {
let (db, _) = make_test_db(PruningMode::ArchiveCanonical);
assert!(db.data_eq(&make_db(&[1, 21, 3, 91, 921, 922, 93, 94])));
}
#[test]
fn prune_window_0() {
let (db, _) = make_test_db(PruningMode::Constrained(Constraints {
max_blocks: Some(0),
max_mem: None,
}));
assert!(db.data_eq(&make_db(&[21, 3, 922, 94])));
}
#[test]
fn prune_window_1() {
let (db, _) = make_test_db(PruningMode::Constrained(Constraints {
max_blocks: Some(1),
max_mem: None,
}));
assert!(db.data_eq(&make_db(&[21, 3, 922, 93, 94])));
}
#[test]
fn prune_window_2() {
let (db, _) = make_test_db(PruningMode::Constrained(Constraints {
max_blocks: Some(2),
max_mem: None,
}));
assert!(db.data_eq(&make_db(&[1, 21, 3, 921, 922, 93, 94])));
}
}
+280
View File
@@ -0,0 +1,280 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Pruning window.
//!
//! For each block we maintain a list of nodes pending deletion.
//! There is also a global index of node key to block number.
//! If a node is re-inserted into the window it gets removed from
//! the death list.
//! The changes are journaled in the DB.
use std::collections::{HashMap, HashSet, VecDeque};
use codec::{Slicable, self};
use {CommitSet, Error, MetaDb, to_meta_key, Hash};
const LAST_PRUNED: &[u8] = b"last_pruned";
const PRUNING_JOURNAL: &[u8] = b"pruning_journal";
/// See module documentation.
pub struct RefWindow<BlockHash: Hash, Key: Hash> {
death_rows: VecDeque<DeathRow<BlockHash, Key>>,
death_index: HashMap<Key, u64>,
pending_number: u64,
}
#[derive(Debug, PartialEq, Eq)]
struct DeathRow<BlockHash: Hash, Key: Hash> {
hash: BlockHash,
journal_key: Vec<u8>,
deleted: HashSet<Key>,
}
struct JournalRecord<BlockHash: Hash, Key: Hash> {
hash: BlockHash,
inserted: Vec<Key>,
deleted: Vec<Key>,
}
impl<BlockHash: Hash, Key: Hash> Slicable for JournalRecord<BlockHash, Key> {
fn encode(&self) -> Vec<u8> {
let mut v = Vec::with_capacity(4096);
self.hash.using_encoded(|s| v.extend(s));
self.inserted.using_encoded(|s| v.extend(s));
self.deleted.using_encoded(|s| v.extend(s));
v
}
fn decode<I: codec::Input>(input: &mut I) -> Option<Self> {
Some(JournalRecord {
hash: Slicable::decode(input)?,
inserted: Slicable::decode(input)?,
deleted: Slicable::decode(input)?,
})
}
}
fn to_journal_key(block: u64) -> Vec<u8> {
to_meta_key(PRUNING_JOURNAL, &block)
}
impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
pub fn new<D: MetaDb>(db: &D) -> Result<RefWindow<BlockHash, Key>, Error<D::Error>> {
let last_pruned = db.get_meta(&to_meta_key(LAST_PRUNED, &()))
.map_err(|e| Error::Db(e))?;
let pending_number: u64 = match last_pruned {
Some(buffer) => u64::decode(&mut buffer.as_slice()).ok_or(Error::Decoding)? + 1,
None => 1,
};
let mut block = pending_number;
let mut pruning = RefWindow {
death_rows: Default::default(),
death_index: Default::default(),
pending_number: pending_number,
};
// read the journal
trace!(target: "state-db", "Reading pruning journal. Last pruned #{}", pending_number - 1);
loop {
let journal_key = to_journal_key(block);
match db.get_meta(&journal_key).map_err(|e| Error::Db(e))? {
Some(record) => {
let record: JournalRecord<BlockHash, Key> = Slicable::decode(&mut record.as_slice()).ok_or(Error::Decoding)?;
trace!(target: "state-db", "Pruning journal entry {} ({} inserted, {} deleted)", block, record.inserted.len(), record.deleted.len());
pruning.import(&record.hash, journal_key, record.inserted.into_iter(), record.deleted);
},
None => break,
}
block += 1;
}
Ok(pruning)
}
fn import<I: IntoIterator<Item=Key>>(&mut self, hash: &BlockHash, journal_key: Vec<u8>, inserted: I, deleted: Vec<Key>) {
// remove all re-inserted keys from death rows
for k in inserted {
if let Some(block) = self.death_index.remove(&k) {
self.death_rows[(block - self.pending_number) as usize].deleted.remove(&k);
}
}
// add new keys
let imported_block = self.pending_number + self.death_rows.len() as u64;
for k in deleted.iter() {
self.death_index.insert(k.clone(), imported_block);
}
self.death_rows.push_back(
DeathRow {
hash: hash.clone(),
deleted: deleted.into_iter().collect(),
journal_key: journal_key,
}
);
}
pub fn window_size(&self) -> u64 {
self.death_rows.len() as u64
}
pub fn next_hash(&self) -> Option<BlockHash> {
self.death_rows.front().map(|r| r.hash.clone())
}
pub fn mem_used(&self) -> usize {
0
}
/// Prune next block. Expects at least one block in the window. Adds changes to `commit`.
pub fn prune_one(&mut self, commit: &mut CommitSet<Key>) {
let pruned = self.death_rows.pop_front().expect("prune_one is only called with a non-empty window");
trace!(target: "state-db", "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
for k in pruned.deleted.iter() {
self.death_index.remove(&k);
}
commit.data.deleted.extend(pruned.deleted.into_iter());
commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), self.pending_number.encode()));
commit.meta.deleted.push(pruned.journal_key);
self.pending_number += 1;
}
/// Add a change set to the window. Creates a journal record and pushes it to `commit`
pub fn note_finalized(&mut self, hash: &BlockHash, commit: &mut CommitSet<Key>) {
trace!(target: "state-db", "Adding to pruning window: {:?} ({} inserted, {} deleted)", hash, commit.data.inserted.len(), commit.data.deleted.len());
let inserted = commit.data.inserted.iter().map(|(k, _)| k.clone()).collect();
let deleted = ::std::mem::replace(&mut commit.data.deleted, Vec::new());
let journal_record = JournalRecord {
hash: hash.clone(),
inserted,
deleted,
};
let block = self.pending_number + self.window_size();
let journal_key = to_journal_key(block);
commit.meta.inserted.push((journal_key.clone(), journal_record.encode()));
self.import(hash, journal_key, journal_record.inserted.into_iter(), journal_record.deleted);
}
}
#[cfg(test)]
mod tests {
use super::RefWindow;
use primitives::H256;
use {CommitSet};
use test::{make_db, make_commit, TestDb};
fn check_journal(pruning: &RefWindow<H256, H256>, db: &TestDb) {
let restored: RefWindow<H256, H256> = RefWindow::new(db).unwrap();
assert_eq!(pruning.pending_number, restored.pending_number);
assert_eq!(pruning.death_rows, restored.death_rows);
assert_eq!(pruning.death_index, restored.death_index);
}
#[test]
fn created_from_empty_db() {
let db = make_db(&[]);
let pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
assert_eq!(pruning.pending_number, 1);
assert!(pruning.death_rows.is_empty());
assert!(pruning.death_index.is_empty());
}
#[test]
#[should_panic]
fn prune_empty_panics() {
let db = make_db(&[]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
}
#[test]
fn prune_one() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut commit = make_commit(&[4, 5], &[1, 3]);
let h = H256::random();
pruning.note_finalized(&h, &mut commit);
db.commit(&commit);
assert!(commit.data.deleted.is_empty());
assert_eq!(pruning.death_rows.len(), 1);
assert_eq!(pruning.death_index.len(), 2);
assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));
check_journal(&pruning, &db);
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[2, 4, 5])));
assert!(pruning.death_rows.is_empty());
assert!(pruning.death_index.is_empty());
assert_eq!(pruning.pending_number, 2);
}
#[test]
fn prune_two() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut commit = make_commit(&[4], &[1]);
pruning.note_finalized(&H256::random(), &mut commit);
db.commit(&commit);
let mut commit = make_commit(&[5], &[2]);
pruning.note_finalized(&H256::random(), &mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));
check_journal(&pruning, &db);
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[2, 3, 4, 5])));
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[3, 4, 5])));
assert_eq!(pruning.pending_number, 3);
}
#[test]
fn reinserted_survives() {
let mut db = make_db(&[1, 2, 3]);
let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
let mut commit = make_commit(&[], &[2]);
pruning.note_finalized(&H256::random(), &mut commit);
db.commit(&commit);
let mut commit = make_commit(&[2], &[]);
pruning.note_finalized(&H256::random(), &mut commit);
db.commit(&commit);
let mut commit = make_commit(&[], &[2]);
pruning.note_finalized(&H256::random(), &mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 2, 3])));
check_journal(&pruning, &db);
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 2, 3])));
let mut commit = CommitSet::default();
pruning.prune_one(&mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 2, 3])));
pruning.prune_one(&mut commit);
db.commit(&commit);
assert!(db.data_eq(&make_db(&[1, 3])));
assert_eq!(pruning.pending_number, 4);
}
}
+83
View File
@@ -0,0 +1,83 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Test utils
use std::collections::HashMap;
use primitives::H256;
use {DBValue, ChangeSet, CommitSet, MetaDb, HashDb};
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct TestDb {
pub data: HashMap<H256, DBValue>,
pub meta: HashMap<Vec<u8>, DBValue>,
}
impl MetaDb for TestDb {
type Error = ();
fn get_meta(&self, key: &[u8]) -> Result<Option<DBValue>, ()> {
Ok(self.meta.get(key).cloned())
}
}
impl HashDb for TestDb {
type Error = ();
type Hash = H256;
fn get(&self, key: &H256) -> Result<Option<DBValue>, ()> {
Ok(self.data.get(key).cloned())
}
}
impl TestDb {
pub fn commit(&mut self, commit: &CommitSet<H256>) {
self.data.extend(commit.data.inserted.iter().cloned());
for k in commit.data.deleted.iter() {
self.data.remove(k);
}
self.meta.extend(commit.meta.inserted.iter().cloned());
for k in commit.meta.deleted.iter() {
self.meta.remove(k);
}
}
pub fn data_eq(&self, other: &TestDb) -> bool {
self.data == other.data
}
}
pub fn make_changeset(inserted: &[u64], deleted: &[u64]) -> ChangeSet<H256> {
ChangeSet {
inserted: inserted.iter().map(|v| (H256::from(*v), H256::from(*v).to_vec())).collect(),
deleted: deleted.iter().map(|v| H256::from(*v)).collect(),
}
}
pub fn make_commit(inserted: &[u64], deleted: &[u64]) -> CommitSet<H256> {
CommitSet {
data: make_changeset(inserted, deleted),
meta: ChangeSet::default(),
}
}
pub fn make_db(inserted: &[u64]) -> TestDb {
TestDb {
data: inserted.iter().map(|v| (H256::from(*v), H256::from(*v).to_vec())).collect(),
meta: Default::default(),
}
}
@@ -0,0 +1,475 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Finalization window.
//! Maintains trees of block overlays and allows discarding trees/roots
use std::collections::{HashMap, VecDeque};
use super::{Error, DBValue, ChangeSet, CommitSet, MetaDb, Hash, to_meta_key};
use codec::{self, Slicable};
const UNFINALIZED_JOURNAL: &[u8] = b"unfinalized_journal";
const LAST_FINALIZED: &[u8] = b"last_finalized";
/// See module documentation.
pub struct UnfinalizedOverlay<BlockHash: Hash, Key: Hash> {
last_finalized: Option<(BlockHash, u64)>,
levels: VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
parents: HashMap<BlockHash, BlockHash>,
}
struct JournalRecord<BlockHash: Hash, Key: Hash> {
hash: BlockHash,
parent_hash: BlockHash,
inserted: Vec<(Key, DBValue)>,
deleted: Vec<Key>,
}
impl<BlockHash: Hash, Key: Hash> Slicable for JournalRecord<BlockHash, Key> {
fn encode(&self) -> Vec<u8> {
let mut v = Vec::with_capacity(4096);
self.hash.using_encoded(|s| v.extend(s));
self.parent_hash.using_encoded(|s| v.extend(s));
self.inserted.using_encoded(|s| v.extend(s));
self.deleted.using_encoded(|s| v.extend(s));
v
}
fn decode<I: codec::Input>(input: &mut I) -> Option<Self> {
Some(JournalRecord {
hash: Slicable::decode(input)?,
parent_hash: Slicable::decode(input)?,
inserted: Slicable::decode(input)?,
deleted: Slicable::decode(input)?,
})
}
}
fn to_journal_key(block: u64, index: u64) -> Vec<u8> {
to_meta_key(UNFINALIZED_JOURNAL, &(block, index))
}
#[cfg_attr(test, derive(PartialEq, Debug))]
struct BlockOverlay<BlockHash: Hash, Key: Hash> {
hash: BlockHash,
journal_key: Vec<u8>,
values: HashMap<Key, DBValue>,
deleted: Vec<Key>,
}
impl<BlockHash: Hash, Key: Hash> UnfinalizedOverlay<BlockHash, Key> {
/// Creates a new instance. Does not expect any metadata to be present in the DB.
pub fn new<D: MetaDb>(db: &D) -> Result<UnfinalizedOverlay<BlockHash, Key>, Error<D::Error>> {
let last_finalized = db.get_meta(&to_meta_key(LAST_FINALIZED, &()))
.map_err(|e| Error::Db(e))?;
let last_finalized = match last_finalized {
Some(buffer) => Some(<(BlockHash, u64)>::decode(&mut buffer.as_slice()).ok_or(Error::Decoding)?),
None => None,
};
let mut levels = VecDeque::new();
let mut parents = HashMap::new();
if let Some((ref hash, mut block)) = last_finalized {
// read the journal
trace!(target: "state-db", "Reading unfinalized journal. Last finalized #{} ({:?})", block, hash);
let mut total: u64 = 0;
block += 1;
loop {
let mut index: u64 = 0;
let mut level = Vec::new();
loop {
let journal_key = to_journal_key(block, index);
match db.get_meta(&journal_key).map_err(|e| Error::Db(e))? {
Some(record) => {
let record: JournalRecord<BlockHash, Key> = Slicable::decode(&mut record.as_slice()).ok_or(Error::Decoding)?;
let overlay = BlockOverlay {
hash: record.hash.clone(),
journal_key,
values: record.inserted.into_iter().collect(),
deleted: record.deleted,
};
trace!(target: "state-db", "Unfinalized journal entry {}.{} ({} inserted, {} deleted)", block, index, overlay.values.len(), overlay.deleted.len());
level.push(overlay);
parents.insert(record.hash, record.parent_hash);
index += 1;
total += 1;
},
None => break,
}
}
if level.is_empty() {
break;
}
levels.push_back(level);
block += 1;
}
trace!(target: "state-db", "Finished reading unfinalized journal, {} entries", total);
}
Ok(UnfinalizedOverlay {
last_finalized: last_finalized,
levels,
parents,
})
}
/// Insert a new block into the overlay. If inserted on the second level or lover expects parent to be present in the window.
pub fn insert(&mut self, hash: &BlockHash, number: u64, parent_hash: &BlockHash, changeset: ChangeSet<Key>) -> CommitSet<Key> {
let mut commit = CommitSet::default();
if self.levels.is_empty() && self.last_finalized.is_none() {
// assume that parent was finalized
let last_finalized = (parent_hash.clone(), number - 1);
commit.meta.inserted.push((to_meta_key(LAST_FINALIZED, &()), last_finalized.encode()));
self.last_finalized = Some(last_finalized);
} else if self.last_finalized.is_some() {
assert!(number >= self.front_block_number() && number < (self.front_block_number() + self.levels.len() as u64 + 1));
// check for valid parent if inserting on second level or higher
if number == self.front_block_number() {
assert!(self.last_finalized.as_ref().map_or(false, |&(ref h, n)| h == parent_hash && n == number - 1));
} else {
assert!(self.parents.contains_key(&parent_hash));
}
}
let level = if self.levels.is_empty() || number == self.front_block_number() + self.levels.len() as u64 {
self.levels.push_back(Vec::new());
self.levels.back_mut().expect("can't be empty after insertion; qed")
} else {
let front_block_number = self.front_block_number();
self.levels.get_mut((number - front_block_number) as usize)
.expect("number is [front_block_number .. front_block_number + levels.len()) is asserted in precondition; qed")
};
let index = level.len() as u64;
let journal_key = to_journal_key(number, index);
let overlay = BlockOverlay {
hash: hash.clone(),
journal_key: journal_key.clone(),
values: changeset.inserted.iter().cloned().collect(),
deleted: changeset.deleted.clone(),
};
level.push(overlay);
self.parents.insert(hash.clone(), parent_hash.clone());
let journal_record = JournalRecord {
hash: hash.clone(),
parent_hash: parent_hash.clone(),
inserted: changeset.inserted,
deleted: changeset.deleted,
};
trace!(target: "state-db", "Inserted unfinalized changeset {}.{} ({} inserted, {} deleted)", number, index, journal_record.inserted.len(), journal_record.deleted.len());
let journal_record = journal_record.encode();
commit.meta.inserted.push((journal_key, journal_record));
commit
}
fn discard(
levels: &mut [Vec<BlockOverlay<BlockHash, Key>>],
parents: &mut HashMap<BlockHash, BlockHash>,
discarded_journals: &mut Vec<Vec<u8>>,
number: u64,
hash: &BlockHash,
) {
if let Some((level, sublevels)) = levels.split_first_mut() {
level.retain(|ref overlay| {
let parent = parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
if parent == *hash {
parents.remove(&overlay.hash);
discarded_journals.push(overlay.journal_key.clone());
Self::discard(sublevels, parents, discarded_journals, number + 1, &overlay.hash);
false
} else {
true
}
});
}
}
fn front_block_number(&self) -> u64 {
self.last_finalized.as_ref().map(|&(_, n)| n + 1).unwrap_or(0)
}
/// Select a top-level root and finalized it. Discards all sibling subtrees and the root.
/// Returns a set of changes that need to be added to the DB.
pub fn finalize(&mut self, hash: &BlockHash) -> CommitSet<Key> {
trace!(target: "state-db", "Finalizing {:?}", hash);
let level = self.levels.pop_front().expect("no blocks to finalize");
let index = level.iter().position(|overlay| overlay.hash == *hash)
.expect("attempting to finalize unknown block");
let mut commit = CommitSet::default();
let mut discarded_journals = Vec::new();
for (i, overlay) in level.into_iter().enumerate() {
self.parents.remove(&overlay.hash);
if i == index {
// that's the one we need to finalize
commit.data.inserted = overlay.values.into_iter().collect();
commit.data.deleted = overlay.deleted;
} else {
// TODO: borrow checker won't allow us to split out mutable refernces
// required for recursive processing. A more efficient implementaion
// that does not require converting to vector is possible
let mut vec: Vec<_> = self.levels.drain(..).collect();
Self::discard(&mut vec, &mut self.parents, &mut discarded_journals, 0, &overlay.hash);
self.levels.extend(vec.into_iter());
}
// cleanup journal entry
discarded_journals.push(overlay.journal_key);
}
commit.meta.deleted.append(&mut discarded_journals);
let last_finalized = (hash.clone(), self.front_block_number());
commit.meta.inserted.push((to_meta_key(LAST_FINALIZED, &()), last_finalized.encode()));
self.last_finalized = Some(last_finalized);
trace!(target: "state-db", "Discarded {} records", commit.meta.deleted.len());
commit
}
/// Get a value from the node overlay. This searches in every existing changeset.
pub fn get(&self, key: &Key) -> Option<DBValue> {
for level in self.levels.iter() {
for overlay in level.iter() {
if let Some(value) = overlay.values.get(&key) {
return Some(value.clone());
}
}
}
None
}
}
#[cfg(test)]
mod tests {
use super::UnfinalizedOverlay;
use {ChangeSet};
use primitives::H256;
use test::{make_db, make_changeset};
fn contains(overlay: &UnfinalizedOverlay<H256, H256>, key: u64) -> bool {
overlay.get(&H256::from(key)) == Some(H256::from(key).to_vec())
}
#[test]
fn created_from_empty_db() {
let db = make_db(&[]);
let overlay: UnfinalizedOverlay<H256, H256> = UnfinalizedOverlay::new(&db).unwrap();
assert_eq!(overlay.last_finalized, None);
assert!(overlay.levels.is_empty());
assert!(overlay.parents.is_empty());
}
#[test]
#[should_panic]
fn finalize_empty_panics() {
let db = make_db(&[]);
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
overlay.finalize(&H256::default());
}
#[test]
#[should_panic]
fn insert_ahead_panics() {
let db = make_db(&[]);
let h1 = H256::random();
let h2 = H256::random();
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
overlay.insert(&h1, 2, &H256::default(), ChangeSet::default());
overlay.insert(&h2, 1, &h1, ChangeSet::default());
}
#[test]
#[should_panic]
fn insert_behind_panics() {
let h1 = H256::random();
let h2 = H256::random();
let db = make_db(&[]);
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
overlay.insert(&h1, 1, &H256::default(), ChangeSet::default());
overlay.insert(&h2, 3, &h1, ChangeSet::default());
}
#[test]
#[should_panic]
fn insert_unknown_parent_panics() {
let db = make_db(&[]);
let h1 = H256::random();
let h2 = H256::random();
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
overlay.insert(&h1, 1, &H256::default(), ChangeSet::default());
overlay.insert(&h2, 2, &H256::default(), ChangeSet::default());
}
#[test]
#[should_panic]
fn finalize_unknown_panics() {
let h1 = H256::random();
let h2 = H256::random();
let db = make_db(&[]);
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
overlay.insert(&h1, 1, &H256::default(), ChangeSet::default());
overlay.finalize(&h2);
}
#[test]
fn insert_finalize_one() {
let h1 = H256::random();
let mut db = make_db(&[1, 2]);
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
let changeset = make_changeset(&[3, 4], &[2]);
let insertion = overlay.insert(&h1, 1, &H256::default(), changeset.clone());
assert_eq!(insertion.data.inserted.len(), 0);
assert_eq!(insertion.data.deleted.len(), 0);
assert_eq!(insertion.meta.inserted.len(), 2);
assert_eq!(insertion.meta.deleted.len(), 0);
db.commit(&insertion);
let finalization = overlay.finalize(&h1);
assert_eq!(finalization.data.inserted.len(), changeset.inserted.len());
assert_eq!(finalization.data.deleted.len(), changeset.deleted.len());
assert_eq!(finalization.meta.inserted.len(), 1);
assert_eq!(finalization.meta.deleted.len(), 1);
db.commit(&finalization);
assert!(db.data_eq(&make_db(&[1, 3, 4])));
}
#[test]
fn restore_from_journal() {
let h1 = H256::random();
let h2 = H256::random();
let mut db = make_db(&[1, 2]);
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
db.commit(&overlay.insert(&h1, 10, &H256::default(), make_changeset(&[3, 4], &[2])));
db.commit(&overlay.insert(&h2, 11, &h1, make_changeset(&[5], &[3])));
assert_eq!(db.meta.len(), 3);
let overlay2 = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
assert_eq!(overlay.levels, overlay2.levels);
assert_eq!(overlay.parents, overlay2.parents);
assert_eq!(overlay.last_finalized, overlay2.last_finalized);
}
#[test]
fn insert_finalize_two() {
let h1 = H256::random();
let h2 = H256::random();
let mut db = make_db(&[1, 2, 3, 4]);
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
let changeset1 = make_changeset(&[5, 6], &[2]);
let changeset2 = make_changeset(&[7, 8], &[5, 3]);
db.commit(&overlay.insert(&h1, 1, &H256::default(), changeset1));
assert!(contains(&overlay, 5));
db.commit(&overlay.insert(&h2, 2, &h1, changeset2));
assert!(contains(&overlay, 7));
assert!(contains(&overlay, 5));
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 2);
db.commit(&overlay.finalize(&h1));
assert_eq!(overlay.levels.len(), 1);
assert_eq!(overlay.parents.len(), 1);
assert!(!contains(&overlay, 5));
assert!(contains(&overlay, 7));
db.commit(&overlay.finalize(&h2));
assert_eq!(overlay.levels.len(), 0);
assert_eq!(overlay.parents.len(), 0);
assert!(db.data_eq(&make_db(&[1, 4, 6, 7, 8])));
}
#[test]
fn complex_tree() {
let mut db = make_db(&[]);
// - 1 - 1_1 - 1_1_1
// \ 1_2 - 1_2_1
// \ 1_2_2
// \ 1_2_3
//
// - 2 - 2_1 - 2_1_1
// \ 2_2
//
// 1_2_2 is the winner
let (h_1, c_1) = (H256::random(), make_changeset(&[1], &[]));
let (h_2, c_2) = (H256::random(), make_changeset(&[2], &[]));
let (h_1_1, c_1_1) = (H256::random(), make_changeset(&[11], &[]));
let (h_1_2, c_1_2) = (H256::random(), make_changeset(&[12], &[]));
let (h_2_1, c_2_1) = (H256::random(), make_changeset(&[21], &[]));
let (h_2_2, c_2_2) = (H256::random(), make_changeset(&[22], &[]));
let (h_1_1_1, c_1_1_1) = (H256::random(), make_changeset(&[111], &[]));
let (h_1_2_1, c_1_2_1) = (H256::random(), make_changeset(&[121], &[]));
let (h_1_2_2, c_1_2_2) = (H256::random(), make_changeset(&[122], &[]));
let (h_1_2_3, c_1_2_3) = (H256::random(), make_changeset(&[123], &[]));
let (h_2_1_1, c_2_1_1) = (H256::random(), make_changeset(&[211], &[]));
let mut overlay = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
db.commit(&overlay.insert(&h_1, 1, &H256::default(), c_1));
db.commit(&overlay.insert(&h_1_1, 2, &h_1, c_1_1));
db.commit(&overlay.insert(&h_1_2, 2, &h_1, c_1_2));
db.commit(&overlay.insert(&h_2, 1, &H256::default(), c_2));
db.commit(&overlay.insert(&h_2_1, 2, &h_2, c_2_1));
db.commit(&overlay.insert(&h_2_2, 2, &h_2, c_2_2));
db.commit(&overlay.insert(&h_1_1_1, 3, &h_1_1, c_1_1_1));
db.commit(&overlay.insert(&h_1_2_1, 3, &h_1_2, c_1_2_1));
db.commit(&overlay.insert(&h_1_2_2, 3, &h_1_2, c_1_2_2));
db.commit(&overlay.insert(&h_1_2_3, 3, &h_1_2, c_1_2_3));
db.commit(&overlay.insert(&h_2_1_1, 3, &h_2_1, c_2_1_1));
assert!(contains(&overlay, 2));
assert!(contains(&overlay, 11));
assert!(contains(&overlay, 21));
assert!(contains(&overlay, 111));
assert!(contains(&overlay, 122));
assert!(contains(&overlay, 211));
assert_eq!(overlay.levels.len(), 3);
assert_eq!(overlay.parents.len(), 11);
assert_eq!(overlay.last_finalized, Some((H256::default(), 0)));
// check if restoration from journal results in the same tree
let overlay2 = UnfinalizedOverlay::<H256, H256>::new(&db).unwrap();
assert_eq!(overlay.levels, overlay2.levels);
assert_eq!(overlay.parents, overlay2.parents);
assert_eq!(overlay.last_finalized, overlay2.last_finalized);
// finalize 1. 2 and all its children should be discarded
db.commit(&overlay.finalize(&h_1));
assert_eq!(overlay.levels.len(), 2);
assert_eq!(overlay.parents.len(), 6);
assert!(!contains(&overlay, 1));
assert!(!contains(&overlay, 2));
assert!(!contains(&overlay, 21));
assert!(!contains(&overlay, 22));
assert!(!contains(&overlay, 211));
assert!(contains(&overlay, 111));
// finalize 1_2. 1_1 and all its children should be discarded
db.commit(&overlay.finalize(&h_1_2));
assert_eq!(overlay.levels.len(), 1);
assert_eq!(overlay.parents.len(), 3);
assert!(!contains(&overlay, 11));
assert!(!contains(&overlay, 111));
assert!(contains(&overlay, 121));
assert!(contains(&overlay, 122));
assert!(contains(&overlay, 123));
// finalize 1_2_2
db.commit(&overlay.finalize(&h_1_2_2));
assert_eq!(overlay.levels.len(), 0);
assert_eq!(overlay.parents.len(), 0);
assert!(db.data_eq(&make_db(&[1, 12, 122])));
assert_eq!(overlay.last_finalized, Some((h_1_2_2, 3)));
}
}
@@ -15,6 +15,5 @@ triehash = "0.1"
substrate-primitives = { path = "../primitives", version = "0.1.0" }
hashdb = { git = "https://github.com/paritytech/parity.git" }
kvdb = { git = "https://github.com/paritytech/parity.git" }
memorydb = { git = "https://github.com/paritytech/parity.git" }
patricia-trie = { git = "https://github.com/paritytech/parity.git" }
+1 -2
View File
@@ -25,7 +25,6 @@ extern crate hex_literal;
extern crate log;
extern crate ethereum_types;
extern crate kvdb;
extern crate hashdb;
extern crate memorydb;
extern crate triehash;
@@ -47,7 +46,7 @@ mod trie_backend;
pub use testing::TestExternalities;
pub use ext::Ext;
pub use backend::Backend;
pub use trie_backend::{TryIntoTrieBackend, TrieBackend};
pub use trie_backend::{TryIntoTrieBackend, TrieBackend, TrieH256, Storage, DBValue};
/// The overlayed changes to state to be queried on top of the backend.
///
@@ -18,12 +18,18 @@
use std::collections::HashMap;
use std::sync::Arc;
use ethereum_types::H256 as TrieH256;
use hashdb::{DBValue, HashDB};
use kvdb::KeyValueDB;
use hashdb::HashDB;
use memorydb::MemoryDB;
use patricia_trie::{TrieDB, TrieDBMut, TrieError, Trie, TrieMut};
use {Backend};
pub use ethereum_types::H256 as TrieH256;
pub use hashdb::DBValue;
/// Backend trie storage trait.
pub trait Storage: Send + Sync {
/// Get a trie node.
fn get(&self, key: &TrieH256) -> Result<Option<DBValue>, String>;
}
/// Try convert into trie-based backend.
pub trait TryIntoTrieBackend {
@@ -40,20 +46,20 @@ pub struct TrieBackend {
impl TrieBackend {
/// Create new trie-based backend.
pub fn with_kvdb(db: Arc<KeyValueDB>, storage_column: Option<u32>, root: TrieH256) -> Self {
pub fn with_storage(db: Arc<Storage>, root: TrieH256) -> Self {
TrieBackend {
storage: TrieBackendStorage::KeyValueDb(db, storage_column),
storage: TrieBackendStorage::Storage(db),
root,
}
}
/// Create new trie-based backend for genesis block.
pub fn with_kvdb_for_genesis(db: Arc<KeyValueDB>, storage_column: Option<u32>) -> Self {
pub fn with_storage_for_genesis(db: Arc<Storage>) -> Self {
let mut root = TrieH256::default();
let mut mdb = MemoryDB::default();
TrieDBMut::new(&mut mdb, &mut root);
Self::with_kvdb(db, storage_column, root)
Self::with_storage(db, root)
}
/// Create new trie-based backend backed by MemoryDb storage.
@@ -182,7 +188,7 @@ impl<'a> HashDB for Ephemeral<'a> {
Some(val)
}
}
None => match self.storage.get(&key.0[..]) {
None => match self.storage.get(&key) {
Ok(x) => x,
Err(e) => {
warn!(target: "trie", "Failed to read from DB: {}", e);
@@ -212,19 +218,19 @@ impl<'a> HashDB for Ephemeral<'a> {
#[derive(Clone)]
pub enum TrieBackendStorage {
/// Key value db + storage column.
KeyValueDb(Arc<KeyValueDB>, Option<u32>),
Storage(Arc<Storage>),
/// Hash db.
MemoryDb(MemoryDB),
}
impl TrieBackendStorage {
pub fn get(&self, key: &[u8]) -> Result<Option<DBValue>, String> {
pub fn get(&self, key: &TrieH256) -> Result<Option<DBValue>, String> {
match *self {
TrieBackendStorage::KeyValueDb(ref db, storage_column) =>
db.get(storage_column, key)
TrieBackendStorage::Storage(ref db) =>
db.get(key)
.map_err(|e| format!("Trie lookup error: {}", e)),
TrieBackendStorage::MemoryDb(ref db) =>
Ok(db.get(&TrieH256::from_slice(key))),
Ok(db.get(key)),
}
}
}