DatabaseSource::Auto (#9500)

* implement "auto" database backend in client/db, in progress, #9201

* move fn supports_ref_counting from DatabaseSource enum to Database trait to make it work correctly for all types of dbs

* update kvdb_rocksdb to 0.13 and use it's new config feature  to properly auto start existing database

* tests for auto database reopening

* introduce OpenDbError to cleanup opening database error handling and handle case when database is not enabled at the compile time

* cargo fmt strings again

* cargo fmt strings again

* rename DataSettingsSrc to fix test compilation

* fix the call to the new kvdb-rocksdb interdace in tests to fix compilation

* simplify OpenDbError and make it compile even when paritydb and rocksdb are disabled

* cargo fmt

* fix compilation without flag with-parity-db

* fix unused var compilation warning

* support different paths for rocksdb and paritydb in DatabaseSouce::Auto

* support "auto" database option in substrate cli

* enable Lz4 compression for some of the parity-db colums as per review suggestion

* applied review suggestions
This commit is contained in:
Marek Kotewicz
2021-08-09 15:22:28 +02:00
committed by GitHub
parent 4dc8db2b80
commit a2f7524138
21 changed files with 549 additions and 219 deletions
+76 -64
View File
@@ -19,8 +19,8 @@
//! Database upgrade logic.
use std::{
fs,
io::{ErrorKind, Read, Write},
fmt, fs,
io::{self, ErrorKind, Read, Write},
path::{Path, PathBuf},
};
@@ -39,61 +39,79 @@ const CURRENT_VERSION: u32 = 3;
const V1_NUM_COLUMNS: u32 = 11;
const V2_NUM_COLUMNS: u32 = 12;
/// Upgrade database to current version.
pub fn upgrade_db<Block: BlockT>(
db_path: &Path,
db_type: DatabaseType,
) -> sp_blockchain::Result<()> {
let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none());
if !is_empty {
let db_version = current_version(db_path)?;
match db_version {
0 => Err(sp_blockchain::Error::Backend(format!(
"Unsupported database version: {}",
db_version
)))?,
1 => {
migrate_1_to_2::<Block>(db_path, db_type)?;
migrate_2_to_3::<Block>(db_path, db_type)?
},
2 => migrate_2_to_3::<Block>(db_path, db_type)?,
CURRENT_VERSION => (),
_ => Err(sp_blockchain::Error::Backend(format!(
"Future database version: {}",
db_version
)))?,
/// Database upgrade errors.
#[derive(Debug)]
pub enum UpgradeError {
/// Database version cannot be read from existing db_version file.
UnknownDatabaseVersion,
/// Missing database version file.
MissingDatabaseVersionFile,
/// Database version no longer supported.
UnsupportedVersion(u32),
/// Database version comes from future version of the client.
FutureDatabaseVersion(u32),
/// Invalid justification block.
DecodingJustificationBlock,
/// Common io error.
Io(io::Error),
}
pub type UpgradeResult<T> = Result<T, UpgradeError>;
impl From<io::Error> for UpgradeError {
fn from(err: io::Error) -> Self {
UpgradeError::Io(err)
}
}
impl fmt::Display for UpgradeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
UpgradeError::UnknownDatabaseVersion =>
write!(f, "Database version cannot be read from exisiting db_version file"),
UpgradeError::MissingDatabaseVersionFile => write!(f, "Missing database version file"),
UpgradeError::UnsupportedVersion(version) =>
write!(f, "Database version no longer supported: {}", version),
UpgradeError::FutureDatabaseVersion(version) =>
write!(f, "Database version comes from future version of the client: {}", version),
UpgradeError::DecodingJustificationBlock =>
write!(f, "Decodoning justification block failed"),
UpgradeError::Io(err) => write!(f, "Io error: {}", err),
}
}
}
update_version(db_path)
/// Upgrade database to current version.
pub fn upgrade_db<Block: BlockT>(db_path: &Path, db_type: DatabaseType) -> UpgradeResult<()> {
let db_version = current_version(db_path)?;
match db_version {
0 => return Err(UpgradeError::UnsupportedVersion(db_version)),
1 => {
migrate_1_to_2::<Block>(db_path, db_type)?;
migrate_2_to_3::<Block>(db_path, db_type)?
},
2 => migrate_2_to_3::<Block>(db_path, db_type)?,
CURRENT_VERSION => (),
_ => return Err(UpgradeError::FutureDatabaseVersion(db_version)),
}
update_version(db_path)?;
Ok(())
}
/// Migration from version1 to version2:
/// 1) the number of columns has changed from 11 to 12;
/// 2) transactions column is added;
fn migrate_1_to_2<Block: BlockT>(
db_path: &Path,
_db_type: DatabaseType,
) -> sp_blockchain::Result<()> {
let db_path = db_path
.to_str()
.ok_or_else(|| sp_blockchain::Error::Backend("Invalid database path".into()))?;
fn migrate_1_to_2<Block: BlockT>(db_path: &Path, _db_type: DatabaseType) -> UpgradeResult<()> {
let db_cfg = DatabaseConfig::with_columns(V1_NUM_COLUMNS);
let db = Database::open(&db_cfg, db_path).map_err(db_err)?;
db.add_column().map_err(db_err)
let db = Database::open(&db_cfg, db_path)?;
db.add_column().map_err(Into::into)
}
/// Migration from version2 to version3:
/// - The format of the stored Justification changed to support multiple Justifications.
fn migrate_2_to_3<Block: BlockT>(
db_path: &Path,
_db_type: DatabaseType,
) -> sp_blockchain::Result<()> {
let db_path = db_path
.to_str()
.ok_or_else(|| sp_blockchain::Error::Backend("Invalid database path".into()))?;
fn migrate_2_to_3<Block: BlockT>(db_path: &Path, _db_type: DatabaseType) -> UpgradeResult<()> {
let db_cfg = DatabaseConfig::with_columns(V2_NUM_COLUMNS);
let db = Database::open(&db_cfg, db_path).map_err(db_err)?;
let db = Database::open(&db_cfg, db_path)?;
// Get all the keys we need to update
let keys: Vec<_> = db.iter(columns::JUSTIFICATIONS).map(|entry| entry.0).collect();
@@ -101,49 +119,43 @@ fn migrate_2_to_3<Block: BlockT>(
// Read and update each entry
let mut transaction = db.transaction();
for key in keys {
if let Some(justification) = db.get(columns::JUSTIFICATIONS, &key).map_err(db_err)? {
if let Some(justification) = db.get(columns::JUSTIFICATIONS, &key)? {
// Tag each justification with the hardcoded ID for GRANDPA to avoid the dependency on
// the GRANDPA crate.
// NOTE: when storing justifications the previous API would get a `Vec<u8>` and still
// call encode on it.
let justification = Vec::<u8>::decode(&mut &justification[..])
.map_err(|_| sp_blockchain::Error::Backend("Invalid justification blob".into()))?;
.map_err(|_| UpgradeError::DecodingJustificationBlock)?;
let justifications = sp_runtime::Justifications::from((*b"FRNK", justification));
transaction.put_vec(columns::JUSTIFICATIONS, &key, justifications.encode());
}
}
db.write(transaction).map_err(db_err)?;
db.write(transaction)?;
Ok(())
}
/// Reads current database version from the file at given path.
/// If the file does not exist returns 0.
fn current_version(path: &Path) -> sp_blockchain::Result<u32> {
let unknown_version_err = || sp_blockchain::Error::Backend("Unknown database version".into());
fn current_version(path: &Path) -> UpgradeResult<u32> {
match fs::File::open(version_file_path(path)) {
Err(ref err) if err.kind() == ErrorKind::NotFound => Ok(0),
Err(_) => Err(unknown_version_err()),
Err(ref err) if err.kind() == ErrorKind::NotFound =>
Err(UpgradeError::MissingDatabaseVersionFile),
Err(_) => Err(UpgradeError::UnknownDatabaseVersion),
Ok(mut file) => {
let mut s = String::new();
file.read_to_string(&mut s).map_err(|_| unknown_version_err())?;
u32::from_str_radix(&s, 10).map_err(|_| unknown_version_err())
file.read_to_string(&mut s).map_err(|_| UpgradeError::UnknownDatabaseVersion)?;
u32::from_str_radix(&s, 10).map_err(|_| UpgradeError::UnknownDatabaseVersion)
},
}
}
/// Maps database error to client error
fn db_err(err: std::io::Error) -> sp_blockchain::Error {
sp_blockchain::Error::Backend(format!("{}", err))
}
/// Writes current database version to the file.
/// Creates a new file if the version file does not exist yet.
fn update_version(path: &Path) -> sp_blockchain::Result<()> {
fs::create_dir_all(path).map_err(db_err)?;
let mut file = fs::File::create(version_file_path(path)).map_err(db_err)?;
file.write_all(format!("{}", CURRENT_VERSION).as_bytes()).map_err(db_err)?;
pub fn update_version(path: &Path) -> io::Result<()> {
fs::create_dir_all(path)?;
let mut file = fs::File::create(version_file_path(path))?;
file.write_all(format!("{}", CURRENT_VERSION).as_bytes())?;
Ok(())
}
@@ -158,7 +170,7 @@ fn version_file_path(path: &Path) -> PathBuf {
mod tests {
use super::*;
use crate::{
tests::Block, DatabaseSettings, DatabaseSettingsSrc, KeepBlocks, TransactionStorageMode,
tests::Block, DatabaseSettings, DatabaseSource, KeepBlocks, TransactionStorageMode,
};
use sc_state_db::PruningMode;
@@ -176,7 +188,7 @@ mod tests {
state_cache_size: 0,
state_cache_child_ratio: None,
state_pruning: PruningMode::ArchiveAll,
source: DatabaseSettingsSrc::RocksDb { path: db_path.to_owned(), cache_size: 128 },
source: DatabaseSource::RocksDb { path: db_path.to_owned(), cache_size: 128 },
keep_blocks: KeepBlocks::All,
transaction_storage: TransactionStorageMode::BlockBody,
},