Refactored block body database scheme (#10779)

* Refactored tx storage database scheme

* Bump parity-db

* fmt

* Fix handling invalid index size + test

* Removed superfluous result

* Minor changes

* fmt
This commit is contained in:
Arkadiy Paronyan
2022-03-04 11:30:29 +01:00
committed by GitHub
parent e6b6c8aac6
commit 2bd493ff12
15 changed files with 332 additions and 295 deletions
+261 -218
View File
@@ -111,13 +111,18 @@ pub const DB_HASH_LEN: usize = 32;
/// Hash type that this backend uses for the database.
pub type DbHash = sp_core::H256;
/// This is used as block body when storage-chain mode is enabled.
/// An extrinsic entry in the database.
#[derive(Debug, Encode, Decode)]
struct ExtrinsicHeader {
/// Hash of the indexed part
indexed_hash: DbHash, // Zero hash if there's no indexed data
/// The rest of the data.
data: Vec<u8>,
enum DbExtrinsic<B: BlockT> {
/// Extrinsic that contains indexed data.
Indexed {
/// Hash of the indexed part.
hash: DbHash,
/// Extrinsic header.
header: Vec<u8>,
},
/// Complete extrinsic data.
Full(B::Extrinsic),
}
/// A reference tracking state.
@@ -295,8 +300,6 @@ pub struct DatabaseSettings {
pub source: DatabaseSource,
/// Block pruning mode.
pub keep_blocks: KeepBlocks,
/// Block body/Transaction storage scheme.
pub transaction_storage: TransactionStorageMode,
}
/// Block pruning settings.
@@ -308,16 +311,6 @@ pub enum KeepBlocks {
Some(u32),
}
/// Block body storage scheme.
#[derive(Debug, Clone, Copy)]
pub enum TransactionStorageMode {
/// Store block body as an encoded list of full transactions in the BODY column
BlockBody,
/// Store a list of hashes in the BODY column and each transaction individually
/// in the TRANSACTION column.
StorageChain,
}
/// Where to find the database.
#[derive(Debug, Clone)]
pub enum DatabaseSource {
@@ -406,6 +399,7 @@ pub(crate) mod columns {
pub const OFFCHAIN: u32 = 9;
/// Transactions
pub const TRANSACTION: u32 = 11;
pub const BODY_INDEX: u32 = 12;
}
struct PendingBlock<Block: BlockT> {
@@ -453,14 +447,10 @@ pub struct BlockchainDb<Block: BlockT> {
leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
header_metadata_cache: Arc<HeaderMetadataCache<Block>>,
header_cache: Mutex<LinkedHashMap<Block::Hash, Option<Block::Header>>>,
transaction_storage: TransactionStorageMode,
}
impl<Block: BlockT> BlockchainDb<Block> {
fn new(
db: Arc<dyn Database<DbHash>>,
transaction_storage: TransactionStorageMode,
) -> ClientResult<Self> {
fn new(db: Arc<dyn Database<DbHash>>) -> ClientResult<Self> {
let meta = read_meta::<Block>(&*db, columns::HEADER)?;
let leaves = LeafSet::read_from_db(&*db, columns::META, meta_keys::LEAF_PREFIX)?;
Ok(BlockchainDb {
@@ -469,7 +459,6 @@ impl<Block: BlockT> BlockchainDb<Block> {
meta: Arc::new(RwLock::new(meta)),
header_metadata_cache: Arc::new(HeaderMetadataCache::default()),
header_cache: Default::default(),
transaction_storage,
})
}
@@ -558,59 +547,61 @@ impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for Blockcha
impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> {
fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
let body = match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => body,
None => return Ok(None),
};
match self.transaction_storage {
TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) {
Ok(body) => Ok(Some(body)),
if let Some(body) = read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
// Plain body
match Decode::decode(&mut &body[..]) {
Ok(body) => return Ok(Some(body)),
Err(err) =>
return Err(sp_blockchain::Error::Backend(format!(
"Error decoding body: {}",
err
))),
},
TransactionStorageMode::StorageChain => {
match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
Ok(index) => {
let extrinsics: ClientResult<Vec<Block::Extrinsic>> = index
.into_iter()
.map(|ExtrinsicHeader { indexed_hash, data }| {
let decode_result = if indexed_hash != Default::default() {
match self.db.get(columns::TRANSACTION, indexed_hash.as_ref()) {
Some(t) => {
let mut input =
utils::join_input(data.as_ref(), t.as_ref());
Block::Extrinsic::decode(&mut input)
},
None =>
return Err(sp_blockchain::Error::Backend(format!(
"Missing indexed transaction {:?}",
indexed_hash
))),
}
} else {
Block::Extrinsic::decode(&mut data.as_ref())
};
decode_result.map_err(|err| {
sp_blockchain::Error::Backend(format!(
"Error decoding extrinsic: {}",
err
))
})
})
.collect();
Ok(Some(extrinsics?))
},
Err(err) =>
return Err(sp_blockchain::Error::Backend(format!(
"Error decoding body list: {}",
err
))),
}
},
}
}
if let Some(index) = read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY_INDEX, id)? {
match Vec::<DbExtrinsic<Block>>::decode(&mut &index[..]) {
Ok(index) => {
let mut body = Vec::new();
for ex in index {
match ex {
DbExtrinsic::Indexed { hash, header } => {
match self.db.get(columns::TRANSACTION, hash.as_ref()) {
Some(t) => {
let mut input =
utils::join_input(header.as_ref(), t.as_ref());
let ex = Block::Extrinsic::decode(&mut input).map_err(
|err| {
sp_blockchain::Error::Backend(format!(
"Error decoding indexed extrinsic: {}",
err
))
},
)?;
body.push(ex);
},
None =>
return Err(sp_blockchain::Error::Backend(format!(
"Missing indexed transaction {:?}",
hash
))),
};
},
DbExtrinsic::Full(ex) => {
body.push(ex);
},
}
}
return Ok(Some(body))
},
Err(err) =>
return Err(sp_blockchain::Error::Backend(format!(
"Error decoding body list: {}",
err
))),
}
}
Ok(None)
}
fn justifications(&self, id: BlockId<Block>) -> ClientResult<Option<Justifications>> {
@@ -648,37 +639,29 @@ impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<B
}
fn block_indexed_body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Vec<u8>>>> {
match self.transaction_storage {
TransactionStorageMode::BlockBody => Ok(None),
TransactionStorageMode::StorageChain => {
let body = match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => body,
None => return Ok(None),
};
match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
Ok(index) => {
let mut transactions = Vec::new();
for ExtrinsicHeader { indexed_hash, .. } in index.into_iter() {
if indexed_hash != Default::default() {
match self.db.get(columns::TRANSACTION, indexed_hash.as_ref()) {
Some(t) => transactions.push(t),
None =>
return Err(sp_blockchain::Error::Backend(format!(
"Missing indexed transaction {:?}",
indexed_hash
))),
}
}
let body = match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY_INDEX, id)? {
Some(body) => body,
None => return Ok(None),
};
match Vec::<DbExtrinsic<Block>>::decode(&mut &body[..]) {
Ok(index) => {
let mut transactions = Vec::new();
for ex in index.into_iter() {
if let DbExtrinsic::Indexed { hash, .. } = ex {
match self.db.get(columns::TRANSACTION, hash.as_ref()) {
Some(t) => transactions.push(t),
None =>
return Err(sp_blockchain::Error::Backend(format!(
"Missing indexed transaction {:?}",
hash
))),
}
Ok(Some(transactions))
},
Err(err) =>
return Err(sp_blockchain::Error::Backend(format!(
"Error decoding body list: {}",
err
))),
}
}
Ok(Some(transactions))
},
Err(err) =>
Err(sp_blockchain::Error::Backend(format!("Error decoding body list: {}", err))),
}
}
}
@@ -1005,7 +988,6 @@ pub struct Backend<Block: BlockT> {
import_lock: Arc<RwLock<()>>,
is_archive: bool,
keep_blocks: KeepBlocks,
transaction_storage: TransactionStorageMode,
io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>,
state_usage: Arc<StateUsageStats>,
genesis_state: RwLock<Option<Arc<DbGenesisStorage<Block>>>>,
@@ -1023,20 +1005,12 @@ impl<Block: BlockT> Backend<Block> {
/// Create new memory-backed client backend for tests.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test(keep_blocks: u32, canonicalization_delay: u64) -> Self {
Self::new_test_with_tx_storage(
keep_blocks,
canonicalization_delay,
TransactionStorageMode::BlockBody,
)
Self::new_test_with_tx_storage(keep_blocks, canonicalization_delay)
}
/// Create new memory-backed client backend for tests.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test_with_tx_storage(
keep_blocks: u32,
canonicalization_delay: u64,
transaction_storage: TransactionStorageMode,
) -> Self {
pub fn new_test_with_tx_storage(keep_blocks: u32, canonicalization_delay: u64) -> Self {
let db = kvdb_memorydb::create(crate::utils::NUM_COLUMNS);
let db = sp_database::as_database(db);
let db_setting = DatabaseSettings {
@@ -1045,7 +1019,6 @@ impl<Block: BlockT> Backend<Block> {
state_pruning: PruningMode::keep_blocks(keep_blocks),
source: DatabaseSource::Custom(db),
keep_blocks: KeepBlocks::Some(keep_blocks),
transaction_storage,
};
Self::new(db_setting, canonicalization_delay).expect("failed to create test-db")
@@ -1074,7 +1047,7 @@ impl<Block: BlockT> Backend<Block> {
config: &DatabaseSettings,
) -> ClientResult<Self> {
let is_archive_pruning = config.state_pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone(), config.transaction_storage.clone())?;
let blockchain = BlockchainDb::new(db.clone())?;
let map_e = |e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from_state_db(e);
let state_db: StateDb<_, _> = StateDb::new(
config.state_pruning.clone(),
@@ -1100,7 +1073,6 @@ impl<Block: BlockT> Backend<Block> {
io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1)),
state_usage: Arc::new(StateUsageStats::new()),
keep_blocks: config.keep_blocks.clone(),
transaction_storage: config.transaction_storage.clone(),
genesis_state: RwLock::new(None),
};
@@ -1334,26 +1306,18 @@ impl<Block: BlockT> Backend<Block> {
transaction.set_from_vec(columns::HEADER, &lookup_key, pending_block.header.encode());
if let Some(body) = pending_block.body {
match self.transaction_storage {
TransactionStorageMode::BlockBody => {
transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
},
TransactionStorageMode::StorageChain => {
let body =
apply_index_ops::<Block>(&mut transaction, body, operation.index_ops);
transaction.set_from_vec(columns::BODY, &lookup_key, body);
},
// If we have any index operations we save the block in the new format with indexed
// extrinsic headers. Otherwise, we save the body as a single blob.
if operation.index_ops.is_empty() {
transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
} else {
let body =
apply_index_ops::<Block>(&mut transaction, body, operation.index_ops);
transaction.set_from_vec(columns::BODY_INDEX, &lookup_key, body);
}
}
if let Some(body) = pending_block.indexed_body {
match self.transaction_storage {
TransactionStorageMode::BlockBody => {
debug!(target: "db", "Commit: ignored indexed block body");
},
TransactionStorageMode::StorageChain => {
apply_indexed_body::<Block>(&mut transaction, body);
},
}
apply_indexed_body::<Block>(&mut transaction, body);
}
if let Some(justifications) = pending_block.justifications {
transaction.set_from_vec(
@@ -1691,7 +1655,7 @@ impl<Block: BlockT> Backend<Block> {
let mut hash = h.clone();
// Follow displaced chains back until we reach a finalized block.
// Since leaves are discarded due to finality, they can't have parents
// that are canonical, but not yet finalized. So we stop deletig as soon as
// that are canonical, but not yet finalized. So we stop deleting as soon as
// we reach canonical chain.
while self.blockchain.hash(number)? != Some(hash.clone()) {
let id = BlockId::<Block>::hash(hash.clone());
@@ -1714,36 +1678,37 @@ impl<Block: BlockT> Backend<Block> {
transaction: &mut Transaction<DbHash>,
id: BlockId<Block>,
) -> ClientResult<()> {
match read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => {
debug!(target: "db", "Removing block #{}", id);
utils::remove_from_db(
transaction,
&*self.storage.db,
columns::KEY_LOOKUP,
columns::BODY,
id,
)?;
match self.transaction_storage {
TransactionStorageMode::BlockBody => {},
TransactionStorageMode::StorageChain => {
match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
Ok(body) =>
for ExtrinsicHeader { indexed_hash, .. } in body {
if indexed_hash != Default::default() {
transaction.release(columns::TRANSACTION, indexed_hash);
}
},
Err(err) =>
return Err(sp_blockchain::Error::Backend(format!(
"Error decoding body list: {}",
err
))),
debug!(target: "db", "Removing block #{}", id);
utils::remove_from_db(
transaction,
&*self.storage.db,
columns::KEY_LOOKUP,
columns::BODY,
id,
)?;
if let Some(index) =
read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY_INDEX, id)?
{
utils::remove_from_db(
transaction,
&*self.storage.db,
columns::KEY_LOOKUP,
columns::BODY_INDEX,
id,
)?;
match Vec::<DbExtrinsic<Block>>::decode(&mut &index[..]) {
Ok(index) =>
for ex in index {
if let DbExtrinsic::Indexed { hash, .. } = ex {
transaction.release(columns::TRANSACTION, hash);
}
},
}
},
None => return Ok(()),
Err(err) =>
return Err(sp_blockchain::Error::Backend(format!(
"Error decoding body list: {}",
err
))),
}
}
Ok(())
}
@@ -1785,7 +1750,7 @@ fn apply_index_ops<Block: BlockT>(
body: Vec<Block::Extrinsic>,
ops: Vec<IndexOperation>,
) -> Vec<u8> {
let mut extrinsic_headers: Vec<ExtrinsicHeader> = Vec::with_capacity(body.len());
let mut extrinsic_index: Vec<DbExtrinsic<Block>> = Vec::with_capacity(body.len());
let mut index_map = HashMap::new();
let mut renewed_map = HashMap::new();
for op in ops {
@@ -1799,37 +1764,44 @@ fn apply_index_ops<Block: BlockT>(
}
}
for (index, extrinsic) in body.into_iter().enumerate() {
let extrinsic = extrinsic.encode();
let extrinsic_header = if let Some(hash) = renewed_map.get(&(index as u32)) {
let db_extrinsic = if let Some(hash) = renewed_map.get(&(index as u32)) {
// Bump ref counter
let extrinsic = extrinsic.encode();
transaction.reference(columns::TRANSACTION, DbHash::from_slice(hash.as_ref()));
ExtrinsicHeader { indexed_hash: hash.clone(), data: extrinsic }
DbExtrinsic::Indexed { hash: hash.clone(), header: extrinsic }
} else {
match index_map.get(&(index as u32)) {
Some((hash, size)) if *size as usize <= extrinsic.len() => {
let offset = extrinsic.len() - *size as usize;
transaction.store(
columns::TRANSACTION,
DbHash::from_slice(hash.as_ref()),
extrinsic[offset..].to_vec(),
);
ExtrinsicHeader {
indexed_hash: DbHash::from_slice(hash.as_ref()),
data: extrinsic[..offset].to_vec(),
Some((hash, size)) => {
let encoded = extrinsic.encode();
if *size as usize <= encoded.len() {
let offset = encoded.len() - *size as usize;
transaction.store(
columns::TRANSACTION,
DbHash::from_slice(hash.as_ref()),
encoded[offset..].to_vec(),
);
DbExtrinsic::Indexed {
hash: DbHash::from_slice(hash.as_ref()),
header: encoded[..offset].to_vec(),
}
} else {
// Invalid indexed slice. Just store full data and don't index anything.
DbExtrinsic::Full(extrinsic)
}
},
_ => ExtrinsicHeader { indexed_hash: Default::default(), data: extrinsic },
_ => DbExtrinsic::Full(extrinsic),
}
};
extrinsic_headers.push(extrinsic_header);
extrinsic_index.push(db_extrinsic);
}
debug!(
target: "db",
"DB transaction index: {} inserted, {} renewed",
"DB transaction index: {} inserted, {} renewed, {} full",
index_map.len(),
renewed_map.len()
renewed_map.len(),
extrinsic_index.len() - index_map.len() - renewed_map.len(),
);
extrinsic_headers.encode()
extrinsic_index.encode()
}
fn apply_indexed_body<Block: BlockT>(transaction: &mut Transaction<DbHash>, body: Vec<Vec<u8>>) {
@@ -2417,7 +2389,6 @@ pub(crate) mod tests {
state_pruning: PruningMode::keep_blocks(1),
source: DatabaseSource::Custom(backing),
keep_blocks: KeepBlocks::All,
transaction_storage: TransactionStorageMode::BlockBody,
},
0,
)
@@ -3087,46 +3058,43 @@ pub(crate) mod tests {
#[test]
fn prune_blocks_on_finalize() {
for storage in &[TransactionStorageMode::BlockBody, TransactionStorageMode::StorageChain] {
let backend = Backend::<Block>::new_test_with_tx_storage(2, 0, *storage);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
for i in 0..5 {
let hash = insert_block(
&backend,
i,
prev_hash,
None,
Default::default(),
vec![i.into()],
None,
)
.unwrap();
blocks.push(hash);
prev_hash = hash;
}
{
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
for i in 1..5 {
op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap();
}
backend.commit_operation(op).unwrap();
}
let bc = backend.blockchain();
assert_eq!(None, bc.body(BlockId::hash(blocks[0])).unwrap());
assert_eq!(None, bc.body(BlockId::hash(blocks[1])).unwrap());
assert_eq!(None, bc.body(BlockId::hash(blocks[2])).unwrap());
assert_eq!(Some(vec![3.into()]), bc.body(BlockId::hash(blocks[3])).unwrap());
assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap());
let backend = Backend::<Block>::new_test_with_tx_storage(2, 0);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
for i in 0..5 {
let hash = insert_block(
&backend,
i,
prev_hash,
None,
Default::default(),
vec![i.into()],
None,
)
.unwrap();
blocks.push(hash);
prev_hash = hash;
}
{
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
for i in 1..5 {
op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap();
}
backend.commit_operation(op).unwrap();
}
let bc = backend.blockchain();
assert_eq!(None, bc.body(BlockId::hash(blocks[0])).unwrap());
assert_eq!(None, bc.body(BlockId::hash(blocks[1])).unwrap());
assert_eq!(None, bc.body(BlockId::hash(blocks[2])).unwrap());
assert_eq!(Some(vec![3.into()]), bc.body(BlockId::hash(blocks[3])).unwrap());
assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap());
}
#[test]
fn prune_blocks_on_finalize_with_fork() {
let backend =
Backend::<Block>::new_test_with_tx_storage(2, 10, TransactionStorageMode::StorageChain);
let backend = Backend::<Block>::new_test_with_tx_storage(2, 10);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
for i in 0..5 {
@@ -3185,10 +3153,86 @@ pub(crate) mod tests {
assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap());
}
#[test]
fn indexed_data_block_body() {
// Test that indexed extrinsic payloads are stored in the TRANSACTION column
// and are released again when the containing block is pruned.
// Keep only 1 block, canonicalization delay 10.
let backend = Backend::<Block>::new_test_with_tx_storage(1, 10);
let x0 = ExtrinsicWrapper::from(0u64).encode();
let x1 = ExtrinsicWrapper::from(1u64).encode();
// Index everything after the first encoded byte; the first byte stays in the
// extrinsic "header" part, the remainder becomes the indexed payload.
let x0_hash = <HashFor<Block> as sp_core::Hasher>::hash(&x0[1..]);
let x1_hash = <HashFor<Block> as sp_core::Hasher>::hash(&x1[1..]);
let index = vec![
IndexOperation::Insert {
extrinsic: 0,
hash: x0_hash.as_ref().to_vec(),
size: (x0.len() - 1) as u32,
},
IndexOperation::Insert {
extrinsic: 1,
hash: x1_hash.as_ref().to_vec(),
size: (x1.len() - 1) as u32,
},
];
let hash = insert_block(
&backend,
0,
Default::default(),
None,
Default::default(),
vec![0u64.into(), 1u64.into()],
Some(index),
)
.unwrap();
let bc = backend.blockchain();
// Both indexed payloads must be retrievable by their hash.
assert_eq!(bc.indexed_transaction(&x0_hash).unwrap().unwrap(), &x0[1..]);
assert_eq!(bc.indexed_transaction(&x1_hash).unwrap().unwrap(), &x1[1..]);
// Push one more block and make sure the block is pruned and the transaction index is cleared.
insert_block(&backend, 1, hash, None, Default::default(), vec![], None).unwrap();
backend.finalize_block(BlockId::Number(1), None).unwrap();
// Block 0 is now pruned: no body and no indexed transactions remain.
assert_eq!(bc.body(BlockId::Number(0)).unwrap(), None);
assert_eq!(bc.indexed_transaction(&x0_hash).unwrap(), None);
assert_eq!(bc.indexed_transaction(&x1_hash).unwrap(), None);
}
#[test]
fn index_invalid_size() {
// Test handling of index operations whose declared size does not fit the
// encoded extrinsic: a valid size is indexed, an oversized one is skipped
// and the extrinsic is stored as full (non-indexed) data.
let backend = Backend::<Block>::new_test_with_tx_storage(1, 10);
let x0 = ExtrinsicWrapper::from(0u64).encode();
let x1 = ExtrinsicWrapper::from(1u64).encode();
let x0_hash = <HashFor<Block> as sp_core::Hasher>::hash(&x0[..]);
let x1_hash = <HashFor<Block> as sp_core::Hasher>::hash(&x1[..]);
let index = vec![
// Valid: indexed size equals the full encoded length.
IndexOperation::Insert {
extrinsic: 0,
hash: x0_hash.as_ref().to_vec(),
size: (x0.len()) as u32,
},
// Invalid: indexed size exceeds the encoded length by one byte.
IndexOperation::Insert {
extrinsic: 1,
hash: x1_hash.as_ref().to_vec(),
size: (x1.len() + 1) as u32,
},
];
insert_block(
&backend,
0,
Default::default(),
None,
Default::default(),
vec![0u64.into(), 1u64.into()],
Some(index),
)
.unwrap();
let bc = backend.blockchain();
// The valid entry is retrievable; the oversized one was not indexed.
assert_eq!(bc.indexed_transaction(&x0_hash).unwrap().unwrap(), &x0[..]);
assert_eq!(bc.indexed_transaction(&x1_hash).unwrap(), None);
}
#[test]
fn renew_transaction_storage() {
let backend =
Backend::<Block>::new_test_with_tx_storage(2, 10, TransactionStorageMode::StorageChain);
let backend = Backend::<Block>::new_test_with_tx_storage(2, 10);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
let x1 = ExtrinsicWrapper::from(0u64).encode();
@@ -3235,8 +3279,7 @@ pub(crate) mod tests {
#[test]
fn remove_leaf_block_works() {
let backend =
Backend::<Block>::new_test_with_tx_storage(2, 10, TransactionStorageMode::StorageChain);
let backend = Backend::<Block>::new_test_with_tx_storage(2, 10);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
for i in 0..2 {
+16 -2
View File
@@ -38,20 +38,22 @@ pub fn open<H: Clone + AsRef<[u8]>>(
path: &std::path::Path,
db_type: DatabaseType,
create: bool,
upgrade: bool,
) -> parity_db::Result<std::sync::Arc<dyn Database<H>>> {
let mut config = parity_db::Options::with_columns(path, NUM_COLUMNS as u8);
match db_type {
DatabaseType::Full => {
let indexes = [
let compressed = [
columns::STATE,
columns::HEADER,
columns::BODY,
columns::BODY_INDEX,
columns::TRANSACTION,
columns::JUSTIFICATIONS,
];
for i in indexes {
for i in compressed {
let mut column = &mut config.columns[i as usize];
column.compression = parity_db::CompressionType::Lz4;
}
@@ -60,9 +62,21 @@ pub fn open<H: Clone + AsRef<[u8]>>(
state_col.ref_counted = true;
state_col.preimage = true;
state_col.uniform = true;
let mut tx_col = &mut config.columns[columns::TRANSACTION as usize];
tx_col.ref_counted = true;
tx_col.preimage = true;
tx_col.uniform = true;
},
}
if upgrade {
log::info!("Upgrading database metadata.");
if let Some(meta) = parity_db::Options::load_metadata(path)? {
config.write_metadata(path, &meta.salt)?;
}
}
let db = if create {
parity_db::Db::open_or_create(&config)?
} else {
+34 -8
View File
@@ -33,11 +33,12 @@ use sp_runtime::traits::Block as BlockT;
const VERSION_FILE_NAME: &'static str = "db_version";
/// Current db version.
const CURRENT_VERSION: u32 = 3;
const CURRENT_VERSION: u32 = 4;
/// Number of columns in v1.
const V1_NUM_COLUMNS: u32 = 11;
const V2_NUM_COLUMNS: u32 = 12;
const V3_NUM_COLUMNS: u32 = 12;
/// Database upgrade errors.
#[derive(Debug)]
@@ -68,7 +69,7 @@ impl fmt::Display for UpgradeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
UpgradeError::UnknownDatabaseVersion => {
write!(f, "Database version cannot be read from exisiting db_version file")
write!(f, "Database version cannot be read from existing db_version file")
},
UpgradeError::MissingDatabaseVersionFile => write!(f, "Missing database version file"),
UpgradeError::UnsupportedVersion(version) => {
@@ -92,9 +93,16 @@ pub fn upgrade_db<Block: BlockT>(db_path: &Path, db_type: DatabaseType) -> Upgra
0 => return Err(UpgradeError::UnsupportedVersion(db_version)),
1 => {
migrate_1_to_2::<Block>(db_path, db_type)?;
migrate_2_to_3::<Block>(db_path, db_type)?
migrate_2_to_3::<Block>(db_path, db_type)?;
migrate_3_to_4::<Block>(db_path, db_type)?;
},
2 => {
migrate_2_to_3::<Block>(db_path, db_type)?;
migrate_3_to_4::<Block>(db_path, db_type)?;
},
3 => {
migrate_3_to_4::<Block>(db_path, db_type)?;
},
2 => migrate_2_to_3::<Block>(db_path, db_type)?,
CURRENT_VERSION => (),
_ => return Err(UpgradeError::FutureDatabaseVersion(db_version)),
}
@@ -139,6 +147,15 @@ fn migrate_2_to_3<Block: BlockT>(db_path: &Path, _db_type: DatabaseType) -> Upgr
Ok(())
}
/// Migration from version 3 to version 4:
/// 1) the number of columns has changed from 12 to 13;
/// 2) the `BODY_INDEX` column is added.
fn migrate_3_to_4<Block: BlockT>(db_path: &Path, _db_type: DatabaseType) -> UpgradeResult<()> {
// Open the database with the old (v3) column count, then append a single
// new column, which becomes the `BODY_INDEX` column.
let db_cfg = DatabaseConfig::with_columns(V3_NUM_COLUMNS);
let db = Database::open(&db_cfg, db_path)?;
db.add_column().map_err(Into::into)
}
/// Reads current database version from the file at given path.
/// If the file does not exist returns 0.
fn current_version(path: &Path) -> UpgradeResult<u32> {
@@ -173,9 +190,7 @@ fn version_file_path(path: &Path) -> PathBuf {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
tests::Block, DatabaseSettings, DatabaseSource, KeepBlocks, TransactionStorageMode,
};
use crate::{tests::Block, DatabaseSettings, DatabaseSource, KeepBlocks};
use sc_state_db::PruningMode;
fn create_db(db_path: &Path, version: Option<u32>) {
@@ -194,7 +209,6 @@ mod tests {
state_pruning: PruningMode::ArchiveAll,
source: DatabaseSource::RocksDb { path: db_path.to_owned(), cache_size: 128 },
keep_blocks: KeepBlocks::All,
transaction_storage: TransactionStorageMode::BlockBody,
},
db_type,
)
@@ -229,4 +243,16 @@ mod tests {
assert_eq!(current_version(&db_path).unwrap(), CURRENT_VERSION);
}
}
#[test]
fn upgrade_to_4_works() {
// Opening a database created at any earlier version (or with no version
// file at all) must migrate it up to CURRENT_VERSION.
let db_type = DatabaseType::Full;
for version_from_file in &[None, Some(1), Some(2), Some(3)] {
let db_dir = tempfile::TempDir::new().unwrap();
let db_path = db_dir.path().join(db_type.as_str());
// Create a database pre-populated with the given version marker.
create_db(&db_path, *version_from_file);
// Opening triggers the upgrade chain; afterwards the on-disk version
// must equal CURRENT_VERSION.
open_database(&db_path, db_type).unwrap();
assert_eq!(current_version(&db_path).unwrap(), CURRENT_VERSION);
}
}
}
+11 -6
View File
@@ -40,7 +40,7 @@ use sp_trie::DBValue;
feature = "test-helpers",
test
))]
pub const NUM_COLUMNS: u32 = 12;
pub const NUM_COLUMNS: u32 = 13;
/// Meta column. The set of keys in the column is shared by full && light storages.
pub const COLUMN_META: u32 = 0;
@@ -252,7 +252,7 @@ impl From<OpenDbError> for sp_blockchain::Error {
#[cfg(feature = "with-parity-db")]
impl From<parity_db::Error> for OpenDbError {
fn from(err: parity_db::Error) -> Self {
if err.to_string().contains("use open_or_create") {
if matches!(err, parity_db::Error::DatabaseNotFound) {
OpenDbError::DoesNotExist
} else {
OpenDbError::Internal(err.to_string())
@@ -272,8 +272,14 @@ impl From<io::Error> for OpenDbError {
#[cfg(feature = "with-parity-db")]
fn open_parity_db<Block: BlockT>(path: &Path, db_type: DatabaseType, create: bool) -> OpenDbResult {
let db = crate::parity_db::open(path, db_type, create)?;
Ok(db)
match crate::parity_db::open(path, db_type, create, false) {
Ok(db) => Ok(db),
Err(parity_db::Error::InvalidConfiguration(_)) => {
// Try to update the database with the new config
Ok(crate::parity_db::open(path, db_type, create, true)?)
},
Err(e) => Err(e.into()),
}
}
#[cfg(not(feature = "with-parity-db"))]
@@ -573,7 +579,7 @@ impl<'a, 'b> codec::Input for JoinInput<'a, 'b> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{KeepBlocks, TransactionStorageMode};
use crate::KeepBlocks;
use codec::Input;
use sc_state_db::PruningMode;
use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper};
@@ -689,7 +695,6 @@ mod tests {
state_pruning: PruningMode::ArchiveAll,
source,
keep_blocks: KeepBlocks::All,
transaction_storage: TransactionStorageMode::BlockBody,
}
}