diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 452fdf733b..244ed07aee 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2135,6 +2135,7 @@ dependencies = [ "substrate-primitives 0.1.0", "substrate-runtime-primitives 0.1.0", "substrate-runtime-support 0.1.0", + "substrate-state-db 0.1.0", "substrate-state-machine 0.1.0", ] @@ -2547,6 +2548,17 @@ dependencies = [ "serde_json 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "substrate-state-db" +version = "0.1.0" +dependencies = [ + "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-codec 0.1.0", + "substrate-primitives 0.1.0", +] + [[package]] name = "substrate-state-machine" version = "0.1.0" @@ -2555,7 +2567,6 @@ dependencies = [ "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hashdb 0.1.1 (git+https://github.com/paritytech/parity.git)", "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "kvdb 0.1.0 (git+https://github.com/paritytech/parity.git)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "memorydb 0.1.1 (git+https://github.com/paritytech/parity.git)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/Cargo.toml b/substrate/Cargo.toml index 45acf2082a..4d730a0236 100644 --- a/substrate/Cargo.toml +++ b/substrate/Cargo.toml @@ -52,6 +52,7 @@ members = [ "substrate/runtime/system", "substrate/runtime/timestamp", "substrate/serializer", + "substrate/state-db", "substrate/state-machine", "substrate/test-runtime", "substrate/telemetry", diff --git a/substrate/polkadot/cli/src/cli.yml b/substrate/polkadot/cli/src/cli.yml index 49ccd36d5d..c63f59993b 100644 --- a/substrate/polkadot/cli/src/cli.yml +++ 
b/substrate/polkadot/cli/src/cli.yml @@ -75,6 +75,11 @@ args: value_name: CHAIN_SPEC help: Specify the chain specification (one of dev, local or poc-2) takes_value: true + - pruning: + long: pruning + value_name: PRUNING_MODE + help: Specify the pruning mode. (a number of blocks to keep or "archive"). Default is 256. + takes_value: true - name: long: name value_name: NAME diff --git a/substrate/polkadot/cli/src/error.rs b/substrate/polkadot/cli/src/error.rs index d7f6afca49..d7c690276c 100644 --- a/substrate/polkadot/cli/src/error.rs +++ b/substrate/polkadot/cli/src/error.rs @@ -28,5 +28,10 @@ error_chain! { Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"]; } errors { + /// Input error. + Input(m: String) { + description("Invalid input"), + display("{}", m), + } } } diff --git a/substrate/polkadot/cli/src/lib.rs b/substrate/polkadot/cli/src/lib.rs index ac4f266b94..577535035c 100644 --- a/substrate/polkadot/cli/src/lib.rs +++ b/substrate/polkadot/cli/src/lib.rs @@ -83,6 +83,7 @@ use polkadot_primitives::Block; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; use tokio_core::reactor; +use service::PruningMode; const DEFAULT_TELEMETRY_URL: &str = "ws://telemetry.polkadot.io:1024"; @@ -189,6 +190,12 @@ pub fn run(args: I) -> error::Result<()> where .into(); config.database_path = db_path(&base_path).to_string_lossy().into(); + config.pruning = match matches.value_of("pruning") { + Some("archive") => PruningMode::ArchiveAll, + None => PruningMode::keep_blocks(256), + Some(s) => PruningMode::keep_blocks(s.parse() + .map_err(|_| error::ErrorKind::Input("Invalid pruning mode specified".to_owned()))?), + }; let (mut genesis_storage, boot_nodes) = PresetConfig::from_spec(chain_spec) .map(PresetConfig::deconstruct) diff --git a/substrate/polkadot/service/src/config.rs b/substrate/polkadot/service/src/config.rs index c6dff5a584..0631f1819c 100644 --- a/substrate/polkadot/service/src/config.rs +++ 
b/substrate/polkadot/service/src/config.rs @@ -20,6 +20,7 @@ use transaction_pool; use runtime_primitives::MakeStorage; pub use network::Role; pub use network::NetworkConfiguration; +pub use client_db::PruningMode; /// Service configuration. pub struct Configuration { @@ -33,6 +34,8 @@ pub struct Configuration { pub keystore_path: String, /// Path to the database. pub database_path: String, + /// Pruning settings. + pub pruning: PruningMode, /// Additional key seeds. pub keys: Vec, /// The name of the chain. @@ -58,6 +61,7 @@ impl Default for Configuration { genesis_storage: Box::new(Default::default), telemetry: Default::default(), name: "Anonymous".into(), + pruning: PruningMode::ArchiveAll, } } } diff --git a/substrate/polkadot/service/src/lib.rs b/substrate/polkadot/service/src/lib.rs index 230a9ae587..90dd96a329 100644 --- a/substrate/polkadot/service/src/lib.rs +++ b/substrate/polkadot/service/src/lib.rs @@ -68,7 +68,7 @@ use exit_future::Signal; pub use self::error::{ErrorKind, Error}; pub use self::components::{Components, FullComponents, LightComponents}; -pub use config::{Configuration, Role}; +pub use config::{Configuration, Role, PruningMode}; /// Polkadot service. 
pub struct Service { @@ -118,6 +118,7 @@ impl Service let db_settings = client_db::DatabaseSettings { cache_size: None, path: config.database_path.into(), + pruning: config.pruning, }; let (client, on_demand) = components.build_client(db_settings, executor, config.genesis_storage)?; diff --git a/substrate/substrate/client/db/Cargo.toml b/substrate/substrate/client/db/Cargo.toml index b299d838a0..ac821794c9 100644 --- a/substrate/substrate/client/db/Cargo.toml +++ b/substrate/substrate/client/db/Cargo.toml @@ -18,6 +18,7 @@ substrate-client = { path = "../../../substrate/client" } substrate-state-machine = { path = "../../../substrate/state-machine" } substrate-runtime-support = { path = "../../../substrate/runtime-support" } substrate-codec = { path = "../../../substrate/codec" } +substrate-state-db = { path = "../../../substrate/state-db" } [dev-dependencies] kvdb-memorydb = { git = "https://github.com/paritytech/parity.git" } diff --git a/substrate/substrate/client/db/src/lib.rs b/substrate/substrate/client/db/src/lib.rs index f222df681f..1076618f3b 100644 --- a/substrate/substrate/client/db/src/lib.rs +++ b/substrate/substrate/client/db/src/lib.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Client backend that uses RocksDB database as storage. State is still kept in memory. +//! Client backend that uses RocksDB database as storage. 
extern crate substrate_client as client; extern crate kvdb_rocksdb; @@ -27,6 +27,7 @@ extern crate substrate_primitives as primitives; extern crate substrate_runtime_support as runtime_support; extern crate substrate_runtime_primitives as runtime_primitives; extern crate substrate_codec as codec; +extern crate substrate_state_db as state_db; #[macro_use] extern crate log; @@ -38,17 +39,21 @@ use std::sync::Arc; use std::path::PathBuf; use codec::Slicable; -use hashdb::DBValue; use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb::{KeyValueDB, DBTransaction}; use memorydb::MemoryDB; use parking_lot::RwLock; +use primitives::H256; use runtime_primitives::generic::BlockId; use runtime_primitives::bft::Justification; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hashing, HashingFor, Zero}; use runtime_primitives::BuildStorage; use state_machine::backend::Backend as StateBackend; -use state_machine::CodeExecutor; +use state_machine::{CodeExecutor, TrieH256, DBValue}; +use state_db::StateDb; +pub use state_db::PruningMode; + +const FINALIZATION_WINDOW: u64 = 32; /// DB-backed patricia trie state, transaction type is an overlay of changes to commit. pub type DbState = state_machine::TrieBackend; @@ -59,6 +64,8 @@ pub struct DatabaseSettings { pub cache_size: Option, /// Path to the database. pub path: PathBuf, + /// Pruning mode. + pub pruning: PruningMode, } /// Create an instance of db-backed client. @@ -74,7 +81,7 @@ pub fn new_client( E: CodeExecutor, S: BuildStorage, { - let backend = Arc::new(Backend::new(&settings)?); + let backend = Arc::new(Backend::new(settings, FINALIZATION_WINDOW)?); let executor = client::LocalCallExecutor::new(backend.clone(), executor); Ok(client::Client::new(backend, executor, genesis_storage)?) 
} @@ -82,11 +89,12 @@ pub fn new_client( mod columns { pub const META: Option = Some(0); pub const STATE: Option = Some(1); - pub const BLOCK_INDEX: Option = Some(2); - pub const HEADER: Option = Some(3); - pub const BODY: Option = Some(4); - pub const JUSTIFICATION: Option = Some(5); - pub const NUM_COLUMNS: u32 = 6; + pub const STATE_META: Option = Some(2); + pub const BLOCK_INDEX: Option = Some(3); + pub const HEADER: Option = Some(4); + pub const BODY: Option = Some(5); + pub const JUSTIFICATION: Option = Some(6); + pub const NUM_COLUMNS: u32 = 7; } mod meta { @@ -131,6 +139,17 @@ fn db_err(err: kvdb::Error) -> client::error::Error { } } +// wrapper that implements trait required for state_db +struct StateMetaDb<'a>(&'a KeyValueDB); + +impl<'a> state_db::MetaDb for StateMetaDb<'a> { + type Error = kvdb::Error; + + fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> { + self.0.get(columns::STATE_META, key).map(|r| r.map(|v| v.to_vec())) + } +} + /// Block database pub struct BlockchainDb { db: Arc, @@ -299,44 +318,87 @@ impl client::backend::BlockImportOperation for BlockImport } } +struct StorageDb { + pub db: Arc, + pub state_db: StateDb, +} + +impl state_machine::Storage for StorageDb { + fn get(&self, key: &TrieH256) -> Result, String> { + self.state_db.get(&key.0.into(), self).map(|r| r.map(|v| DBValue::from_slice(&v))) + .map_err(|e| format!("Database backend error: {:?}", e)) + } +} + +impl state_db::HashDb for StorageDb { + type Error = kvdb::Error; + type Hash = H256; + + fn get(&self, key: &H256) -> Result>, Self::Error> { + self.db.get(columns::STATE, &key[..]).map(|r| r.map(|v| v.to_vec())) + } +} + + /// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks. /// Otherwise, trie nodes are kept only from the most recent block. 
pub struct Backend { - db: Arc, + storage: Arc>, blockchain: BlockchainDb, - archive: bool, + finalization_window: u64, } impl Backend where ::Number: As { /// Create a new instance of database backend. - pub fn new(config: &DatabaseSettings) -> Result { + pub fn new(config: DatabaseSettings, finalization_window: u64) -> Result { let mut db_config = DatabaseConfig::with_columns(Some(columns::NUM_COLUMNS)); db_config.memory_budget = config.cache_size; db_config.wal = true; let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?; let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?); - Backend::from_kvdb(db as Arc<_>, true) + Backend::from_kvdb(db as Arc<_>, config.pruning, finalization_window) } #[cfg(test)] fn new_test() -> Self { let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS)); - Backend::from_kvdb(db as Arc<_>, false).expect("failed to create test-db") + Backend::from_kvdb(db as Arc<_>, PruningMode::keep_blocks(0), 0).expect("failed to create test-db") } - fn from_kvdb(db: Arc, archive: bool) -> Result { + fn from_kvdb(db: Arc, pruning: PruningMode, finalization_window: u64) -> Result { let blockchain = BlockchainDb::new(db.clone())?; + let map_e = |e: state_db::Error| ::client::error::Error::from(format!("State database error: {:?}", e)); + let state_db: StateDb = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?; + let storage_db = StorageDb { + db, + state_db, + }; Ok(Backend { - db, + storage: Arc::new(storage_db), blockchain, - archive + finalization_window, }) } } +fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet) { + for (key, val) in commit.data.inserted.into_iter() { + transaction.put(columns::STATE, &key[..], &val); + } + for key in commit.data.deleted.into_iter() { + transaction.delete(columns::STATE, &key[..]); + } + for (key, val) in commit.meta.inserted.into_iter() { + transaction.put(columns::STATE_META, &key[..], 
&val); + } + for key in commit.meta.deleted.into_iter() { + transaction.delete(columns::STATE_META, &key[..]); + } +} + impl client::backend::Backend for Backend where ::Number: As, Block::Hash: Into<[u8; 32]>, // TODO: remove when patricia_trie generic. @@ -355,6 +417,7 @@ impl client::backend::Backend for Backend where } fn commit_operation(&self, mut operation: Self::BlockImportOperation) -> Result<(), client::error::Error> { + use client::blockchain::Backend; let mut transaction = DBTransaction::new(); if let Some(pending_block) = operation.pending_block { let hash = pending_block.header.hash(); @@ -371,15 +434,34 @@ impl client::backend::Backend for Backend where if pending_block.is_best { transaction.put(columns::META, meta::BEST_BLOCK, &key); } + let mut changeset: state_db::ChangeSet = state_db::ChangeSet::default(); for (key, (val, rc)) in operation.updates.drain() { if rc > 0 { - transaction.put(columns::STATE, &key.0[..], &val); - } else if rc < 0 && !self.archive { - transaction.delete(columns::STATE, &key.0[..]); + changeset.inserted.push((key.0.into(), val.to_vec())); + } else if rc < 0 { + changeset.deleted.push(key.0.into()); } } + let number_u64 = number.as_().into(); + let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset); + apply_state_commit(&mut transaction, commit); + + //finalize an older block + if number_u64 > self.finalization_window { + let finalizing_hash = if self.finalization_window == 0 { + Some(hash) + } else { + self.blockchain.hash(As::sa((number_u64 - self.finalization_window) as u32))? 
+ }; + if let Some(finalizing_hash) = finalizing_hash { + trace!("Finalizing block #{} ({:?})", number_u64 - self.finalization_window, finalizing_hash); + let commit = self.storage.state_db.finalize_block(&finalizing_hash); + apply_state_commit(&mut transaction, commit); + } + } + debug!("DB Commit {:?} ({})", hash, number); - self.db.write(transaction).map_err(db_err)?; + self.storage.db.write(transaction).map_err(db_err)?; self.blockchain.update_meta(hash, number, pending_block.is_best); } Ok(()) @@ -395,13 +477,13 @@ impl client::backend::Backend for Backend where // special case for genesis initialization match block { BlockId::Hash(h) if h == Default::default() => - return Ok(DbState::with_kvdb_for_genesis(self.db.clone(), ::columns::STATE)), + return Ok(DbState::with_storage_for_genesis(self.storage.clone())), _ => {} } self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| { let root: [u8; 32] = hdr.state_root().clone().into(); - DbState::with_kvdb(self.db.clone(), ::columns::STATE, root.into()) + DbState::with_storage(self.storage.clone(), root.into()) }).ok_or_else(|| client::error::ErrorKind::UnknownBlock(format!("{:?}", block)).into())) } } @@ -438,7 +520,11 @@ mod tests { let mut op = db.begin_operation(id).unwrap(); let header = Header { number: i, - parent_hash: Default::default(), + parent_hash: if i == 0 { + Default::default() + } else { + db.blockchain.hash(i - 1).unwrap().unwrap() + }, state_root: Default::default(), digest: Default::default(), extrinsics_root: Default::default(), @@ -538,10 +624,10 @@ mod tests { #[test] fn delete_only_when_negative_rc() { let key; - let db = Backend::::new_test(); + let backend = Backend::::new_test(); - { - let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap(); + let hash = { + let mut op = backend.begin_operation(BlockId::Hash(Default::default())).unwrap(); let mut header = Header { number: 0, parent_hash: Default::default(), @@ -557,6 +643,7 @@ mod tests { .cloned() 
.map(|(x, y)| (x, Some(y))) ).0.into(); + let hash = header.hash(); op.reset_storage(storage.iter().cloned()).unwrap(); @@ -568,16 +655,17 @@ mod tests { true ).unwrap(); - db.commit_operation(op).unwrap(); + backend.commit_operation(op).unwrap(); - assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]); - } + assert_eq!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]); + hash + }; - { - let mut op = db.begin_operation(BlockId::Number(0)).unwrap(); + let hash = { + let mut op = backend.begin_operation(BlockId::Number(0)).unwrap(); let mut header = Header { number: 1, - parent_hash: Default::default(), + parent_hash: hash, state_root: Default::default(), digest: Default::default(), extrinsics_root: Default::default(), @@ -590,6 +678,7 @@ mod tests { .cloned() .map(|(x, y)| (x, Some(y))) ).0.into(); + let hash = header.hash(); op.updates.insert(b"hello"); op.updates.remove(&key); @@ -600,16 +689,17 @@ mod tests { true ).unwrap(); - db.commit_operation(op).unwrap(); + backend.commit_operation(op).unwrap(); - assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]); - } + assert_eq!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]); + hash + }; { - let mut op = db.begin_operation(BlockId::Number(1)).unwrap(); + let mut op = backend.begin_operation(BlockId::Number(1)).unwrap(); let mut header = Header { - number: 1, - parent_hash: Default::default(), + number: 2, + parent_hash: hash, state_root: Default::default(), digest: Default::default(), extrinsics_root: Default::default(), @@ -631,9 +721,9 @@ mod tests { true ).unwrap(); - db.commit_operation(op).unwrap(); + backend.commit_operation(op).unwrap(); - assert!(db.db.get(::columns::STATE, &key.0[..]).unwrap().is_none()); + assert!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().is_none()); } } } diff --git a/substrate/substrate/client/src/client.rs 
b/substrate/substrate/client/src/client.rs index cfca952570..30928fa3aa 100644 --- a/substrate/substrate/client/src/client.rs +++ b/substrate/substrate/client/src/client.rs @@ -486,7 +486,6 @@ mod tests { #[test] fn client_initialises_from_genesis_ok() { let client = test_client::new(); - let _genesis_hash = client.block_hash(0).unwrap().unwrap(); assert_eq!(client.using_environment(|| test_runtime::system::balance_of(Keyring::Alice.to_raw_public().into())).unwrap(), 1000); assert_eq!(client.using_environment(|| test_runtime::system::balance_of(Keyring::Ferdie.to_raw_public().into())).unwrap(), 0); diff --git a/substrate/substrate/runtime/council/src/lib.rs b/substrate/substrate/runtime/council/src/lib.rs index 7a4ff1ccc8..f1921ee056 100644 --- a/substrate/substrate/runtime/council/src/lib.rs +++ b/substrate/substrate/runtime/council/src/lib.rs @@ -361,7 +361,7 @@ impl Module { let (_, _, expiring) = Self::next_finalise().ok_or("cannot present outside of presentation period")?; let stakes = Self::snapshoted_stakes(); let voters = Self::voters(); - let bad_presentation_punishment = Self::present_slash_per_voter() * T::Balance::sa(voters.len()); + let bad_presentation_punishment = Self::present_slash_per_voter() * T::Balance::sa(voters.len() as u64); ensure!(>::can_slash(aux.ref_into(), bad_presentation_punishment), "presenter must have sufficient slashable funds"); let mut leaderboard = Self::leaderboard().ok_or("leaderboard must exist while present phase active")?; diff --git a/substrate/substrate/runtime/democracy/src/lib.rs b/substrate/substrate/runtime/democracy/src/lib.rs index 17b72d68e4..b4df04cf3c 100644 --- a/substrate/substrate/runtime/democracy/src/lib.rs +++ b/substrate/substrate/runtime/democracy/src/lib.rs @@ -117,7 +117,7 @@ impl Module { /// Get the amount locked in support of `proposal`; `None` if proposal isn't a valid proposal /// index. 
pub fn locked_for(proposal: PropIndex) -> Option { - Self::deposit_of(proposal).map(|(d, l)| d * T::Balance::sa(l.len())) + Self::deposit_of(proposal).map(|(d, l)| d * T::Balance::sa(l.len() as u64)) } /// Return true if `ref_index` is an on-going referendum. diff --git a/substrate/substrate/runtime/primitives/src/traits.rs b/substrate/substrate/runtime/primitives/src/traits.rs index 996235e06b..218fbc1276 100644 --- a/substrate/substrate/runtime/primitives/src/traits.rs +++ b/substrate/substrate/runtime/primitives/src/traits.rs @@ -121,7 +121,7 @@ impl RefInto for T { } pub trait SimpleArithmetic: - Zero + One + IntegerSquareRoot + As + + Zero + One + IntegerSquareRoot + As + Add + AddAssign + Sub + SubAssign + Mul + MulAssign + @@ -130,7 +130,7 @@ pub trait SimpleArithmetic: PartialOrd + Ord {} impl + + Zero + One + IntegerSquareRoot + As + Add + AddAssign + Sub + SubAssign + Mul + MulAssign + @@ -314,7 +314,7 @@ pub trait Digest { /// `parent_hash`, as well as a `digest` and a block `number`. /// /// You can also create a `new` one from those fields. -pub trait Header: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug { +pub trait Header: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug + 'static { type Number: Member + ::rstd::hash::Hash + Copy + MaybeDisplay + SimpleArithmetic + Slicable; type Hash: Member + ::rstd::hash::Hash + Copy + MaybeDisplay + Default + SimpleBitOps + Slicable + AsRef<[u8]>; type Hashing: Hashing; @@ -352,7 +352,7 @@ pub trait Header: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug { /// `Extrinsic` piece of information as well as a `Header`. /// /// You can get an iterator over each of the `extrinsics` and retrieve the `header`. 
-pub trait Block: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug { +pub trait Block: Clone + Send + Sync + Slicable + Eq + MaybeSerializeDebug + 'static { type Extrinsic: Member + Slicable; type Header: Header; type Hash: Member + ::rstd::hash::Hash + Copy + MaybeDisplay + Default + SimpleBitOps + Slicable + AsRef<[u8]>; diff --git a/substrate/substrate/runtime/staking/src/lib.rs b/substrate/substrate/runtime/staking/src/lib.rs index 138130886a..3237584b84 100644 --- a/substrate/substrate/runtime/staking/src/lib.rs +++ b/substrate/substrate/runtime/staking/src/lib.rs @@ -120,12 +120,12 @@ impl ContractAddressFor for Hashing where pub trait Trait: system::Trait + session::Trait { /// The balance of an account. - type Balance: Parameter + SimpleArithmetic + Slicable + Default + Copy + As + As; + type Balance: Parameter + SimpleArithmetic + Slicable + Default + Copy + As + As + As; /// Function type to get the contract address given the creator. type DetermineContractAddress: ContractAddressFor; /// Type used for storing an account's index; implies the maximum number of accounts the system /// can hold. - type AccountIndex: Parameter + Member + Slicable + SimpleArithmetic + As + As + As + As + Copy; + type AccountIndex: Parameter + Member + Slicable + SimpleArithmetic + As + As + As + As + As + Copy; } decl_module! 
{ @@ -872,7 +872,7 @@ impl AuxLookup for Module { impl MakePayment for Module { fn make_payment(transactor: &T::AccountId, encoded_len: usize) -> Result { let b = Self::free_balance(transactor); - let transaction_fee = Self::transaction_base_fee() + Self::transaction_byte_fee() * >::sa(encoded_len); + let transaction_fee = Self::transaction_base_fee() + Self::transaction_byte_fee() * >::sa(encoded_len as u64); if b < transaction_fee { return Err("not enough funds for transaction fee"); } diff --git a/substrate/substrate/state-db/Cargo.toml b/substrate/substrate/state-db/Cargo.toml new file mode 100644 index 0000000000..734fc27f48 --- /dev/null +++ b/substrate/substrate/state-db/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "substrate-state-db" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +parking_lot = "0.5" +log = "0.4" +substrate-primitives = { path = "../../substrate/primitives" } +substrate-codec = { path = "../../substrate/codec" } + +[dev-dependencies] +env_logger = "0.4" diff --git a/substrate/substrate/state-db/src/lib.rs b/substrate/substrate/state-db/src/lib.rs new file mode 100644 index 0000000000..8e4e330333 --- /dev/null +++ b/substrate/substrate/state-db/src/lib.rs @@ -0,0 +1,354 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! State database maintenance. 
Handles finalization and pruning in the database. The input to +//! this module is a `ChangeSet` which is basicall a list of key-value pairs (trie nodes) that +//! were added or deleted during block execution. +//! +//! # Finalization. +//! Finalization window tracks a tree of blocks identified by header hash. The in-memory +//! overlay allows to get any node that was was inserted in any any of the blocks within the window. +//! The tree is journaled to the backing database and rebuilt on startup. +//! Finalization function select one root from the top of the tree and discards all other roots and +//! their subtrees. +//! +//! # Pruning. +//! See `RefWindow` for pruning algorithm details. `StateDb` prunes on each finalization until pruning +//! constraints are satisfied. +//! + +#[macro_use] extern crate log; +extern crate parking_lot; +extern crate substrate_codec as codec; +extern crate substrate_primitives as primitives; + +mod unfinalized; +mod pruning; +#[cfg(test)] mod test; + +use std::fmt; +use parking_lot::RwLock; +use codec::Slicable; +use std::collections::HashSet; +use unfinalized::UnfinalizedOverlay; +use pruning::RefWindow; + +/// Database value type. +pub type DBValue = Vec; + +/// Basic set of requirements for the Block hash and node key types. +pub trait Hash: Send + Sync + Sized + Eq + PartialEq + Clone + Default + fmt::Debug + Slicable + std::hash::Hash + 'static {} +impl Hash for T {} + +/// Backend database trait. Read-only. +pub trait MetaDb { + type Error: fmt::Debug; + + /// Get meta value, such as the journal. + fn get_meta(&self, key: &[u8]) -> Result, Self::Error>; +} + + +/// Backend database trait. Read-only. +pub trait HashDb { + type Hash: Hash; + type Error: fmt::Debug; + + /// Get state trie node. + fn get(&self, key: &Self::Hash) -> Result, Self::Error>; +} + +/// Error type. +/// Error type. +pub enum Error { + /// Database backend error. + Db(E), + /// `Slicable` decoding error. 
+ Decoding, +} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match self { + Error::Db(e) => e.fmt(f), + Error::Decoding => write!(f, "Error decoding slicable value"), + } + } +} + +/// A set of state node changes. +#[derive(Default, Debug, Clone)] +pub struct ChangeSet { + /// Inserted nodes. + pub inserted: Vec<(H, DBValue)>, + /// Delted nodes. + pub deleted: Vec, +} + + +/// A set of changes to the backing database. +#[derive(Default, Debug, Clone)] +pub struct CommitSet { + /// State node changes. + pub data: ChangeSet, + /// Metadata changes. + pub meta: ChangeSet>, +} + +/// Pruning contraints. If none are specified pruning is +#[derive(Default, Debug, Clone)] +pub struct Constraints { + /// Maximum blocks. Defaults to 0 when unspecified, effectively keeping only unfinalized states. + pub max_blocks: Option, + /// Maximum memory in the pruning overlay. + pub max_mem: Option, +} + +/// Pruning mode. +#[derive(Debug, Clone)] +pub enum PruningMode { + /// Maintain a pruning window. + Constrained(Constraints), + /// No pruning. Finalization is a no-op. + ArchiveAll, + /// Finalization discards unfinalized nodes. All the finalized nodes are kept in the DB. + ArchiveCanonical, +} + +impl PruningMode { + /// Create a mode that keeps given number of blocks. 
+ pub fn keep_blocks(n: u32) -> PruningMode { + PruningMode::Constrained(Constraints { + max_blocks: Some(n), + max_mem: None, + }) + } +} + +fn to_meta_key(suffix: &[u8], data: &S) -> Vec { + let mut buffer = data.encode(); + buffer.extend(suffix); + buffer +} + +struct StateDbSync { + mode: PruningMode, + unfinalized: UnfinalizedOverlay, + pruning: Option>, + pinned: HashSet, +} + +impl StateDbSync { + pub fn new(mode: PruningMode, db: &D) -> Result, Error> { + trace!("StateDb settings: {:?}", mode); + let unfinalized: UnfinalizedOverlay = UnfinalizedOverlay::new(db)?; + let pruning: Option> = match mode { + PruningMode::Constrained(Constraints { + max_mem: Some(_), + .. + }) => unimplemented!(), + PruningMode::Constrained(_) => Some(RefWindow::new(db)?), + PruningMode::ArchiveAll | PruningMode::ArchiveCanonical => None, + }; + Ok(StateDbSync { + mode, + unfinalized, + pruning: pruning, + pinned: Default::default(), + }) + } + + pub fn insert_block(&mut self, hash: &BlockHash, number: u64, parent_hash: &BlockHash, mut changeset: ChangeSet) -> CommitSet { + if number == 0 { + return CommitSet { + data: changeset, + meta: Default::default(), + } + } + match self.mode { + PruningMode::ArchiveAll => { + changeset.deleted.clear(); + // write changes immediatelly + CommitSet { + data: changeset, + meta: Default::default(), + } + }, + PruningMode::Constrained(_) | PruningMode::ArchiveCanonical => { + self.unfinalized.insert(hash, number, parent_hash, changeset) + } + } + } + + pub fn finalize_block(&mut self, hash: &BlockHash) -> CommitSet { + let mut commit = match self.mode { + PruningMode::ArchiveAll => { + CommitSet::default() + }, + PruningMode::ArchiveCanonical => { + let mut commit = self.unfinalized.finalize(hash); + commit.data.deleted.clear(); + commit + }, + PruningMode::Constrained(_) => { + self.unfinalized.finalize(hash) + }, + }; + if let Some(ref mut pruning) = self.pruning { + pruning.note_finalized(hash, &mut commit); + } + self.prune(&mut commit); + 
commit + } + + fn prune(&mut self, commit: &mut CommitSet) { + if let (&mut Some(ref mut pruning), &PruningMode::Constrained(ref constraints)) = (&mut self.pruning, &self.mode) { + loop { + if pruning.window_size() <= constraints.max_blocks.unwrap_or(0) as u64 { + break; + } + + if constraints.max_mem.map_or(false, |m| pruning.mem_used() > m) { + break; + } + + let pinned = &self.pinned; + if pruning.next_hash().map_or(false, |h| pinned.contains(&h)) { + break; + } + + pruning.prune_one(commit); + } + } + } + + pub fn pin(&mut self, hash: &BlockHash) { + self.pinned.insert(hash.clone()); + } + + pub fn unpin(&mut self, hash: &BlockHash) { + self.pinned.remove(hash); + } + + pub fn get>(&self, key: &Key, db: &D) -> Result, Error> { + if let Some(value) = self.unfinalized.get(key) { + return Ok(Some(value)); + } + db.get(key).map_err(|e| Error::Db(e)) + } +} + +/// State DB maintenance. See module description. +/// Can be shared across threads. +pub struct StateDb { + db: RwLock>, +} + +impl StateDb { + /// Creates a new instance. Does not expect any metadata in the database. + pub fn new(mode: PruningMode, db: &D) -> Result, Error> { + Ok(StateDb { + db: RwLock::new(StateDbSync::new(mode, db)?) + }) + } + + /// Add a new unfinalized block. + pub fn insert_block(&self, hash: &BlockHash, number: u64, parent_hash: &BlockHash, changeset: ChangeSet) -> CommitSet { + self.db.write().insert_block(hash, number, parent_hash, changeset) + } + + /// Finalize a previously inserted block. + pub fn finalize_block(&self, hash: &BlockHash) -> CommitSet { + self.db.write().finalize_block(hash) + } + + /// Prevents pruning of specified block and its descendants. + pub fn pin(&self, hash: &BlockHash) { + self.db.write().pin(hash) + } + + /// Allows pruning of specified block. + pub fn unpin(&self, hash: &BlockHash) { + self.db.write().unpin(hash) + } + + /// Get a value from unfinalized/pruning overlay or the backing DB. 
+ pub fn get>(&self, key: &Key, db: &D) -> Result, Error> { + self.db.read().get(key, db) + } +} + +#[cfg(test)] +mod tests { + use primitives::H256; + use {StateDb, PruningMode, Constraints}; + use test::{make_db, make_changeset, TestDb}; + + fn make_test_db(settings: PruningMode) -> (TestDb, StateDb) { + let mut db = make_db(&[91, 921, 922, 93, 94]); + let state_db = StateDb::new(settings, &db).unwrap(); + + db.commit(&state_db.insert_block(&H256::from(1), 1, &H256::from(0), make_changeset(&[1], &[91]))); + db.commit(&state_db.insert_block(&H256::from(21), 2, &H256::from(1), make_changeset(&[21], &[921, 1]))); + db.commit(&state_db.insert_block(&H256::from(22), 2, &H256::from(1), make_changeset(&[22], &[922]))); + db.commit(&state_db.insert_block(&H256::from(3), 3, &H256::from(21), make_changeset(&[3], &[93]))); + db.commit(&state_db.finalize_block(&H256::from(1))); + db.commit(&state_db.insert_block(&H256::from(4), 4, &H256::from(3), make_changeset(&[4], &[94]))); + db.commit(&state_db.finalize_block(&H256::from(21))); + db.commit(&state_db.finalize_block(&H256::from(3))); + + (db, state_db) + } + + #[test] + fn full_archive_keeps_everything() { + let (db, _) = make_test_db(PruningMode::ArchiveAll); + assert!(db.data_eq(&make_db(&[1, 21, 22, 3, 4, 91, 921, 922, 93, 94]))); + } + + #[test] + fn canonical_archive_keeps_canonical() { + let (db, _) = make_test_db(PruningMode::ArchiveCanonical); + assert!(db.data_eq(&make_db(&[1, 21, 3, 91, 921, 922, 93, 94]))); + } + + #[test] + fn prune_window_0() { + let (db, _) = make_test_db(PruningMode::Constrained(Constraints { + max_blocks: Some(0), + max_mem: None, + })); + assert!(db.data_eq(&make_db(&[21, 3, 922, 94]))); + } + + #[test] + fn prune_window_1() { + let (db, _) = make_test_db(PruningMode::Constrained(Constraints { + max_blocks: Some(1), + max_mem: None, + })); + assert!(db.data_eq(&make_db(&[21, 3, 922, 93, 94]))); + } + + #[test] + fn prune_window_2() { + let (db, _) = 
make_test_db(PruningMode::Constrained(Constraints { + max_blocks: Some(2), + max_mem: None, + })); + assert!(db.data_eq(&make_db(&[1, 21, 3, 921, 922, 93, 94]))); + } +} diff --git a/substrate/substrate/state-db/src/pruning.rs b/substrate/substrate/state-db/src/pruning.rs new file mode 100644 index 0000000000..b5fbb553d9 --- /dev/null +++ b/substrate/substrate/state-db/src/pruning.rs @@ -0,0 +1,280 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Pruning window. +//! +//! For each block we maintain a list of nodes pending deletion. +//! There is also a global index of node key to block number. +//! If a node is re-inserted into the window it gets removed from +//! the death list. +//! The changes are journaled in the DB. + +use std::collections::{HashMap, HashSet, VecDeque}; +use codec::{Slicable, self}; +use {CommitSet, Error, MetaDb, to_meta_key, Hash}; + +const LAST_PRUNED: &[u8] = b"last_pruned"; +const PRUNING_JOURNAL: &[u8] = b"pruning_journal"; + +/// See module documentation. 
+pub struct RefWindow { + death_rows: VecDeque>, + death_index: HashMap, + pending_number: u64, +} + +#[derive(Debug, PartialEq, Eq)] +struct DeathRow { + hash: BlockHash, + journal_key: Vec, + deleted: HashSet, +} + +struct JournalRecord { + hash: BlockHash, + inserted: Vec, + deleted: Vec, +} + +impl Slicable for JournalRecord { + fn encode(&self) -> Vec { + let mut v = Vec::with_capacity(4096); + self.hash.using_encoded(|s| v.extend(s)); + self.inserted.using_encoded(|s| v.extend(s)); + self.deleted.using_encoded(|s| v.extend(s)); + v + } + + fn decode(input: &mut I) -> Option { + Some(JournalRecord { + hash: Slicable::decode(input)?, + inserted: Slicable::decode(input)?, + deleted: Slicable::decode(input)?, + }) + } +} + +fn to_journal_key(block: u64) -> Vec { + to_meta_key(PRUNING_JOURNAL, &block) +} + +impl RefWindow { + pub fn new(db: &D) -> Result, Error> { + let last_pruned = db.get_meta(&to_meta_key(LAST_PRUNED, &())) + .map_err(|e| Error::Db(e))?; + let pending_number: u64 = match last_pruned { + Some(buffer) => u64::decode(&mut buffer.as_slice()).ok_or(Error::Decoding)? + 1, + None => 1, + }; + let mut block = pending_number; + let mut pruning = RefWindow { + death_rows: Default::default(), + death_index: Default::default(), + pending_number: pending_number, + }; + // read the journal + trace!(target: "state-db", "Reading pruning journal. Last pruned #{}", pending_number - 1); + loop { + let journal_key = to_journal_key(block); + match db.get_meta(&journal_key).map_err(|e| Error::Db(e))? 
{ + Some(record) => { + let record: JournalRecord = Slicable::decode(&mut record.as_slice()).ok_or(Error::Decoding)?; + trace!(target: "state-db", "Pruning journal entry {} ({} inserted, {} deleted)", block, record.inserted.len(), record.deleted.len()); + pruning.import(&record.hash, journal_key, record.inserted.into_iter(), record.deleted); + }, + None => break, + } + block += 1; + } + Ok(pruning) + } + + fn import>(&mut self, hash: &BlockHash, journal_key: Vec, inserted: I, deleted: Vec) { + // remove all re-inserted keys from death rows + for k in inserted { + if let Some(block) = self.death_index.remove(&k) { + self.death_rows[(block - self.pending_number) as usize].deleted.remove(&k); + } + } + + // add new keys + let imported_block = self.pending_number + self.death_rows.len() as u64; + for k in deleted.iter() { + self.death_index.insert(k.clone(), imported_block); + } + self.death_rows.push_back( + DeathRow { + hash: hash.clone(), + deleted: deleted.into_iter().collect(), + journal_key: journal_key, + } + ); + } + + pub fn window_size(&self) -> u64 { + self.death_rows.len() as u64 + } + + pub fn next_hash(&self) -> Option { + self.death_rows.front().map(|r| r.hash.clone()) + } + + pub fn mem_used(&self) -> usize { + 0 + } + + /// Prune next block. Expects at least one block in the window. Adds changes to `commit`. + pub fn prune_one(&mut self, commit: &mut CommitSet) { + let pruned = self.death_rows.pop_front().expect("prune_one is only called with a non-empty window"); + trace!(target: "state-db", "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len()); + for k in pruned.deleted.iter() { + self.death_index.remove(&k); + } + commit.data.deleted.extend(pruned.deleted.into_iter()); + commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), self.pending_number.encode())); + commit.meta.deleted.push(pruned.journal_key); + self.pending_number += 1; + } + + /// Add a change set to the window. 
Creates a journal record and pushes it to `commit` + pub fn note_finalized(&mut self, hash: &BlockHash, commit: &mut CommitSet) { + trace!(target: "state-db", "Adding to pruning window: {:?} ({} inserted, {} deleted)", hash, commit.data.inserted.len(), commit.data.deleted.len()); + let inserted = commit.data.inserted.iter().map(|(k, _)| k.clone()).collect(); + let deleted = ::std::mem::replace(&mut commit.data.deleted, Vec::new()); + let journal_record = JournalRecord { + hash: hash.clone(), + inserted, + deleted, + }; + let block = self.pending_number + self.window_size(); + let journal_key = to_journal_key(block); + commit.meta.inserted.push((journal_key.clone(), journal_record.encode())); + + self.import(hash, journal_key, journal_record.inserted.into_iter(), journal_record.deleted); + } +} + +#[cfg(test)] +mod tests { + use super::RefWindow; + use primitives::H256; + use {CommitSet}; + use test::{make_db, make_commit, TestDb}; + + fn check_journal(pruning: &RefWindow, db: &TestDb) { + let restored: RefWindow = RefWindow::new(db).unwrap(); + assert_eq!(pruning.pending_number, restored.pending_number); + assert_eq!(pruning.death_rows, restored.death_rows); + assert_eq!(pruning.death_index, restored.death_index); + } + + #[test] + fn created_from_empty_db() { + let db = make_db(&[]); + let pruning: RefWindow = RefWindow::new(&db).unwrap(); + assert_eq!(pruning.pending_number, 1); + assert!(pruning.death_rows.is_empty()); + assert!(pruning.death_index.is_empty()); + } + + #[test] + #[should_panic] + fn prune_empty_panics() { + let db = make_db(&[]); + let mut pruning: RefWindow = RefWindow::new(&db).unwrap(); + let mut commit = CommitSet::default(); + pruning.prune_one(&mut commit); + } + + #[test] + fn prune_one() { + let mut db = make_db(&[1, 2, 3]); + let mut pruning: RefWindow = RefWindow::new(&db).unwrap(); + let mut commit = make_commit(&[4, 5], &[1, 3]); + let h = H256::random(); + pruning.note_finalized(&h, &mut commit); + db.commit(&commit); + 
assert!(commit.data.deleted.is_empty()); + assert_eq!(pruning.death_rows.len(), 1); + assert_eq!(pruning.death_index.len(), 2); + assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5]))); + check_journal(&pruning, &db); + + let mut commit = CommitSet::default(); + pruning.prune_one(&mut commit); + db.commit(&commit); + assert!(db.data_eq(&make_db(&[2, 4, 5]))); + assert!(pruning.death_rows.is_empty()); + assert!(pruning.death_index.is_empty()); + assert_eq!(pruning.pending_number, 2); + } + + #[test] + fn prune_two() { + let mut db = make_db(&[1, 2, 3]); + let mut pruning: RefWindow = RefWindow::new(&db).unwrap(); + let mut commit = make_commit(&[4], &[1]); + pruning.note_finalized(&H256::random(), &mut commit); + db.commit(&commit); + let mut commit = make_commit(&[5], &[2]); + pruning.note_finalized(&H256::random(), &mut commit); + db.commit(&commit); + assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5]))); + + check_journal(&pruning, &db); + + let mut commit = CommitSet::default(); + pruning.prune_one(&mut commit); + db.commit(&commit); + assert!(db.data_eq(&make_db(&[2, 3, 4, 5]))); + let mut commit = CommitSet::default(); + pruning.prune_one(&mut commit); + db.commit(&commit); + assert!(db.data_eq(&make_db(&[3, 4, 5]))); + assert_eq!(pruning.pending_number, 3); + } + + #[test] + fn reinserted_survives() { + let mut db = make_db(&[1, 2, 3]); + let mut pruning: RefWindow = RefWindow::new(&db).unwrap(); + let mut commit = make_commit(&[], &[2]); + pruning.note_finalized(&H256::random(), &mut commit); + db.commit(&commit); + let mut commit = make_commit(&[2], &[]); + pruning.note_finalized(&H256::random(), &mut commit); + db.commit(&commit); + let mut commit = make_commit(&[], &[2]); + pruning.note_finalized(&H256::random(), &mut commit); + db.commit(&commit); + assert!(db.data_eq(&make_db(&[1, 2, 3]))); + + check_journal(&pruning, &db); + + let mut commit = CommitSet::default(); + pruning.prune_one(&mut commit); + db.commit(&commit); + assert!(db.data_eq(&make_db(&[1, 2, 3]))); 
+ let mut commit = CommitSet::default(); + pruning.prune_one(&mut commit); + db.commit(&commit); + assert!(db.data_eq(&make_db(&[1, 2, 3]))); + pruning.prune_one(&mut commit); + db.commit(&commit); + assert!(db.data_eq(&make_db(&[1, 3]))); + assert_eq!(pruning.pending_number, 4); + } +} diff --git a/substrate/substrate/state-db/src/test.rs b/substrate/substrate/state-db/src/test.rs new file mode 100644 index 0000000000..d9ff05a6a2 --- /dev/null +++ b/substrate/substrate/state-db/src/test.rs @@ -0,0 +1,83 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! 
Test utils + +use std::collections::HashMap; +use primitives::H256; +use {DBValue, ChangeSet, CommitSet, MetaDb, HashDb}; + +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct TestDb { + pub data: HashMap, + pub meta: HashMap, DBValue>, +} + +impl MetaDb for TestDb { + type Error = (); + + fn get_meta(&self, key: &[u8]) -> Result, ()> { + Ok(self.meta.get(key).cloned()) + } +} + +impl HashDb for TestDb { + type Error = (); + type Hash = H256; + + fn get(&self, key: &H256) -> Result, ()> { + Ok(self.data.get(key).cloned()) + } +} + +impl TestDb { + pub fn commit(&mut self, commit: &CommitSet) { + self.data.extend(commit.data.inserted.iter().cloned()); + for k in commit.data.deleted.iter() { + self.data.remove(k); + } + self.meta.extend(commit.meta.inserted.iter().cloned()); + for k in commit.meta.deleted.iter() { + self.meta.remove(k); + } + } + + pub fn data_eq(&self, other: &TestDb) -> bool { + self.data == other.data + } +} + +pub fn make_changeset(inserted: &[u64], deleted: &[u64]) -> ChangeSet { + ChangeSet { + inserted: inserted.iter().map(|v| (H256::from(*v), H256::from(*v).to_vec())).collect(), + deleted: deleted.iter().map(|v| H256::from(*v)).collect(), + } +} + +pub fn make_commit(inserted: &[u64], deleted: &[u64]) -> CommitSet { + CommitSet { + data: make_changeset(inserted, deleted), + meta: ChangeSet::default(), + } +} + +pub fn make_db(inserted: &[u64]) -> TestDb { + TestDb { + data: inserted.iter().map(|v| (H256::from(*v), H256::from(*v).to_vec())).collect(), + meta: Default::default(), + } +} + diff --git a/substrate/substrate/state-db/src/unfinalized.rs b/substrate/substrate/state-db/src/unfinalized.rs new file mode 100644 index 0000000000..e4cd5aa278 --- /dev/null +++ b/substrate/substrate/state-db/src/unfinalized.rs @@ -0,0 +1,475 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. 
+ +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Finalization window. +//! Maintains trees of block overlays and allows discarding trees/roots + +use std::collections::{HashMap, VecDeque}; +use super::{Error, DBValue, ChangeSet, CommitSet, MetaDb, Hash, to_meta_key}; +use codec::{self, Slicable}; + +const UNFINALIZED_JOURNAL: &[u8] = b"unfinalized_journal"; +const LAST_FINALIZED: &[u8] = b"last_finalized"; + +/// See module documentation. 
+pub struct UnfinalizedOverlay { + last_finalized: Option<(BlockHash, u64)>, + levels: VecDeque>>, + parents: HashMap, +} + +struct JournalRecord { + hash: BlockHash, + parent_hash: BlockHash, + inserted: Vec<(Key, DBValue)>, + deleted: Vec, +} + +impl Slicable for JournalRecord { + fn encode(&self) -> Vec { + let mut v = Vec::with_capacity(4096); + self.hash.using_encoded(|s| v.extend(s)); + self.parent_hash.using_encoded(|s| v.extend(s)); + self.inserted.using_encoded(|s| v.extend(s)); + self.deleted.using_encoded(|s| v.extend(s)); + v + } + + fn decode(input: &mut I) -> Option { + Some(JournalRecord { + hash: Slicable::decode(input)?, + parent_hash: Slicable::decode(input)?, + inserted: Slicable::decode(input)?, + deleted: Slicable::decode(input)?, + }) + } +} + +fn to_journal_key(block: u64, index: u64) -> Vec { + to_meta_key(UNFINALIZED_JOURNAL, &(block, index)) +} + +#[cfg_attr(test, derive(PartialEq, Debug))] +struct BlockOverlay { + hash: BlockHash, + journal_key: Vec, + values: HashMap, + deleted: Vec, +} + +impl UnfinalizedOverlay { + /// Creates a new instance. Does not expect any metadata to be present in the DB. + pub fn new(db: &D) -> Result, Error> { + let last_finalized = db.get_meta(&to_meta_key(LAST_FINALIZED, &())) + .map_err(|e| Error::Db(e))?; + let last_finalized = match last_finalized { + Some(buffer) => Some(<(BlockHash, u64)>::decode(&mut buffer.as_slice()).ok_or(Error::Decoding)?), + None => None, + }; + let mut levels = VecDeque::new(); + let mut parents = HashMap::new(); + if let Some((ref hash, mut block)) = last_finalized { + // read the journal + trace!(target: "state-db", "Reading unfinalized journal. Last finalized #{} ({:?})", block, hash); + let mut total: u64 = 0; + block += 1; + loop { + let mut index: u64 = 0; + let mut level = Vec::new(); + loop { + let journal_key = to_journal_key(block, index); + match db.get_meta(&journal_key).map_err(|e| Error::Db(e))? 
{ + Some(record) => { + let record: JournalRecord = Slicable::decode(&mut record.as_slice()).ok_or(Error::Decoding)?; + let overlay = BlockOverlay { + hash: record.hash.clone(), + journal_key, + values: record.inserted.into_iter().collect(), + deleted: record.deleted, + }; + trace!(target: "state-db", "Unfinalized journal entry {}.{} ({} inserted, {} deleted)", block, index, overlay.values.len(), overlay.deleted.len()); + level.push(overlay); + parents.insert(record.hash, record.parent_hash); + index += 1; + total += 1; + }, + None => break, + } + } + if level.is_empty() { + break; + } + levels.push_back(level); + block += 1; + } + trace!(target: "state-db", "Finished reading unfinalized journal, {} entries", total); + } + Ok(UnfinalizedOverlay { + last_finalized: last_finalized, + levels, + parents, + }) + } + + /// Insert a new block into the overlay. If inserted on the second level or lower, expects parent to be present in the window. + pub fn insert(&mut self, hash: &BlockHash, number: u64, parent_hash: &BlockHash, changeset: ChangeSet) -> CommitSet { + let mut commit = CommitSet::default(); + if self.levels.is_empty() && self.last_finalized.is_none() { + // assume that parent was finalized + let last_finalized = (parent_hash.clone(), number - 1); + commit.meta.inserted.push((to_meta_key(LAST_FINALIZED, &()), last_finalized.encode())); + self.last_finalized = Some(last_finalized); + } else if self.last_finalized.is_some() { + assert!(number >= self.front_block_number() && number < (self.front_block_number() + self.levels.len() as u64 + 1)); + // check for valid parent if inserting on second level or higher + if number == self.front_block_number() { + assert!(self.last_finalized.as_ref().map_or(false, |&(ref h, n)| h == parent_hash && n == number - 1)); + } else { + assert!(self.parents.contains_key(&parent_hash)); + } + } + let level = if self.levels.is_empty() || number == self.front_block_number() + self.levels.len() as u64 { + 
self.levels.push_back(Vec::new()); + self.levels.back_mut().expect("can't be empty after insertion; qed") + } else { + let front_block_number = self.front_block_number(); + self.levels.get_mut((number - front_block_number) as usize) + .expect("number is [front_block_number .. front_block_number + levels.len()) is asserted in precondition; qed") + }; + + let index = level.len() as u64; + let journal_key = to_journal_key(number, index); + + let overlay = BlockOverlay { + hash: hash.clone(), + journal_key: journal_key.clone(), + values: changeset.inserted.iter().cloned().collect(), + deleted: changeset.deleted.clone(), + }; + level.push(overlay); + self.parents.insert(hash.clone(), parent_hash.clone()); + let journal_record = JournalRecord { + hash: hash.clone(), + parent_hash: parent_hash.clone(), + inserted: changeset.inserted, + deleted: changeset.deleted, + }; + trace!(target: "state-db", "Inserted unfinalized changeset {}.{} ({} inserted, {} deleted)", number, index, journal_record.inserted.len(), journal_record.deleted.len()); + let journal_record = journal_record.encode(); + commit.meta.inserted.push((journal_key, journal_record)); + commit + } + + fn discard( + levels: &mut [Vec>], + parents: &mut HashMap, + discarded_journals: &mut Vec>, + number: u64, + hash: &BlockHash, + ) { + if let Some((level, sublevels)) = levels.split_first_mut() { + level.retain(|ref overlay| { + let parent = parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone(); + if parent == *hash { + parents.remove(&overlay.hash); + discarded_journals.push(overlay.journal_key.clone()); + Self::discard(sublevels, parents, discarded_journals, number + 1, &overlay.hash); + false + } else { + true + } + }); + } + } + + fn front_block_number(&self) -> u64 { + self.last_finalized.as_ref().map(|&(_, n)| n + 1).unwrap_or(0) + } + + /// Select a top-level root and finalize it. Discards all sibling subtrees and the root. 
+ /// Returns a set of changes that need to be added to the DB. + pub fn finalize(&mut self, hash: &BlockHash) -> CommitSet { + trace!(target: "state-db", "Finalizing {:?}", hash); + let level = self.levels.pop_front().expect("no blocks to finalize"); + let index = level.iter().position(|overlay| overlay.hash == *hash) + .expect("attempting to finalize unknown block"); + + let mut commit = CommitSet::default(); + let mut discarded_journals = Vec::new(); + for (i, overlay) in level.into_iter().enumerate() { + self.parents.remove(&overlay.hash); + if i == index { + // that's the one we need to finalize + commit.data.inserted = overlay.values.into_iter().collect(); + commit.data.deleted = overlay.deleted; + } else { + // TODO: borrow checker won't allow us to split out mutable references + // required for recursive processing. A more efficient implementation + // that does not require converting to vector is possible + let mut vec: Vec<_> = self.levels.drain(..).collect(); + Self::discard(&mut vec, &mut self.parents, &mut discarded_journals, 0, &overlay.hash); + self.levels.extend(vec.into_iter()); + } + // cleanup journal entry + discarded_journals.push(overlay.journal_key); + } + commit.meta.deleted.append(&mut discarded_journals); + let last_finalized = (hash.clone(), self.front_block_number()); + commit.meta.inserted.push((to_meta_key(LAST_FINALIZED, &()), last_finalized.encode())); + self.last_finalized = Some(last_finalized); + trace!(target: "state-db", "Discarded {} records", commit.meta.deleted.len()); + commit + } + + /// Get a value from the node overlay. This searches in every existing changeset. 
+ pub fn get(&self, key: &Key) -> Option { + for level in self.levels.iter() { + for overlay in level.iter() { + if let Some(value) = overlay.values.get(&key) { + return Some(value.clone()); + } + } + } + None + } +} + +#[cfg(test)] +mod tests { + use super::UnfinalizedOverlay; + use {ChangeSet}; + use primitives::H256; + use test::{make_db, make_changeset}; + + fn contains(overlay: &UnfinalizedOverlay, key: u64) -> bool { + overlay.get(&H256::from(key)) == Some(H256::from(key).to_vec()) + } + + #[test] + fn created_from_empty_db() { + let db = make_db(&[]); + let overlay: UnfinalizedOverlay = UnfinalizedOverlay::new(&db).unwrap(); + assert_eq!(overlay.last_finalized, None); + assert!(overlay.levels.is_empty()); + assert!(overlay.parents.is_empty()); + } + + #[test] + #[should_panic] + fn finalize_empty_panics() { + let db = make_db(&[]); + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + overlay.finalize(&H256::default()); + } + + #[test] + #[should_panic] + fn insert_ahead_panics() { + let db = make_db(&[]); + let h1 = H256::random(); + let h2 = H256::random(); + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + overlay.insert(&h1, 2, &H256::default(), ChangeSet::default()); + overlay.insert(&h2, 1, &h1, ChangeSet::default()); + } + + #[test] + #[should_panic] + fn insert_behind_panics() { + let h1 = H256::random(); + let h2 = H256::random(); + let db = make_db(&[]); + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + overlay.insert(&h1, 1, &H256::default(), ChangeSet::default()); + overlay.insert(&h2, 3, &h1, ChangeSet::default()); + } + + #[test] + #[should_panic] + fn insert_unknown_parent_panics() { + let db = make_db(&[]); + let h1 = H256::random(); + let h2 = H256::random(); + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + overlay.insert(&h1, 1, &H256::default(), ChangeSet::default()); + overlay.insert(&h2, 2, &H256::default(), ChangeSet::default()); + } + + #[test] + #[should_panic] + fn 
finalize_unknown_panics() { + let h1 = H256::random(); + let h2 = H256::random(); + let db = make_db(&[]); + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + overlay.insert(&h1, 1, &H256::default(), ChangeSet::default()); + overlay.finalize(&h2); + } + + #[test] + fn insert_finalize_one() { + let h1 = H256::random(); + let mut db = make_db(&[1, 2]); + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + let changeset = make_changeset(&[3, 4], &[2]); + let insertion = overlay.insert(&h1, 1, &H256::default(), changeset.clone()); + assert_eq!(insertion.data.inserted.len(), 0); + assert_eq!(insertion.data.deleted.len(), 0); + assert_eq!(insertion.meta.inserted.len(), 2); + assert_eq!(insertion.meta.deleted.len(), 0); + db.commit(&insertion); + let finalization = overlay.finalize(&h1); + assert_eq!(finalization.data.inserted.len(), changeset.inserted.len()); + assert_eq!(finalization.data.deleted.len(), changeset.deleted.len()); + assert_eq!(finalization.meta.inserted.len(), 1); + assert_eq!(finalization.meta.deleted.len(), 1); + db.commit(&finalization); + assert!(db.data_eq(&make_db(&[1, 3, 4]))); + } + + #[test] + fn restore_from_journal() { + let h1 = H256::random(); + let h2 = H256::random(); + let mut db = make_db(&[1, 2]); + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + db.commit(&overlay.insert(&h1, 10, &H256::default(), make_changeset(&[3, 4], &[2]))); + db.commit(&overlay.insert(&h2, 11, &h1, make_changeset(&[5], &[3]))); + assert_eq!(db.meta.len(), 3); + + let overlay2 = UnfinalizedOverlay::::new(&db).unwrap(); + assert_eq!(overlay.levels, overlay2.levels); + assert_eq!(overlay.parents, overlay2.parents); + assert_eq!(overlay.last_finalized, overlay2.last_finalized); + } + + #[test] + fn insert_finalize_two() { + let h1 = H256::random(); + let h2 = H256::random(); + let mut db = make_db(&[1, 2, 3, 4]); + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + let changeset1 = make_changeset(&[5, 6], &[2]); + let 
changeset2 = make_changeset(&[7, 8], &[5, 3]); + db.commit(&overlay.insert(&h1, 1, &H256::default(), changeset1)); + assert!(contains(&overlay, 5)); + db.commit(&overlay.insert(&h2, 2, &h1, changeset2)); + assert!(contains(&overlay, 7)); + assert!(contains(&overlay, 5)); + assert_eq!(overlay.levels.len(), 2); + assert_eq!(overlay.parents.len(), 2); + db.commit(&overlay.finalize(&h1)); + assert_eq!(overlay.levels.len(), 1); + assert_eq!(overlay.parents.len(), 1); + assert!(!contains(&overlay, 5)); + assert!(contains(&overlay, 7)); + db.commit(&overlay.finalize(&h2)); + assert_eq!(overlay.levels.len(), 0); + assert_eq!(overlay.parents.len(), 0); + assert!(db.data_eq(&make_db(&[1, 4, 6, 7, 8]))); + } + + + #[test] + fn complex_tree() { + let mut db = make_db(&[]); + + // - 1 - 1_1 - 1_1_1 + // \ 1_2 - 1_2_1 + // \ 1_2_2 + // \ 1_2_3 + // + // - 2 - 2_1 - 2_1_1 + // \ 2_2 + // + // 1_2_2 is the winner + + let (h_1, c_1) = (H256::random(), make_changeset(&[1], &[])); + let (h_2, c_2) = (H256::random(), make_changeset(&[2], &[])); + + let (h_1_1, c_1_1) = (H256::random(), make_changeset(&[11], &[])); + let (h_1_2, c_1_2) = (H256::random(), make_changeset(&[12], &[])); + let (h_2_1, c_2_1) = (H256::random(), make_changeset(&[21], &[])); + let (h_2_2, c_2_2) = (H256::random(), make_changeset(&[22], &[])); + + let (h_1_1_1, c_1_1_1) = (H256::random(), make_changeset(&[111], &[])); + let (h_1_2_1, c_1_2_1) = (H256::random(), make_changeset(&[121], &[])); + let (h_1_2_2, c_1_2_2) = (H256::random(), make_changeset(&[122], &[])); + let (h_1_2_3, c_1_2_3) = (H256::random(), make_changeset(&[123], &[])); + let (h_2_1_1, c_2_1_1) = (H256::random(), make_changeset(&[211], &[])); + + let mut overlay = UnfinalizedOverlay::::new(&db).unwrap(); + db.commit(&overlay.insert(&h_1, 1, &H256::default(), c_1)); + + db.commit(&overlay.insert(&h_1_1, 2, &h_1, c_1_1)); + db.commit(&overlay.insert(&h_1_2, 2, &h_1, c_1_2)); + + db.commit(&overlay.insert(&h_2, 1, &H256::default(), c_2)); + + 
db.commit(&overlay.insert(&h_2_1, 2, &h_2, c_2_1)); + db.commit(&overlay.insert(&h_2_2, 2, &h_2, c_2_2)); + + db.commit(&overlay.insert(&h_1_1_1, 3, &h_1_1, c_1_1_1)); + db.commit(&overlay.insert(&h_1_2_1, 3, &h_1_2, c_1_2_1)); + db.commit(&overlay.insert(&h_1_2_2, 3, &h_1_2, c_1_2_2)); + db.commit(&overlay.insert(&h_1_2_3, 3, &h_1_2, c_1_2_3)); + db.commit(&overlay.insert(&h_2_1_1, 3, &h_2_1, c_2_1_1)); + + assert!(contains(&overlay, 2)); + assert!(contains(&overlay, 11)); + assert!(contains(&overlay, 21)); + assert!(contains(&overlay, 111)); + assert!(contains(&overlay, 122)); + assert!(contains(&overlay, 211)); + assert_eq!(overlay.levels.len(), 3); + assert_eq!(overlay.parents.len(), 11); + assert_eq!(overlay.last_finalized, Some((H256::default(), 0))); + + // check if restoration from journal results in the same tree + let overlay2 = UnfinalizedOverlay::::new(&db).unwrap(); + assert_eq!(overlay.levels, overlay2.levels); + assert_eq!(overlay.parents, overlay2.parents); + assert_eq!(overlay.last_finalized, overlay2.last_finalized); + + // finalize 1. 2 and all its children should be discarded + db.commit(&overlay.finalize(&h_1)); + assert_eq!(overlay.levels.len(), 2); + assert_eq!(overlay.parents.len(), 6); + assert!(!contains(&overlay, 1)); + assert!(!contains(&overlay, 2)); + assert!(!contains(&overlay, 21)); + assert!(!contains(&overlay, 22)); + assert!(!contains(&overlay, 211)); + assert!(contains(&overlay, 111)); + + // finalize 1_2. 
1_1 and all its children should be discarded + db.commit(&overlay.finalize(&h_1_2)); + assert_eq!(overlay.levels.len(), 1); + assert_eq!(overlay.parents.len(), 3); + assert!(!contains(&overlay, 11)); + assert!(!contains(&overlay, 111)); + assert!(contains(&overlay, 121)); + assert!(contains(&overlay, 122)); + assert!(contains(&overlay, 123)); + + // finalize 1_2_2 + db.commit(&overlay.finalize(&h_1_2_2)); + assert_eq!(overlay.levels.len(), 0); + assert_eq!(overlay.parents.len(), 0); + assert!(db.data_eq(&make_db(&[1, 12, 122]))); + assert_eq!(overlay.last_finalized, Some((h_1_2_2, 3))); + } +} diff --git a/substrate/substrate/state-machine/Cargo.toml b/substrate/substrate/state-machine/Cargo.toml index 74a46764cf..17b072c4c6 100644 --- a/substrate/substrate/state-machine/Cargo.toml +++ b/substrate/substrate/state-machine/Cargo.toml @@ -15,6 +15,5 @@ triehash = "0.1" substrate-primitives = { path = "../primitives", version = "0.1.0" } hashdb = { git = "https://github.com/paritytech/parity.git" } -kvdb = { git = "https://github.com/paritytech/parity.git" } memorydb = { git = "https://github.com/paritytech/parity.git" } patricia-trie = { git = "https://github.com/paritytech/parity.git" } diff --git a/substrate/substrate/state-machine/src/lib.rs b/substrate/substrate/state-machine/src/lib.rs index af643d7897..c9c087f502 100644 --- a/substrate/substrate/state-machine/src/lib.rs +++ b/substrate/substrate/state-machine/src/lib.rs @@ -25,7 +25,6 @@ extern crate hex_literal; extern crate log; extern crate ethereum_types; -extern crate kvdb; extern crate hashdb; extern crate memorydb; extern crate triehash; @@ -47,7 +46,7 @@ mod trie_backend; pub use testing::TestExternalities; pub use ext::Ext; pub use backend::Backend; -pub use trie_backend::{TryIntoTrieBackend, TrieBackend}; +pub use trie_backend::{TryIntoTrieBackend, TrieBackend, TrieH256, Storage, DBValue}; /// The overlayed changes to state to be queried on top of the backend. 
/// diff --git a/substrate/substrate/state-machine/src/trie_backend.rs b/substrate/substrate/state-machine/src/trie_backend.rs index 5a9af8f57b..3419bb8f67 100644 --- a/substrate/substrate/state-machine/src/trie_backend.rs +++ b/substrate/substrate/state-machine/src/trie_backend.rs @@ -18,12 +18,18 @@ use std::collections::HashMap; use std::sync::Arc; -use ethereum_types::H256 as TrieH256; -use hashdb::{DBValue, HashDB}; -use kvdb::KeyValueDB; +use hashdb::HashDB; use memorydb::MemoryDB; use patricia_trie::{TrieDB, TrieDBMut, TrieError, Trie, TrieMut}; use {Backend}; +pub use ethereum_types::H256 as TrieH256; +pub use hashdb::DBValue; + +/// Backend trie storage trait. +pub trait Storage: Send + Sync { + /// Get a trie node. + fn get(&self, key: &TrieH256) -> Result, String>; +} /// Try convert into trie-based backend. pub trait TryIntoTrieBackend { @@ -40,20 +46,20 @@ pub struct TrieBackend { impl TrieBackend { /// Create new trie-based backend. - pub fn with_kvdb(db: Arc, storage_column: Option, root: TrieH256) -> Self { + pub fn with_storage(db: Arc, root: TrieH256) -> Self { TrieBackend { - storage: TrieBackendStorage::KeyValueDb(db, storage_column), + storage: TrieBackendStorage::Storage(db), root, } } /// Create new trie-based backend for genesis block. - pub fn with_kvdb_for_genesis(db: Arc, storage_column: Option) -> Self { + pub fn with_storage_for_genesis(db: Arc) -> Self { let mut root = TrieH256::default(); let mut mdb = MemoryDB::default(); TrieDBMut::new(&mut mdb, &mut root); - Self::with_kvdb(db, storage_column, root) + Self::with_storage(db, root) } /// Create new trie-based backend backed by MemoryDb storage. 
@@ -182,7 +188,7 @@ impl<'a> HashDB for Ephemeral<'a> { Some(val) } } - None => match self.storage.get(&key.0[..]) { + None => match self.storage.get(&key) { Ok(x) => x, Err(e) => { warn!(target: "trie", "Failed to read from DB: {}", e); @@ -212,19 +218,19 @@ impl<'a> HashDB for Ephemeral<'a> { #[derive(Clone)] pub enum TrieBackendStorage { /// Key value db + storage column. - KeyValueDb(Arc, Option), + Storage(Arc), /// Hash db. MemoryDb(MemoryDB), } impl TrieBackendStorage { - pub fn get(&self, key: &[u8]) -> Result, String> { + pub fn get(&self, key: &TrieH256) -> Result, String> { match *self { - TrieBackendStorage::KeyValueDb(ref db, storage_column) => - db.get(storage_column, key) + TrieBackendStorage::Storage(ref db) => + db.get(key) .map_err(|e| format!("Trie lookup error: {}", e)), TrieBackendStorage::MemoryDb(ref db) => - Ok(db.get(&TrieH256::from_slice(key))), + Ok(db.get(key)), } } }