mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 07:41:08 +00:00
paritydb support for parachains db. (#4838)
* parity db subsystem without cache and no splitted column * fmt * fix path (auto from parity-db fail) * lru cache for db column with cache * Revert "lru cache for db column with cache" This reverts commit ae177bc5e107a075eff6a21f651218ada6599b74. * Write_lock mutex * theoric code for bridges * revert changes * Revert bridge changes * fix spec_version * update parity db * test purge-db * Use specific ordered collection with paritydb. * Revert "Use specific ordered collection with paritydb." This reverts commit 8b66d0a4ae914cba1af0f44050d45dd6d9327c6b. * fix chain selection tests. * remove patch * fix auto. * Remove useless exists directory method * purge chain without parity-db removal * spellcheck * renamings and filtering. * fix assertion * format * update parity-db and fmt * Auto keep using rocksdb when it exists. * Revert "Auto keep using rocksdb when it exists." This reverts commit cea49b32ae590bdce31fed5c45f3c028ae0c7564. * Update kvdb version.
This commit is contained in:
Generated
+18
-2
@@ -3594,6 +3594,15 @@ dependencies = [
|
||||
"smallvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kvdb-shared-tests"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7a9001edd3459aa1503ea84215cf4618a6e2e020f51682494cc6f5ab1150e68e"
|
||||
dependencies = [
|
||||
"kvdb",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.4.0"
|
||||
@@ -5861,9 +5870,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parity-db"
|
||||
version = "0.3.5"
|
||||
version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "78a95abf24f1097c6e3181abbbbfc3630b3b5e681470940f719b69acb4911c7f"
|
||||
checksum = "865edee5b792f537356d9e55cbc138e7f4718dc881a7ea45a18b37bf61c21e3d"
|
||||
dependencies = [
|
||||
"blake2-rfc",
|
||||
"crc32fast",
|
||||
@@ -6984,11 +6993,16 @@ dependencies = [
|
||||
"fatality",
|
||||
"futures 0.3.21",
|
||||
"itertools",
|
||||
"kvdb",
|
||||
"kvdb-shared-tests",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"lru 0.7.3",
|
||||
"metered-channel",
|
||||
"parity-db",
|
||||
"parity-scale-codec",
|
||||
"parity-util-mem",
|
||||
"parking_lot 0.11.2",
|
||||
"pin-project 1.0.10",
|
||||
"polkadot-node-jaeger",
|
||||
"polkadot-node-metrics",
|
||||
@@ -7003,6 +7017,7 @@ dependencies = [
|
||||
"sp-application-crypto",
|
||||
"sp-core",
|
||||
"sp-keystore",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tracing",
|
||||
]
|
||||
@@ -7396,6 +7411,7 @@ dependencies = [
|
||||
"pallet-mmr-primitives",
|
||||
"pallet-staking",
|
||||
"pallet-transaction-payment-rpc-runtime-api",
|
||||
"parity-db",
|
||||
"polkadot-approval-distribution",
|
||||
"polkadot-availability-bitfield-distribution",
|
||||
"polkadot-availability-distribution",
|
||||
|
||||
@@ -16,10 +16,10 @@
|
||||
|
||||
//! Version 1 of the DB schema.
|
||||
|
||||
use kvdb::{DBTransaction, KeyValueDB};
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use polkadot_node_primitives::approval::{AssignmentCert, DelayTranche};
|
||||
use polkadot_node_subsystem::{SubsystemError, SubsystemResult};
|
||||
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
|
||||
use polkadot_primitives::v1::{
|
||||
BlockNumber, CandidateHash, CandidateReceipt, CoreIndex, GroupIndex, Hash, SessionIndex,
|
||||
ValidatorIndex, ValidatorSignature,
|
||||
@@ -41,14 +41,14 @@ pub mod tests;
|
||||
|
||||
/// `DbBackend` is a concrete implementation of the higher-level Backend trait
|
||||
pub struct DbBackend {
|
||||
inner: Arc<dyn KeyValueDB>,
|
||||
inner: Arc<dyn Database>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
impl DbBackend {
|
||||
/// Create a new [`DbBackend`] with the supplied key-value store and
|
||||
/// config.
|
||||
pub fn new(db: Arc<dyn KeyValueDB>, config: Config) -> Self {
|
||||
pub fn new(db: Arc<dyn Database>, config: Config) -> Self {
|
||||
DbBackend { inner: db, config }
|
||||
}
|
||||
}
|
||||
@@ -239,7 +239,7 @@ impl std::error::Error for Error {}
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub(crate) fn load_decode<D: Decode>(
|
||||
store: &dyn KeyValueDB,
|
||||
store: &dyn Database,
|
||||
col_data: u32,
|
||||
key: &[u8],
|
||||
) -> Result<Option<D>> {
|
||||
@@ -283,7 +283,7 @@ pub(crate) fn blocks_at_height_key(block_number: BlockNumber) -> [u8; 16] {
|
||||
}
|
||||
|
||||
/// Return all blocks which have entries in the DB, ascending, by height.
|
||||
pub fn load_all_blocks(store: &dyn KeyValueDB, config: &Config) -> SubsystemResult<Vec<Hash>> {
|
||||
pub fn load_all_blocks(store: &dyn Database, config: &Config) -> SubsystemResult<Vec<Hash>> {
|
||||
let mut hashes = Vec::new();
|
||||
if let Some(stored_blocks) = load_stored_blocks(store, config)? {
|
||||
for height in stored_blocks.0..stored_blocks.1 {
|
||||
@@ -297,7 +297,7 @@ pub fn load_all_blocks(store: &dyn KeyValueDB, config: &Config) -> SubsystemResu
|
||||
|
||||
/// Load the stored-blocks key from the state.
|
||||
pub fn load_stored_blocks(
|
||||
store: &dyn KeyValueDB,
|
||||
store: &dyn Database,
|
||||
config: &Config,
|
||||
) -> SubsystemResult<Option<StoredBlockRange>> {
|
||||
load_decode(store, config.col_data, STORED_BLOCKS_KEY)
|
||||
@@ -306,7 +306,7 @@ pub fn load_stored_blocks(
|
||||
|
||||
/// Load a blocks-at-height entry for a given block number.
|
||||
pub fn load_blocks_at_height(
|
||||
store: &dyn KeyValueDB,
|
||||
store: &dyn Database,
|
||||
config: &Config,
|
||||
block_number: &BlockNumber,
|
||||
) -> SubsystemResult<Vec<Hash>> {
|
||||
@@ -317,7 +317,7 @@ pub fn load_blocks_at_height(
|
||||
|
||||
/// Load a block entry from the aux store.
|
||||
pub fn load_block_entry(
|
||||
store: &dyn KeyValueDB,
|
||||
store: &dyn Database,
|
||||
config: &Config,
|
||||
block_hash: &Hash,
|
||||
) -> SubsystemResult<Option<BlockEntry>> {
|
||||
@@ -328,7 +328,7 @@ pub fn load_block_entry(
|
||||
|
||||
/// Load a candidate entry from the aux store.
|
||||
pub fn load_candidate_entry(
|
||||
store: &dyn KeyValueDB,
|
||||
store: &dyn Database,
|
||||
config: &Config,
|
||||
candidate_hash: &CandidateHash,
|
||||
) -> SubsystemResult<Option<CandidateEntry>> {
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::{
|
||||
backend::{Backend, OverlayedBackend},
|
||||
ops::{add_block_entry, canonicalize, force_approve, NewCandidateInfo},
|
||||
};
|
||||
use kvdb::KeyValueDB;
|
||||
use polkadot_node_subsystem_util::database::Database;
|
||||
use polkadot_primitives::v1::Id as ParaId;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
@@ -32,8 +32,10 @@ const NUM_COLUMNS: u32 = 1;
|
||||
|
||||
const TEST_CONFIG: Config = Config { col_data: DATA_COL };
|
||||
|
||||
fn make_db() -> (DbBackend, Arc<dyn KeyValueDB>) {
|
||||
let db_writer: Arc<dyn KeyValueDB> = Arc::new(kvdb_memorydb::create(NUM_COLUMNS));
|
||||
fn make_db() -> (DbBackend, Arc<dyn Database>) {
|
||||
let db = kvdb_memorydb::create(NUM_COLUMNS);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
|
||||
let db_writer: Arc<dyn Database> = Arc::new(db);
|
||||
(DbBackend::new(db_writer.clone(), TEST_CONFIG), db_writer)
|
||||
}
|
||||
|
||||
|
||||
@@ -579,11 +579,11 @@ pub(crate) mod tests {
|
||||
use crate::approval_db::v1::DbBackend;
|
||||
use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
|
||||
use assert_matches::assert_matches;
|
||||
use kvdb::KeyValueDB;
|
||||
use merlin::Transcript;
|
||||
use polkadot_node_primitives::approval::{VRFOutput, VRFProof};
|
||||
use polkadot_node_subsystem::messages::AllMessages;
|
||||
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
|
||||
use polkadot_node_subsystem_util::database::Database;
|
||||
use polkadot_primitives::{v1::ValidatorIndex, v2::SessionInfo};
|
||||
pub(crate) use sp_consensus_babe::{
|
||||
digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest},
|
||||
@@ -1125,7 +1125,9 @@ pub(crate) mod tests {
|
||||
|
||||
#[test]
|
||||
fn insta_approval_works() {
|
||||
let db_writer: Arc<dyn KeyValueDB> = Arc::new(kvdb_memorydb::create(NUM_COLUMNS));
|
||||
let db = kvdb_memorydb::create(NUM_COLUMNS);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
|
||||
let db_writer: Arc<dyn Database> = Arc::new(db);
|
||||
let mut db = DbBackend::new(db_writer.clone(), TEST_CONFIG);
|
||||
let mut overlay_db = OverlayedBackend::new(&db);
|
||||
|
||||
|
||||
@@ -21,7 +21,6 @@
|
||||
//! of others. It uses this information to determine when candidates and blocks have
|
||||
//! been sufficiently approved to finalize.
|
||||
|
||||
use kvdb::KeyValueDB;
|
||||
use polkadot_node_jaeger as jaeger;
|
||||
use polkadot_node_primitives::{
|
||||
approval::{
|
||||
@@ -43,6 +42,7 @@ use polkadot_node_subsystem::{
|
||||
SubsystemResult, SubsystemSender,
|
||||
};
|
||||
use polkadot_node_subsystem_util::{
|
||||
database::Database,
|
||||
metrics::{self, prometheus},
|
||||
rolling_session_window::{
|
||||
new_session_window_size, RollingSessionWindow, SessionWindowSize, SessionWindowUpdate,
|
||||
@@ -140,7 +140,7 @@ pub struct ApprovalVotingSubsystem {
|
||||
keystore: Arc<LocalKeystore>,
|
||||
db_config: DatabaseConfig,
|
||||
slot_duration_millis: u64,
|
||||
db: Arc<dyn KeyValueDB>,
|
||||
db: Arc<dyn Database>,
|
||||
mode: Mode,
|
||||
metrics: Metrics,
|
||||
}
|
||||
@@ -329,7 +329,7 @@ impl ApprovalVotingSubsystem {
|
||||
/// Create a new approval voting subsystem with the given keystore, config, and database.
|
||||
pub fn with_config(
|
||||
config: Config,
|
||||
db: Arc<dyn KeyValueDB>,
|
||||
db: Arc<dyn Database>,
|
||||
keystore: Arc<LocalKeystore>,
|
||||
sync_oracle: Box<dyn SyncOracle + Send>,
|
||||
metrics: Metrics,
|
||||
|
||||
@@ -475,6 +475,9 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
|
||||
);
|
||||
|
||||
let clock = Box::new(clock);
|
||||
let db = kvdb_memorydb::create(test_constants::NUM_COLUMNS);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
|
||||
|
||||
let subsystem = run(
|
||||
context,
|
||||
ApprovalVotingSubsystem::with_config(
|
||||
@@ -482,7 +485,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
|
||||
col_data: test_constants::TEST_CONFIG.col_data,
|
||||
slot_duration_millis: SLOT_DURATION_MILLIS,
|
||||
},
|
||||
Arc::new(kvdb_memorydb::create(test_constants::NUM_COLUMNS)),
|
||||
Arc::new(db),
|
||||
Arc::new(keystore),
|
||||
sync_oracle,
|
||||
Metrics::default(),
|
||||
|
||||
@@ -28,8 +28,8 @@ use std::{
|
||||
|
||||
use futures::{channel::oneshot, future, select, FutureExt};
|
||||
use futures_timer::Delay;
|
||||
use kvdb::{DBTransaction, KeyValueDB};
|
||||
use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
|
||||
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
|
||||
|
||||
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
|
||||
use polkadot_node_primitives::{AvailableData, ErasureChunk};
|
||||
@@ -152,7 +152,7 @@ struct CandidateMeta {
|
||||
}
|
||||
|
||||
fn query_inner<D: Decode>(
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db: &Arc<dyn Database>,
|
||||
column: u32,
|
||||
key: &[u8],
|
||||
) -> Result<Option<D>, Error> {
|
||||
@@ -181,7 +181,7 @@ fn write_available_data(
|
||||
}
|
||||
|
||||
fn load_available_data(
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db: &Arc<dyn Database>,
|
||||
config: &Config,
|
||||
hash: &CandidateHash,
|
||||
) -> Result<Option<AvailableData>, Error> {
|
||||
@@ -197,7 +197,7 @@ fn delete_available_data(tx: &mut DBTransaction, config: &Config, hash: &Candida
|
||||
}
|
||||
|
||||
fn load_chunk(
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db: &Arc<dyn Database>,
|
||||
config: &Config,
|
||||
candidate_hash: &CandidateHash,
|
||||
chunk_index: ValidatorIndex,
|
||||
@@ -231,7 +231,7 @@ fn delete_chunk(
|
||||
}
|
||||
|
||||
fn load_meta(
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db: &Arc<dyn Database>,
|
||||
config: &Config,
|
||||
hash: &CandidateHash,
|
||||
) -> Result<Option<CandidateMeta>, Error> {
|
||||
@@ -443,7 +443,7 @@ impl Clock for SystemClock {
|
||||
pub struct AvailabilityStoreSubsystem {
|
||||
pruning_config: PruningConfig,
|
||||
config: Config,
|
||||
db: Arc<dyn KeyValueDB>,
|
||||
db: Arc<dyn Database>,
|
||||
known_blocks: KnownUnfinalizedBlocks,
|
||||
finalized_number: Option<BlockNumber>,
|
||||
metrics: Metrics,
|
||||
@@ -452,7 +452,7 @@ pub struct AvailabilityStoreSubsystem {
|
||||
|
||||
impl AvailabilityStoreSubsystem {
|
||||
/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
|
||||
pub fn new(db: Arc<dyn KeyValueDB>, config: Config, metrics: Metrics) -> Self {
|
||||
pub fn new(db: Arc<dyn Database>, config: Config, metrics: Metrics) -> Self {
|
||||
Self::with_pruning_config_and_clock(
|
||||
db,
|
||||
config,
|
||||
@@ -464,7 +464,7 @@ impl AvailabilityStoreSubsystem {
|
||||
|
||||
/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
|
||||
fn with_pruning_config_and_clock(
|
||||
db: Arc<dyn KeyValueDB>,
|
||||
db: Arc<dyn Database>,
|
||||
config: Config,
|
||||
pruning_config: PruningConfig,
|
||||
clock: Box<dyn Clock>,
|
||||
@@ -661,7 +661,7 @@ where
|
||||
|
||||
async fn process_new_head<Context>(
|
||||
ctx: &mut Context,
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db: &Arc<dyn Database>,
|
||||
db_transaction: &mut DBTransaction,
|
||||
config: &Config,
|
||||
pruning_config: &PruningConfig,
|
||||
@@ -711,7 +711,7 @@ where
|
||||
}
|
||||
|
||||
fn note_block_backed(
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db: &Arc<dyn Database>,
|
||||
db_transaction: &mut DBTransaction,
|
||||
config: &Config,
|
||||
pruning_config: &PruningConfig,
|
||||
@@ -740,7 +740,7 @@ fn note_block_backed(
|
||||
}
|
||||
|
||||
fn note_block_included(
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db: &Arc<dyn Database>,
|
||||
db_transaction: &mut DBTransaction,
|
||||
config: &Config,
|
||||
pruning_config: &PruningConfig,
|
||||
@@ -1128,7 +1128,7 @@ fn process_message(
|
||||
|
||||
// Ok(true) on success, Ok(false) on failure, and Err on internal error.
|
||||
fn store_chunk(
|
||||
db: &Arc<dyn KeyValueDB>,
|
||||
db: &Arc<dyn Database>,
|
||||
config: &Config,
|
||||
candidate_hash: CandidateHash,
|
||||
chunk: ErasureChunk,
|
||||
@@ -1222,7 +1222,7 @@ fn store_available_data(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prune_all(db: &Arc<dyn KeyValueDB>, config: &Config, clock: &dyn Clock) -> Result<(), Error> {
|
||||
fn prune_all(db: &Arc<dyn Database>, config: &Config, clock: &dyn Clock) -> Result<(), Error> {
|
||||
let now = clock.now()?;
|
||||
let (range_start, range_end) = pruning_range(now);
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ use ::test_helpers::TestCandidateBuilder;
|
||||
use parking_lot::Mutex;
|
||||
use polkadot_node_primitives::{AvailableData, BlockData, PoV, Proof};
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
use polkadot_node_subsystem_util::TimeoutExt;
|
||||
use polkadot_node_subsystem_util::{database::Database, TimeoutExt};
|
||||
use polkadot_primitives::v1::{
|
||||
CandidateHash, CandidateReceipt, CoreIndex, GroupIndex, HeadData, Header,
|
||||
PersistedValidationData, ValidatorId,
|
||||
@@ -107,7 +107,7 @@ impl Default for TestState {
|
||||
|
||||
fn test_harness<T: Future<Output = VirtualOverseer>>(
|
||||
state: TestState,
|
||||
store: Arc<dyn KeyValueDB>,
|
||||
store: Arc<dyn Database>,
|
||||
test: impl FnOnce(VirtualOverseer) -> T,
|
||||
) {
|
||||
let _ = env_logger::builder()
|
||||
@@ -180,7 +180,7 @@ async fn overseer_signal(overseer: &mut VirtualOverseer, signal: OverseerSignal)
|
||||
.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
|
||||
}
|
||||
|
||||
fn with_tx(db: &Arc<impl KeyValueDB>, f: impl FnOnce(&mut DBTransaction)) {
|
||||
fn with_tx(db: &Arc<impl Database + ?Sized>, f: impl FnOnce(&mut DBTransaction)) {
|
||||
let mut tx = DBTransaction::new();
|
||||
f(&mut tx);
|
||||
db.write(tx).unwrap();
|
||||
@@ -195,9 +195,17 @@ fn candidate_included(receipt: CandidateReceipt) -> CandidateEvent {
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn test_store() -> Arc<dyn Database> {
|
||||
let db = kvdb_memorydb::create(columns::NUM_COLUMNS);
|
||||
let db =
|
||||
polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[columns::META]);
|
||||
Arc::new(db)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runtime_api_error_does_not_stop_the_subsystem() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
|
||||
test_harness(TestState::default(), store, |mut virtual_overseer| async move {
|
||||
let new_leaf = Hash::repeat_byte(0x01);
|
||||
@@ -270,7 +278,8 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
|
||||
|
||||
#[test]
|
||||
fn store_chunk_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
|
||||
test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
|
||||
let validator_index = ValidatorIndex(5);
|
||||
@@ -317,7 +326,8 @@ fn store_chunk_works() {
|
||||
|
||||
#[test]
|
||||
fn store_chunk_does_nothing_if_no_entry_already() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
|
||||
test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
|
||||
let validator_index = ValidatorIndex(5);
|
||||
@@ -348,7 +358,8 @@ fn store_chunk_does_nothing_if_no_entry_already() {
|
||||
|
||||
#[test]
|
||||
fn query_chunk_checks_meta() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
|
||||
test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
|
||||
let validator_index = ValidatorIndex(5);
|
||||
@@ -395,7 +406,7 @@ fn query_chunk_checks_meta() {
|
||||
|
||||
#[test]
|
||||
fn store_block_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
let test_state = TestState::default();
|
||||
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
|
||||
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
|
||||
@@ -445,7 +456,7 @@ fn store_block_works() {
|
||||
|
||||
#[test]
|
||||
fn store_pov_and_query_chunk_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
|
||||
@@ -487,7 +498,7 @@ fn store_pov_and_query_chunk_works() {
|
||||
|
||||
#[test]
|
||||
fn query_all_chunks_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
|
||||
@@ -582,7 +593,7 @@ fn query_all_chunks_works() {
|
||||
|
||||
#[test]
|
||||
fn stored_but_not_included_data_is_pruned() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
|
||||
@@ -626,7 +637,7 @@ fn stored_but_not_included_data_is_pruned() {
|
||||
|
||||
#[test]
|
||||
fn stored_data_kept_until_finalized() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
|
||||
@@ -719,7 +730,7 @@ fn stored_data_kept_until_finalized() {
|
||||
|
||||
#[test]
|
||||
fn we_dont_miss_anything_if_import_notifications_are_missed() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
|
||||
@@ -843,7 +854,7 @@ fn we_dont_miss_anything_if_import_notifications_are_missed() {
|
||||
|
||||
#[test]
|
||||
fn forkfullness_works() {
|
||||
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
|
||||
let store = test_store();
|
||||
let test_state = TestState::default();
|
||||
|
||||
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
|
||||
|
||||
@@ -40,8 +40,8 @@ use crate::{
|
||||
use polkadot_node_primitives::BlockWeight;
|
||||
use polkadot_primitives::v1::{BlockNumber, Hash};
|
||||
|
||||
use kvdb::{DBTransaction, KeyValueDB};
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -194,14 +194,14 @@ pub struct Config {
|
||||
|
||||
/// The database backend.
|
||||
pub struct DbBackend {
|
||||
inner: Arc<dyn KeyValueDB>,
|
||||
inner: Arc<dyn Database>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
impl DbBackend {
|
||||
/// Create a new [`DbBackend`] with the supplied key-value store and
|
||||
/// config.
|
||||
pub fn new(db: Arc<dyn KeyValueDB>, config: Config) -> Self {
|
||||
pub fn new(db: Arc<dyn Database>, config: Config) -> Self {
|
||||
DbBackend { inner: db, config }
|
||||
}
|
||||
}
|
||||
@@ -326,7 +326,7 @@ impl Backend for DbBackend {
|
||||
}
|
||||
|
||||
fn load_decode<D: Decode>(
|
||||
db: &dyn KeyValueDB,
|
||||
db: &dyn Database,
|
||||
col_data: u32,
|
||||
key: &[u8],
|
||||
) -> Result<Option<D>, Error> {
|
||||
@@ -387,6 +387,13 @@ fn decode_stagnant_at_key(key: &[u8]) -> Option<Timestamp> {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[cfg(test)]
|
||||
fn test_db() -> Arc<dyn Database> {
|
||||
let db = kvdb_memorydb::create(1);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
|
||||
Arc::new(db)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn block_height_key_decodes() {
|
||||
let key = block_height_key(5);
|
||||
@@ -425,7 +432,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn write_read_block_entry() {
|
||||
let db = Arc::new(kvdb_memorydb::create(1));
|
||||
let db = test_db();
|
||||
let config = Config { col_data: 0 };
|
||||
|
||||
let mut backend = DbBackend::new(db, config);
|
||||
@@ -455,7 +462,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn delete_block_entry() {
|
||||
let db = Arc::new(kvdb_memorydb::create(1));
|
||||
let db = test_db();
|
||||
let config = Config { col_data: 0 };
|
||||
|
||||
let mut backend = DbBackend::new(db, config);
|
||||
@@ -486,7 +493,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn earliest_block_number() {
|
||||
let db = Arc::new(kvdb_memorydb::create(1));
|
||||
let db = test_db();
|
||||
let config = Config { col_data: 0 };
|
||||
|
||||
let mut backend = DbBackend::new(db, config);
|
||||
@@ -515,7 +522,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn stagnant_at_up_to() {
|
||||
let db = Arc::new(kvdb_memorydb::create(1));
|
||||
let db = test_db();
|
||||
let config = Config { col_data: 0 };
|
||||
|
||||
let mut backend = DbBackend::new(db, config);
|
||||
@@ -571,7 +578,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn write_read_blocks_at_height() {
|
||||
let db = Arc::new(kvdb_memorydb::create(1));
|
||||
let db = test_db();
|
||||
let config = Config { col_data: 0 };
|
||||
|
||||
let mut backend = DbBackend::new(db, config);
|
||||
|
||||
@@ -22,10 +22,10 @@ use polkadot_node_subsystem::{
|
||||
messages::{ChainApiMessage, ChainSelectionMessage},
|
||||
overseer, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
|
||||
};
|
||||
use polkadot_node_subsystem_util::database::Database;
|
||||
use polkadot_primitives::v1::{BlockNumber, ConsensusLog, Hash, Header};
|
||||
|
||||
use futures::{channel::oneshot, future::Either, prelude::*};
|
||||
use kvdb::KeyValueDB;
|
||||
use parity_scale_codec::Error as CodecError;
|
||||
|
||||
use std::{
|
||||
@@ -306,13 +306,13 @@ pub struct Config {
|
||||
/// The chain selection subsystem.
|
||||
pub struct ChainSelectionSubsystem {
|
||||
config: Config,
|
||||
db: Arc<dyn KeyValueDB>,
|
||||
db: Arc<dyn Database>,
|
||||
}
|
||||
|
||||
impl ChainSelectionSubsystem {
|
||||
/// Create a new instance of the subsystem with the given config
|
||||
/// and key-value store.
|
||||
pub fn new(config: Config, db: Arc<dyn KeyValueDB>) -> Self {
|
||||
pub fn new(config: Config, db: Arc<dyn Database>) -> Self {
|
||||
ChainSelectionSubsystem { config, db }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,12 +39,12 @@ mod dummy;
|
||||
/// The real implementation.
|
||||
mod real;
|
||||
|
||||
use kvdb::KeyValueDB;
|
||||
use metrics::Metrics;
|
||||
use polkadot_node_subsystem::{
|
||||
messages::DisputeCoordinatorMessage, overseer, SpawnedSubsystem, SubsystemContext,
|
||||
SubsystemError,
|
||||
};
|
||||
use polkadot_node_subsystem_util::database::Database;
|
||||
use sc_keystore::LocalKeystore;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -66,7 +66,7 @@ impl DisputeCoordinatorSubsystem {
|
||||
|
||||
/// Create a new instance of the subsystem.
|
||||
pub fn new(
|
||||
store: Arc<dyn KeyValueDB>,
|
||||
store: Arc<dyn Database>,
|
||||
config: real::Config,
|
||||
keystore: Arc<LocalKeystore>,
|
||||
metrics: Metrics,
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
//! `V1` database for the dispute coordinator.
|
||||
|
||||
use polkadot_node_subsystem::{SubsystemError, SubsystemResult};
|
||||
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
|
||||
use polkadot_primitives::v1::{
|
||||
CandidateHash, CandidateReceipt, Hash, InvalidDisputeStatementKind, SessionIndex,
|
||||
ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature,
|
||||
@@ -24,7 +25,6 @@ use polkadot_primitives::v1::{
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use kvdb::{DBTransaction, KeyValueDB};
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
|
||||
use crate::{
|
||||
@@ -41,12 +41,12 @@ const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session";
|
||||
const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes";
|
||||
|
||||
pub struct DbBackend {
|
||||
inner: Arc<dyn KeyValueDB>,
|
||||
inner: Arc<dyn Database>,
|
||||
config: ColumnConfiguration,
|
||||
}
|
||||
|
||||
impl DbBackend {
|
||||
pub fn new(db: Arc<dyn KeyValueDB>, config: ColumnConfiguration) -> Self {
|
||||
pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration) -> Self {
|
||||
Self { inner: db, config }
|
||||
}
|
||||
}
|
||||
@@ -176,7 +176,7 @@ impl From<Error> for crate::error::Error {
|
||||
/// Result alias for DB errors.
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
fn load_decode<D: Decode>(db: &dyn KeyValueDB, col_data: u32, key: &[u8]) -> Result<Option<D>> {
|
||||
fn load_decode<D: Decode>(db: &dyn Database, col_data: u32, key: &[u8]) -> Result<Option<D>> {
|
||||
match db.get(col_data, key)? {
|
||||
None => Ok(None),
|
||||
Some(raw) => D::decode(&mut &raw[..]).map(Some).map_err(Into::into),
|
||||
@@ -185,7 +185,7 @@ fn load_decode<D: Decode>(db: &dyn KeyValueDB, col_data: u32, key: &[u8]) -> Res
|
||||
|
||||
/// Load the candidate votes for the specific session-candidate pair, if any.
|
||||
pub(crate) fn load_candidate_votes(
|
||||
db: &dyn KeyValueDB,
|
||||
db: &dyn Database,
|
||||
config: &ColumnConfiguration,
|
||||
session: SessionIndex,
|
||||
candidate_hash: &CandidateHash,
|
||||
@@ -196,7 +196,7 @@ pub(crate) fn load_candidate_votes(
|
||||
|
||||
/// Load the earliest session, if any.
|
||||
pub(crate) fn load_earliest_session(
|
||||
db: &dyn KeyValueDB,
|
||||
db: &dyn Database,
|
||||
config: &ColumnConfiguration,
|
||||
) -> SubsystemResult<Option<SessionIndex>> {
|
||||
load_decode(db, config.col_data, EARLIEST_SESSION_KEY)
|
||||
@@ -205,7 +205,7 @@ pub(crate) fn load_earliest_session(
|
||||
|
||||
/// Load the recent disputes, if any.
|
||||
pub(crate) fn load_recent_disputes(
|
||||
db: &dyn KeyValueDB,
|
||||
db: &dyn Database,
|
||||
config: &ColumnConfiguration,
|
||||
) -> SubsystemResult<Option<RecentDisputes>> {
|
||||
load_decode(db, config.col_data, RECENT_DISPUTES_KEY)
|
||||
@@ -267,7 +267,9 @@ mod tests {
|
||||
use polkadot_primitives::v1::{Hash, Id as ParaId};
|
||||
|
||||
fn make_db() -> DbBackend {
|
||||
let store = Arc::new(kvdb_memorydb::create(1));
|
||||
let db = kvdb_memorydb::create(1);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
|
||||
let store = Arc::new(db);
|
||||
let config = ColumnConfiguration { col_data: 0 };
|
||||
DbBackend::new(store, config)
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
use futures::FutureExt;
|
||||
use kvdb::KeyValueDB;
|
||||
|
||||
use sc_keystore::LocalKeystore;
|
||||
|
||||
@@ -36,7 +35,9 @@ use polkadot_node_subsystem::{
|
||||
messages::DisputeCoordinatorMessage, overseer, ActivatedLeaf, FromOverseer, OverseerSignal,
|
||||
SpawnedSubsystem, SubsystemContext, SubsystemError,
|
||||
};
|
||||
use polkadot_node_subsystem_util::rolling_session_window::RollingSessionWindow;
|
||||
use polkadot_node_subsystem_util::{
|
||||
database::Database, rolling_session_window::RollingSessionWindow,
|
||||
};
|
||||
use polkadot_primitives::v1::{ValidatorIndex, ValidatorPair};
|
||||
|
||||
use crate::{
|
||||
@@ -100,7 +101,7 @@ mod tests;
|
||||
/// An implementation of the dispute coordinator subsystem.
|
||||
pub struct DisputeCoordinatorSubsystem {
|
||||
config: Config,
|
||||
store: Arc<dyn KeyValueDB>,
|
||||
store: Arc<dyn Database>,
|
||||
keystore: Arc<LocalKeystore>,
|
||||
metrics: Metrics,
|
||||
}
|
||||
@@ -139,7 +140,7 @@ where
|
||||
impl DisputeCoordinatorSubsystem {
|
||||
/// Create a new instance of the subsystem.
|
||||
pub fn new(
|
||||
store: Arc<dyn KeyValueDB>,
|
||||
store: Arc<dyn Database>,
|
||||
config: Config,
|
||||
keystore: Arc<LocalKeystore>,
|
||||
metrics: Metrics,
|
||||
|
||||
@@ -29,8 +29,8 @@ use futures::{
|
||||
future::{self, BoxFuture},
|
||||
};
|
||||
|
||||
use kvdb::KeyValueDB;
|
||||
use parity_scale_codec::Encode;
|
||||
use polkadot_node_subsystem_util::database::Database;
|
||||
|
||||
use polkadot_node_primitives::{SignedDisputeStatement, SignedFullStatement, Statement};
|
||||
use polkadot_node_subsystem::{
|
||||
@@ -125,7 +125,7 @@ struct TestState {
|
||||
validator_groups: Vec<Vec<ValidatorIndex>>,
|
||||
master_keystore: Arc<sc_keystore::LocalKeystore>,
|
||||
subsystem_keystore: Arc<sc_keystore::LocalKeystore>,
|
||||
db: Arc<dyn KeyValueDB>,
|
||||
db: Arc<dyn Database>,
|
||||
config: Config,
|
||||
clock: MockClock,
|
||||
headers: HashMap<Hash, Header>,
|
||||
@@ -166,7 +166,9 @@ impl Default for TestState {
|
||||
let subsystem_keystore =
|
||||
make_keystore(vec![Sr25519Keyring::Alice.to_seed()].into_iter()).into();
|
||||
|
||||
let db = Arc::new(kvdb_memorydb::create(1));
|
||||
let db = kvdb_memorydb::create(1);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
|
||||
let db = Arc::new(db);
|
||||
let config = Config { col_data: 0 };
|
||||
|
||||
TestState {
|
||||
|
||||
@@ -68,6 +68,7 @@ serde = { version = "1.0.136", features = ["derive"] }
|
||||
thiserror = "1.0.30"
|
||||
kvdb = "0.11.0"
|
||||
kvdb-rocksdb = { version = "0.15.1", optional = true }
|
||||
parity-db = { version = "0.3.8", optional = true }
|
||||
async-trait = "0.1.52"
|
||||
lru = "0.7"
|
||||
|
||||
@@ -156,7 +157,8 @@ full-node = [
|
||||
"polkadot-statement-distribution",
|
||||
"polkadot-approval-distribution",
|
||||
"polkadot-node-core-pvf-checker",
|
||||
"kvdb-rocksdb"
|
||||
"kvdb-rocksdb",
|
||||
"parity-db",
|
||||
]
|
||||
|
||||
# Configure the native runtimes to use. Polkadot is enabled by default.
|
||||
|
||||
@@ -856,10 +856,31 @@ where
|
||||
);
|
||||
}
|
||||
|
||||
let parachains_db = crate::parachains_db::open_creating(
|
||||
config.database.path().ok_or(Error::DatabasePathRequired)?.into(),
|
||||
crate::parachains_db::CacheSizes::default(),
|
||||
)?;
|
||||
let parachains_db = match &config.database {
|
||||
DatabaseSource::RocksDb { path, .. } => crate::parachains_db::open_creating_rocksdb(
|
||||
path.clone(),
|
||||
crate::parachains_db::CacheSizes::default(),
|
||||
)?,
|
||||
DatabaseSource::ParityDb { path, .. } => crate::parachains_db::open_creating_paritydb(
|
||||
path.parent().ok_or(Error::DatabasePathRequired)?.into(),
|
||||
crate::parachains_db::CacheSizes::default(),
|
||||
)?,
|
||||
DatabaseSource::Auto { paritydb_path, rocksdb_path, .. } =>
|
||||
if paritydb_path.is_dir() && paritydb_path.exists() {
|
||||
crate::parachains_db::open_creating_paritydb(
|
||||
paritydb_path.parent().ok_or(Error::DatabasePathRequired)?.into(),
|
||||
crate::parachains_db::CacheSizes::default(),
|
||||
)?
|
||||
} else {
|
||||
crate::parachains_db::open_creating_rocksdb(
|
||||
rocksdb_path.clone(),
|
||||
crate::parachains_db::CacheSizes::default(),
|
||||
)?
|
||||
},
|
||||
DatabaseSource::Custom { .. } => {
|
||||
unimplemented!("No polkadot subsystem db for custom source.");
|
||||
},
|
||||
};
|
||||
|
||||
let availability_config = AvailabilityConfig {
|
||||
col_data: crate::parachains_db::REAL_COLUMNS.col_availability_data,
|
||||
|
||||
@@ -79,7 +79,7 @@ where
|
||||
/// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others.
|
||||
pub runtime_client: Arc<RuntimeClient>,
|
||||
/// The underlying key value store for the parachains.
|
||||
pub parachains_db: Arc<dyn kvdb::KeyValueDB>,
|
||||
pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
|
||||
/// Underlying network service implementation.
|
||||
pub network_service: Arc<sc_network::NetworkService<Block, Hash>>,
|
||||
/// Underlying authority discovery service.
|
||||
|
||||
@@ -14,7 +14,9 @@
|
||||
//! A `RocksDB` instance for storing parachain data; availability data, and approvals.
|
||||
|
||||
#[cfg(feature = "full-node")]
|
||||
use {kvdb::KeyValueDB, std::io, std::path::PathBuf, std::sync::Arc};
|
||||
use {
|
||||
polkadot_node_subsystem_util::database::Database, std::io, std::path::PathBuf, std::sync::Arc,
|
||||
};
|
||||
|
||||
#[cfg(feature = "full-node")]
|
||||
mod upgrade;
|
||||
@@ -31,6 +33,7 @@ pub(crate) mod columns {
|
||||
pub const COL_APPROVAL_DATA: u32 = 2;
|
||||
pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
|
||||
pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
|
||||
pub const ORDERED_COL: &[u32] = &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA];
|
||||
}
|
||||
|
||||
/// Columns used by different subsystems.
|
||||
@@ -83,7 +86,10 @@ pub(crate) fn other_io_error(err: String) -> io::Error {
|
||||
|
||||
/// Open the database on disk, creating it if it doesn't exist.
|
||||
#[cfg(feature = "full-node")]
|
||||
pub fn open_creating(root: PathBuf, cache_sizes: CacheSizes) -> io::Result<Arc<dyn KeyValueDB>> {
|
||||
pub fn open_creating_rocksdb(
|
||||
root: PathBuf,
|
||||
cache_sizes: CacheSizes,
|
||||
) -> io::Result<Arc<dyn Database>> {
|
||||
use kvdb_rocksdb::{Database, DatabaseConfig};
|
||||
|
||||
let path = root.join("parachains").join("db");
|
||||
@@ -107,6 +113,36 @@ pub fn open_creating(root: PathBuf, cache_sizes: CacheSizes) -> io::Result<Arc<d
|
||||
std::fs::create_dir_all(&path_str)?;
|
||||
upgrade::try_upgrade_db(&path)?;
|
||||
let db = Database::open(&db_config, &path_str)?;
|
||||
let db =
|
||||
polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, columns::ORDERED_COL);
|
||||
|
||||
Ok(Arc::new(db))
|
||||
}
|
||||
|
||||
/// Open a parity db database.
|
||||
#[cfg(feature = "full-node")]
|
||||
pub fn open_creating_paritydb(
|
||||
root: PathBuf,
|
||||
_cache_sizes: CacheSizes,
|
||||
) -> io::Result<Arc<dyn Database>> {
|
||||
let path = root.join("parachains");
|
||||
let path_str = path
|
||||
.to_str()
|
||||
.ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?;
|
||||
|
||||
std::fs::create_dir_all(&path_str)?;
|
||||
|
||||
let mut options = parity_db::Options::with_columns(&path, columns::NUM_COLUMNS as u8);
|
||||
for i in columns::ORDERED_COL {
|
||||
options.columns[*i as usize].btree_index = true;
|
||||
}
|
||||
|
||||
let db = parity_db::Db::open_or_create(&options)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
|
||||
|
||||
let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new(
|
||||
db,
|
||||
columns::ORDERED_COL,
|
||||
);
|
||||
Ok(Arc::new(db))
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ async-trait = "0.1.52"
|
||||
futures = "0.3.21"
|
||||
itertools = "0.10"
|
||||
parity-scale-codec = { version = "3.0.0", default-features = false, features = ["derive"] }
|
||||
parking_lot = "0.11.2"
|
||||
pin-project = "1.0.9"
|
||||
rand = "0.8.5"
|
||||
thiserror = "1.0.30"
|
||||
@@ -31,6 +32,10 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
kvdb = "0.11.0"
|
||||
parity-util-mem = { version = "0.11", default-features = false }
|
||||
parity-db = { version = "0.3.8" }
|
||||
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.4.0"
|
||||
env_logger = "0.9.0"
|
||||
@@ -39,3 +44,5 @@ log = "0.4.13"
|
||||
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
|
||||
lazy_static = "1.4.0"
|
||||
polkadot-primitives-test-helpers = { path = "../../primitives/test-helpers" }
|
||||
kvdb-shared-tests = "0.9.0"
|
||||
tempfile = "3.1.0"
|
||||
|
||||
@@ -0,0 +1,339 @@
|
||||
// Copyright 2021-2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Database trait for polkadot db.
|
||||
|
||||
pub use kvdb::{DBTransaction, DBValue, KeyValueDB};
|
||||
|
||||
/// Database trait with ordered key capacity.
|
||||
pub trait Database: KeyValueDB {
|
||||
/// Check if column allows content iteration
|
||||
/// and removal by prefix.
|
||||
fn is_indexed_column(&self, col: u32) -> bool;
|
||||
}
|
||||
|
||||
/// Implementation for database supporting `KeyValueDB` already.
|
||||
pub mod kvdb_impl {
|
||||
use super::{DBTransaction, DBValue, Database, KeyValueDB};
|
||||
use kvdb::{DBOp, IoStats, IoStatsKind};
|
||||
use parity_util_mem::{MallocSizeOf, MallocSizeOfOps};
|
||||
use std::{collections::BTreeSet, io::Result};
|
||||
|
||||
/// Adapter implementing subsystem database
|
||||
/// for `KeyValueDB`.
|
||||
#[derive(Clone)]
|
||||
pub struct DbAdapter<D> {
|
||||
db: D,
|
||||
indexed_columns: BTreeSet<u32>,
|
||||
}
|
||||
|
||||
impl<D: KeyValueDB> DbAdapter<D> {
|
||||
/// Instantiate new subsystem database, with
|
||||
/// the columns that allow ordered iteration.
|
||||
pub fn new(db: D, indexed_columns: &[u32]) -> Self {
|
||||
DbAdapter { db, indexed_columns: indexed_columns.iter().cloned().collect() }
|
||||
}
|
||||
|
||||
fn ensure_is_indexed(&self, col: u32) {
|
||||
debug_assert!(
|
||||
self.is_indexed_column(col),
|
||||
"Invalid configuration of database, column {} is not ordered.",
|
||||
col
|
||||
);
|
||||
}
|
||||
|
||||
fn ensure_ops_indexing(&self, transaction: &DBTransaction) {
|
||||
debug_assert!({
|
||||
let mut pass = true;
|
||||
for op in &transaction.ops {
|
||||
if let DBOp::DeletePrefix { col, .. } = op {
|
||||
if !self.is_indexed_column(*col) {
|
||||
pass = false;
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
pass
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: KeyValueDB> Database for DbAdapter<D> {
|
||||
fn is_indexed_column(&self, col: u32) -> bool {
|
||||
self.indexed_columns.contains(&col)
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: KeyValueDB> KeyValueDB for DbAdapter<D> {
|
||||
fn transaction(&self) -> DBTransaction {
|
||||
self.db.transaction()
|
||||
}
|
||||
|
||||
fn get(&self, col: u32, key: &[u8]) -> Result<Option<DBValue>> {
|
||||
self.db.get(col, key)
|
||||
}
|
||||
|
||||
fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
|
||||
self.ensure_is_indexed(col);
|
||||
self.db.get_by_prefix(col, prefix)
|
||||
}
|
||||
|
||||
fn write(&self, transaction: DBTransaction) -> Result<()> {
|
||||
self.ensure_ops_indexing(&transaction);
|
||||
self.db.write(transaction)
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
|
||||
self.ensure_is_indexed(col);
|
||||
self.db.iter(col)
|
||||
}
|
||||
|
||||
fn iter_with_prefix<'a>(
|
||||
&'a self,
|
||||
col: u32,
|
||||
prefix: &'a [u8],
|
||||
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
|
||||
self.ensure_is_indexed(col);
|
||||
self.db.iter_with_prefix(col, prefix)
|
||||
}
|
||||
|
||||
fn restore(&self, _new_db: &str) -> Result<()> {
|
||||
unimplemented!("restore is unsupported")
|
||||
}
|
||||
|
||||
fn io_stats(&self, kind: IoStatsKind) -> IoStats {
|
||||
self.db.io_stats(kind)
|
||||
}
|
||||
|
||||
fn has_key(&self, col: u32, key: &[u8]) -> Result<bool> {
|
||||
self.db.has_key(col, key)
|
||||
}
|
||||
|
||||
fn has_prefix(&self, col: u32, prefix: &[u8]) -> bool {
|
||||
self.ensure_is_indexed(col);
|
||||
self.db.has_prefix(col, prefix)
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: KeyValueDB> MallocSizeOf for DbAdapter<D> {
|
||||
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
|
||||
// ignore filter set
|
||||
self.db.size_of(ops)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Utilities for using parity-db database.
|
||||
pub mod paritydb_impl {
|
||||
use super::{DBTransaction, DBValue, Database, KeyValueDB};
|
||||
use kvdb::{DBOp, IoStats, IoStatsKind};
|
||||
use parity_db::Db;
|
||||
use parking_lot::Mutex;
|
||||
use std::{collections::BTreeSet, io::Result, sync::Arc};
|
||||
|
||||
fn handle_err<T>(result: parity_db::Result<T>) -> T {
|
||||
match result {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
panic!("Critical database error: {:?}", e);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn map_err<T>(result: parity_db::Result<T>) -> Result<T> {
|
||||
result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{:?}", e)))
|
||||
}
|
||||
|
||||
/// Implementation of of `Database` for parity-db adapter.
|
||||
pub struct DbAdapter {
|
||||
db: Db,
|
||||
indexed_columns: BTreeSet<u32>,
|
||||
write_lock: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
impl parity_util_mem::MallocSizeOf for DbAdapter {
|
||||
fn size_of(&self, _ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
|
||||
unimplemented!("size_of is not supported for parity_db")
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyValueDB for DbAdapter {
|
||||
fn transaction(&self) -> DBTransaction {
|
||||
DBTransaction::new()
|
||||
}
|
||||
|
||||
fn get(&self, col: u32, key: &[u8]) -> Result<Option<DBValue>> {
|
||||
map_err(self.db.get(col as u8, key))
|
||||
}
|
||||
|
||||
fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
|
||||
self.iter_with_prefix(col, prefix).next().map(|(_, v)| v)
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
|
||||
let mut iter = handle_err(self.db.iter(col as u8));
|
||||
Box::new(std::iter::from_fn(move || {
|
||||
if let Some((key, value)) = handle_err(iter.next()) {
|
||||
Some((key.into_boxed_slice(), value.into_boxed_slice()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn iter_with_prefix<'a>(
|
||||
&'a self,
|
||||
col: u32,
|
||||
prefix: &'a [u8],
|
||||
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
|
||||
if prefix.len() == 0 {
|
||||
return self.iter(col)
|
||||
}
|
||||
let mut iter = handle_err(self.db.iter(col as u8));
|
||||
handle_err(iter.seek(prefix));
|
||||
Box::new(std::iter::from_fn(move || {
|
||||
if let Some((key, value)) = handle_err(iter.next()) {
|
||||
key.starts_with(prefix)
|
||||
.then(|| (key.into_boxed_slice(), value.into_boxed_slice()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn restore(&self, _new_db: &str) -> Result<()> {
|
||||
unimplemented!("restore is unsupported")
|
||||
}
|
||||
|
||||
fn io_stats(&self, _kind: IoStatsKind) -> IoStats {
|
||||
unimplemented!("io_stats not supported by parity_db");
|
||||
}
|
||||
|
||||
fn has_key(&self, col: u32, key: &[u8]) -> Result<bool> {
|
||||
map_err(self.db.get_size(col as u8, key).map(|r| r.is_some()))
|
||||
}
|
||||
|
||||
fn has_prefix(&self, col: u32, prefix: &[u8]) -> bool {
|
||||
self.get_by_prefix(col, prefix).is_some()
|
||||
}
|
||||
|
||||
fn write(&self, transaction: DBTransaction) -> std::io::Result<()> {
|
||||
let mut ops = transaction.ops.into_iter();
|
||||
// TODO using a key iterator or native delete here would be faster.
|
||||
let mut current_prefix_iter: Option<(parity_db::BTreeIterator, u8, Vec<u8>)> = None;
|
||||
let current_prefix_iter = &mut current_prefix_iter;
|
||||
let transaction = std::iter::from_fn(move || loop {
|
||||
if let Some((prefix_iter, col, prefix)) = current_prefix_iter {
|
||||
if let Some((key, _value)) = handle_err(prefix_iter.next()) {
|
||||
if key.starts_with(prefix) {
|
||||
return Some((*col, key.to_vec(), None))
|
||||
}
|
||||
}
|
||||
*current_prefix_iter = None;
|
||||
}
|
||||
return match ops.next() {
|
||||
None => None,
|
||||
Some(DBOp::Insert { col, key, value }) =>
|
||||
Some((col as u8, key.to_vec(), Some(value))),
|
||||
Some(DBOp::Delete { col, key }) => Some((col as u8, key.to_vec(), None)),
|
||||
Some(DBOp::DeletePrefix { col, prefix }) => {
|
||||
let col = col as u8;
|
||||
let mut iter = handle_err(self.db.iter(col));
|
||||
handle_err(iter.seek(&prefix[..]));
|
||||
*current_prefix_iter = Some((iter, col, prefix.to_vec()));
|
||||
continue
|
||||
},
|
||||
}
|
||||
});
|
||||
|
||||
// Locking is required due to possible racy change of the content of a deleted prefix.
|
||||
let _lock = self.write_lock.lock();
|
||||
map_err(self.db.commit(transaction))
|
||||
}
|
||||
}
|
||||
|
||||
impl Database for DbAdapter {
|
||||
fn is_indexed_column(&self, col: u32) -> bool {
|
||||
self.indexed_columns.contains(&col)
|
||||
}
|
||||
}
|
||||
|
||||
impl DbAdapter {
|
||||
/// Implementation of of `Database` for parity-db adapter.
|
||||
pub fn new(db: Db, indexed_columns: &[u32]) -> Self {
|
||||
let write_lock = Arc::new(Mutex::new(()));
|
||||
DbAdapter { db, indexed_columns: indexed_columns.iter().cloned().collect(), write_lock }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use kvdb_shared_tests as st;
|
||||
use std::io;
|
||||
use tempfile::Builder as TempfileBuilder;
|
||||
|
||||
fn create(num_col: u32) -> io::Result<(DbAdapter, tempfile::TempDir)> {
|
||||
let tempdir = TempfileBuilder::new().prefix("").tempdir()?;
|
||||
let mut options = parity_db::Options::with_columns(tempdir.path(), num_col as u8);
|
||||
for i in 0..num_col {
|
||||
options.columns[i as usize].btree_index = true;
|
||||
}
|
||||
|
||||
let db = parity_db::Db::open_or_create(&options)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
|
||||
|
||||
let db = DbAdapter::new(db, &[0]);
|
||||
Ok((db, tempdir))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn put_and_get() -> io::Result<()> {
|
||||
let (db, _temp_file) = create(1)?;
|
||||
st::test_put_and_get(&db)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delete_and_get() -> io::Result<()> {
|
||||
let (db, _temp_file) = create(1)?;
|
||||
st::test_delete_and_get(&db)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delete_prefix() -> io::Result<()> {
|
||||
let (db, _temp_file) = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
|
||||
st::test_delete_prefix(&db)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter() -> io::Result<()> {
|
||||
let (db, _temp_file) = create(1)?;
|
||||
st::test_iter(&db)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_with_prefix() -> io::Result<()> {
|
||||
let (db, _temp_file) = create(1)?;
|
||||
st::test_iter_with_prefix(&db)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn complex() -> io::Result<()> {
|
||||
let (db, _temp_file) = create(1)?;
|
||||
st::test_complex(&db)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -87,6 +87,9 @@ pub mod rolling_session_window;
|
||||
/// Convenient and efficient runtime info access.
|
||||
pub mod runtime;
|
||||
|
||||
/// Database trait for subsystem.
|
||||
pub mod database;
|
||||
|
||||
mod determine_new_blocks;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -22,7 +22,7 @@ pub mod common;
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg(unix)]
|
||||
async fn purge_chain_works() {
|
||||
async fn purge_chain_rocksdb_works() {
|
||||
use nix::{
|
||||
sys::signal::{kill, Signal::SIGINT},
|
||||
unistd::Pid,
|
||||
@@ -33,6 +33,8 @@ async fn purge_chain_works() {
|
||||
let mut cmd = Command::new(cargo_bin("polkadot"))
|
||||
.args(&["--dev", "-d"])
|
||||
.arg(tmpdir.path())
|
||||
.arg("--port")
|
||||
.arg("33034")
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
@@ -43,6 +45,9 @@ async fn purge_chain_works() {
|
||||
kill(Pid::from_raw(cmd.id().try_into().unwrap()), SIGINT).unwrap();
|
||||
// Wait for the node to handle it and exit.
|
||||
assert!(common::wait_for(&mut cmd, 30).map(|x| x.success()).unwrap_or_default());
|
||||
assert!(tmpdir.path().join("chains/dev").exists());
|
||||
assert!(tmpdir.path().join("chains/dev/db/full").exists());
|
||||
assert!(tmpdir.path().join("chains/dev/db/full/parachains").exists());
|
||||
|
||||
// Purge chain
|
||||
let status = Command::new(cargo_bin("polkadot"))
|
||||
@@ -57,3 +62,50 @@ async fn purge_chain_works() {
|
||||
assert!(tmpdir.path().join("chains/dev").exists());
|
||||
assert!(!tmpdir.path().join("chains/dev/db/full").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg(unix)]
|
||||
async fn purge_chain_paritydb_works() {
|
||||
use nix::{
|
||||
sys::signal::{kill, Signal::SIGINT},
|
||||
unistd::Pid,
|
||||
};
|
||||
|
||||
let tmpdir = tempdir().expect("could not create temp dir");
|
||||
|
||||
let mut cmd = Command::new(cargo_bin("polkadot"))
|
||||
.args(&["--dev", "-d"])
|
||||
.arg(tmpdir.path())
|
||||
.arg("--database")
|
||||
.arg("paritydb-experimental")
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
// Let it produce 1 block.
|
||||
common::wait_n_finalized_blocks(1, Duration::from_secs(60)).await.unwrap();
|
||||
|
||||
// Send SIGINT to node.
|
||||
kill(Pid::from_raw(cmd.id().try_into().unwrap()), SIGINT).unwrap();
|
||||
// Wait for the node to handle it and exit.
|
||||
assert!(common::wait_for(&mut cmd, 30).map(|x| x.success()).unwrap_or_default());
|
||||
assert!(tmpdir.path().join("chains/dev").exists());
|
||||
assert!(tmpdir.path().join("chains/dev/paritydb/full").exists());
|
||||
assert!(tmpdir.path().join("chains/dev/paritydb/parachains").exists());
|
||||
|
||||
// Purge chain
|
||||
let status = Command::new(cargo_bin("polkadot"))
|
||||
.args(&["purge-chain", "--dev", "-d"])
|
||||
.arg(tmpdir.path())
|
||||
.arg("--database")
|
||||
.arg("paritydb-experimental")
|
||||
.arg("-y")
|
||||
.status()
|
||||
.unwrap();
|
||||
assert!(status.success());
|
||||
|
||||
// Make sure that the chain folder exists, but `db/full` is deleted.
|
||||
assert!(tmpdir.path().join("chains/dev").exists());
|
||||
assert!(!tmpdir.path().join("chains/dev/paritydb/full").exists());
|
||||
// Parachains removal requires calling "purge-chain --parachains".
|
||||
assert!(tmpdir.path().join("chains/dev/paritydb/parachains").exists());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user