Parachains db column "migration" (#5797)

* Column migration for parityDB

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Remove columns

* warn

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* bump paritydb

* use clear_column

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* logs

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* paritydb 0.3.16

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Update node/service/Cargo.toml

Co-authored-by: Andronik <write@reusable.software>

* ParityDB versioning

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* cargo lock

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review + proper version constants

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Add test

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
Andrei Sandu
2022-07-27 12:34:27 +03:00
committed by GitHub
parent c2df8308d7
commit 5221fd667e
4 changed files with 205 additions and 40 deletions
+3 -2
View File
@@ -5660,9 +5660,9 @@ dependencies = [
[[package]] [[package]]
name = "parity-db" name = "parity-db"
version = "0.3.13" version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55a7901b85874402471e131de3332dde0e51f38432c69a3853627c8e25433048" checksum = "2bb474d0ed0836e185cb998a6b140ed1073d1fbf27d690ecf9ede8030289382c"
dependencies = [ dependencies = [
"blake2-rfc", "blake2-rfc",
"crc32fast", "crc32fast",
@@ -7228,6 +7228,7 @@ dependencies = [
"sp-transaction-pool", "sp-transaction-pool",
"sp-trie", "sp-trie",
"substrate-prometheus-endpoint", "substrate-prometheus-endpoint",
"tempfile",
"thiserror", "thiserror",
"tracing-gum", "tracing-gum",
"westend-runtime", "westend-runtime",
+2 -1
View File
@@ -69,7 +69,7 @@ serde_json = "1.0.81"
thiserror = "1.0.31" thiserror = "1.0.31"
kvdb = "0.11.0" kvdb = "0.11.0"
kvdb-rocksdb = { version = "0.15.2", optional = true } kvdb-rocksdb = { version = "0.15.2", optional = true }
parity-db = { version = "0.3.13", optional = true } parity-db = { version = "0.3.16", optional = true }
async-trait = "0.1.53" async-trait = "0.1.53"
lru = "0.7" lru = "0.7"
@@ -128,6 +128,7 @@ polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
env_logger = "0.9.0" env_logger = "0.9.0"
log = "0.4.17" log = "0.4.17"
assert_matches = "1.5.0" assert_matches = "1.5.0"
tempfile = "3.2"
[features] [features]
default = ["db", "full-node", "polkadot-native"] default = ["db", "full-node", "polkadot-native"]
+28 -19
View File
@@ -21,11 +21,15 @@ use {
#[cfg(feature = "full-node")] #[cfg(feature = "full-node")]
mod upgrade; mod upgrade;
const LOG_TARGET: &str = "parachain::db";
#[cfg(any(test, feature = "full-node"))] #[cfg(any(test, feature = "full-node"))]
pub(crate) mod columns { pub(crate) mod columns {
pub mod v0 { pub mod v0 {
pub const NUM_COLUMNS: u32 = 3; pub const NUM_COLUMNS: u32 = 3;
} }
pub mod v1 {
pub const NUM_COLUMNS: u32 = 5; pub const NUM_COLUMNS: u32 = 5;
pub const COL_AVAILABILITY_DATA: u32 = 0; pub const COL_AVAILABILITY_DATA: u32 = 0;
@@ -35,6 +39,7 @@ pub(crate) mod columns {
pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4; pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
pub const ORDERED_COL: &[u32] = pub const ORDERED_COL: &[u32] =
&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA]; &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
}
} }
/// Columns used by different subsystems. /// Columns used by different subsystems.
@@ -56,13 +61,19 @@ pub struct ColumnsConfig {
/// The real columns used by the parachains DB. /// The real columns used by the parachains DB.
#[cfg(any(test, feature = "full-node"))] #[cfg(any(test, feature = "full-node"))]
pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig { pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig {
col_availability_data: columns::COL_AVAILABILITY_DATA, col_availability_data: columns::v1::COL_AVAILABILITY_DATA,
col_availability_meta: columns::COL_AVAILABILITY_META, col_availability_meta: columns::v1::COL_AVAILABILITY_META,
col_approval_data: columns::COL_APPROVAL_DATA, col_approval_data: columns::v1::COL_APPROVAL_DATA,
col_chain_selection_data: columns::COL_CHAIN_SELECTION_DATA, col_chain_selection_data: columns::v1::COL_CHAIN_SELECTION_DATA,
col_dispute_coordinator_data: columns::COL_DISPUTE_COORDINATOR_DATA, col_dispute_coordinator_data: columns::v1::COL_DISPUTE_COORDINATOR_DATA,
}; };
#[derive(PartialEq)]
pub(crate) enum DatabaseKind {
ParityDB,
RocksDB,
}
/// The cache size for each column, in megabytes. /// The cache size for each column, in megabytes.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CacheSizes { pub struct CacheSizes {
@@ -95,27 +106,29 @@ pub fn open_creating_rocksdb(
let path = root.join("parachains").join("db"); let path = root.join("parachains").join("db");
let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS); let mut db_config = DatabaseConfig::with_columns(columns::v1::NUM_COLUMNS);
let _ = db_config let _ = db_config
.memory_budget .memory_budget
.insert(columns::COL_AVAILABILITY_DATA, cache_sizes.availability_data); .insert(columns::v1::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
let _ = db_config let _ = db_config
.memory_budget .memory_budget
.insert(columns::COL_AVAILABILITY_META, cache_sizes.availability_meta); .insert(columns::v1::COL_AVAILABILITY_META, cache_sizes.availability_meta);
let _ = db_config let _ = db_config
.memory_budget .memory_budget
.insert(columns::COL_APPROVAL_DATA, cache_sizes.approval_data); .insert(columns::v1::COL_APPROVAL_DATA, cache_sizes.approval_data);
let path_str = path let path_str = path
.to_str() .to_str()
.ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?; .ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?;
std::fs::create_dir_all(&path_str)?; std::fs::create_dir_all(&path_str)?;
upgrade::try_upgrade_db(&path)?; upgrade::try_upgrade_db(&path, DatabaseKind::RocksDB)?;
let db = Database::open(&db_config, &path_str)?; let db = Database::open(&db_config, &path_str)?;
let db = let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(
polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, columns::ORDERED_COL); db,
columns::v1::ORDERED_COL,
);
Ok(Arc::new(db)) Ok(Arc::new(db))
} }
@@ -132,18 +145,14 @@ pub fn open_creating_paritydb(
.ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?; .ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?;
std::fs::create_dir_all(&path_str)?; std::fs::create_dir_all(&path_str)?;
upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB)?;
let mut options = parity_db::Options::with_columns(&path, columns::NUM_COLUMNS as u8); let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_1_config(&path))
for i in columns::ORDERED_COL {
options.columns[*i as usize].btree_index = true;
}
let db = parity_db::Db::open_or_create(&options)
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new( let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new(
db, db,
columns::ORDERED_COL, columns::v1::ORDERED_COL,
); );
Ok(Arc::new(db)) Ok(Arc::new(db))
} }
@@ -15,6 +15,7 @@
#![cfg(feature = "full-node")] #![cfg(feature = "full-node")]
use super::{columns, other_io_error, DatabaseKind, LOG_TARGET};
use std::{ use std::{
fs, io, fs, io,
path::{Path, PathBuf}, path::{Path, PathBuf},
@@ -49,13 +50,23 @@ impl From<Error> for io::Error {
} }
/// Try upgrading parachain's database to the current version. /// Try upgrading parachain's database to the current version.
pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> { pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<(), Error> {
let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none()); let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none());
if !is_empty { if !is_empty {
match current_version(db_path)? { match get_db_version(db_path)? {
0 => migrate_from_version_0_to_1(db_path)?, // 0 -> 1 migration
CURRENT_VERSION => (), Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?,
v => return Err(Error::FutureVersion { current: CURRENT_VERSION, got: v }), // Already at current version, do nothing.
Some(CURRENT_VERSION) => (),
// This is an arbitrary future version, we don't handle it.
Some(v) => return Err(Error::FutureVersion { current: CURRENT_VERSION, got: v }),
// No version file. For `RocksDB` we dont need to do anything.
None if db_kind == DatabaseKind::RocksDB => (),
// No version file. `ParityDB` did not previously have a version defined.
// We handle this as a `0 -> 1` migration.
None if db_kind == DatabaseKind::ParityDB =>
migrate_from_version_0_to_1(db_path, db_kind)?,
None => unreachable!(),
} }
} }
@@ -63,12 +74,14 @@ pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> {
} }
/// Reads current database version from the file at given path. /// Reads current database version from the file at given path.
/// If the file does not exist, assumes the current version. /// If the file does not exist returns `None`, otherwise the version stored in the file.
fn current_version(path: &Path) -> Result<Version, Error> { fn get_db_version(path: &Path) -> Result<Option<Version>, Error> {
match fs::read_to_string(version_file_path(path)) { match fs::read_to_string(version_file_path(path)) {
Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(CURRENT_VERSION), Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()), Err(err) => Err(err.into()),
Ok(content) => u32::from_str(&content).map_err(|_| Error::CorruptedVersionFile), Ok(content) => u32::from_str(&content)
.map(|v| Some(v))
.map_err(|_| Error::CorruptedVersionFile),
} }
} }
@@ -86,9 +99,22 @@ fn version_file_path(path: &Path) -> PathBuf {
file_path file_path
} }
fn migrate_from_version_0_to_1(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> {
gum::info!(target: LOG_TARGET, "Migrating parachains db from version 0 to version 1 ...");
match db_kind {
DatabaseKind::ParityDB => paritydb_migrate_from_version_0_to_1(path),
DatabaseKind::RocksDB => rocksdb_migrate_from_version_0_to_1(path),
}
.and_then(|result| {
gum::info!(target: LOG_TARGET, "Migration complete! ");
Ok(result)
})
}
/// Migration from version 0 to version 1: /// Migration from version 0 to version 1:
/// * the number of columns has changed from 3 to 5; /// * the number of columns has changed from 3 to 5;
fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb_rocksdb::{Database, DatabaseConfig};
let db_path = path let db_path = path
@@ -102,3 +128,131 @@ fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
Ok(()) Ok(())
} }
// This currently clears columns which had their configs altered between versions.
// The columns to be changed are constrained by the `allowed_columns` vector.
fn paritydb_fix_columns(
path: &Path,
options: parity_db::Options,
allowed_columns: Vec<u32>,
) -> io::Result<()> {
// Figure out which columns to delete. This will be determined by inspecting
// the metadata file.
if let Some(metadata) = parity_db::Options::load_metadata(&path)
.map_err(|e| other_io_error(format!("Error reading metadata {:?}", e)))?
{
let columns_to_clear = metadata
.columns
.into_iter()
.enumerate()
.filter(|(idx, _)| allowed_columns.contains(&(*idx as u32)))
.filter_map(|(idx, opts)| {
let changed = opts != options.columns[idx];
if changed {
gum::debug!(
target: LOG_TARGET,
"Column {} will be cleared. Old options: {:?}, New options: {:?}",
idx,
opts,
options.columns[idx]
);
Some(idx)
} else {
None
}
})
.collect::<Vec<_>>();
if columns_to_clear.len() > 0 {
gum::debug!(
target: LOG_TARGET,
"Database column changes detected, need to cleanup {} columns.",
columns_to_clear.len()
);
}
for column in columns_to_clear {
gum::debug!(target: LOG_TARGET, "Clearing column {}", column,);
parity_db::clear_column(path, column.try_into().expect("Invalid column ID"))
.map_err(|e| other_io_error(format!("Error clearing column {:?}", e)))?;
}
// Write the updated column options.
options
.write_metadata(path, &metadata.salt)
.map_err(|e| other_io_error(format!("Error writing metadata {:?}", e)))?;
}
Ok(())
}
/// Database configuration for version 1.
pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options {
let mut options =
parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
for i in columns::v1::ORDERED_COL {
options.columns[*i as usize].btree_index = true;
}
options
}
/// Database configuration for version 0. This is useful just for testing.
#[cfg(test)]
pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options {
let mut options =
parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
options.columns[super::columns::v1::COL_AVAILABILITY_META as usize].btree_index = true;
options.columns[super::columns::v1::COL_CHAIN_SELECTION_DATA as usize].btree_index = true;
options
}
/// Migration from version 0 to version 1.
/// Cases covered:
/// - upgrading from v0.9.23 or earlier -> the `dispute coordinator column` was changed
/// - upgrading from v0.9.24+ -> this is a no op assuming the DB has been manually fixed as per
/// release notes
fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
// Delete the `dispute coordinator` column if needed (if column configuration is changed).
paritydb_fix_columns(
path,
paritydb_version_1_config(path),
vec![super::columns::v1::COL_DISPUTE_COORDINATOR_DATA],
)?;
Ok(())
}
#[cfg(test)]
mod tests {
#[test]
fn test_paritydb_migrate_0_1() {
use super::{columns::v1::*, *};
use parity_db::Db;
let db_dir = tempfile::tempdir().unwrap();
let path = db_dir.path();
{
let db = Db::open_or_create(&paritydb_version_0_config(&path)).unwrap();
db.commit(vec![
(COL_DISPUTE_COORDINATOR_DATA as u8, b"1234".to_vec(), Some(b"somevalue".to_vec())),
(COL_AVAILABILITY_META as u8, b"5678".to_vec(), Some(b"somevalue".to_vec())),
])
.unwrap();
}
try_upgrade_db(&path, DatabaseKind::ParityDB).unwrap();
let db = Db::open(&paritydb_version_1_config(&path)).unwrap();
assert_eq!(
db.get(super::columns::v1::COL_DISPUTE_COORDINATOR_DATA as u8, b"1234").unwrap(),
None
);
assert_eq!(
db.get(super::columns::v1::COL_AVAILABILITY_META as u8, b"5678").unwrap(),
Some("somevalue".as_bytes().to_vec())
);
}
}