mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
RollingSessionWindow cleanup (#7204)
* Replace `RollingSessionWindow` with `RuntimeInfo` - initial commit * Fix tests in import * Fix the rest of the tests * Remove dead code * Fix todos * Simplify session caching * Comments for `SessionInfoProvider` * Separate `SessionInfoProvider` from `State` * `cache_session_info_for_head` becomes freestanding function * Remove unneeded `mut` usage * fn session_info -> fn get_session_info() to avoid name clashes. The function also tries to initialize `SessionInfoProvider` * Fix SessionInfo retrieval * Code cleanup * Don't wrap `SessionInfoProvider` in an `Option` * Remove `earliest_session()` * Remove pre-caching -> wip * Fix some tests and code cleanup * Fix all tests * Fixes in tests * Fix comments, variable names and small style changes * Fix a warning * impl From<SessionWindowSize> for NonZeroUsize * Fix logging for `get_session_info` - remove redundant logs and decrease log level to DEBUG * Code review feedback * Storage migration removing `COL_SESSION_WINDOW_DATA` from parachains db * Remove `col_session_data` usages * Storage migration clearing columns w/o removing them * Remove session data column usages from `approval-voting` and `dispute-coordinator` tests * Add some test cases from `RollingSessionWindow` to `dispute-coordinator` tests * Fix formatting in initialized.rs * Fix a corner case in `SessionInfo` caching for `dispute-coordinator` * Remove `RollingSessionWindow` ;( * Revert "Fix formatting in initialized.rs" This reverts commit 0f94664ec9f3a7e3737a30291195990e1e7065fc. * v2 to v3 migration drops `COL_DISPUTE_COORDINATOR_DATA` instead of clearing it * Fix `NUM_COLUMNS` in `approval-voting` * Use `columns::v3::NUM_COLUMNS` when opening db * Update node/service/src/parachains_db/upgrade.rs Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> * Don't write in `COL_DISPUTE_COORDINATOR_DATA` for `test_rocksdb_migrate_2_to_3` * Fix `NUM+COLUMNS` in approval_voting * Fix formatting * Fix columns usage * Clarification comments about the different db versions --------- Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
8d117412dc
commit
eb1ed63b9d
@@ -15,6 +15,12 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Version 1 of the DB schema.
|
||||
//!
|
||||
//! Note that the version here differs from the actual version of the parachains
|
||||
//! database (check `CURRENT_VERSION` in `node/service/src/parachains_db/upgrade.rs`).
|
||||
//! The code in this module implements the way approval voting works with
|
||||
//! its data in the database. Any breaking changes here will still
|
||||
//! require a db migration (check `node/service/src/parachains_db/upgrade.rs`).
|
||||
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use polkadot_node_primitives::approval::{AssignmentCert, DelayTranche};
|
||||
@@ -154,8 +160,6 @@ pub type Bitfield = BitVec<u8, BitOrderLsb0>;
|
||||
pub struct Config {
|
||||
/// The column family in the database where data is stored.
|
||||
pub col_approval_data: u32,
|
||||
/// The column of the database where rolling session window data is stored.
|
||||
pub col_session_data: u32,
|
||||
}
|
||||
|
||||
/// Details pertaining to our assignment on a block.
|
||||
|
||||
@@ -28,12 +28,10 @@ use std::{collections::HashMap, sync::Arc};
|
||||
use ::test_helpers::{dummy_candidate_receipt, dummy_candidate_receipt_bad_sig, dummy_hash};
|
||||
|
||||
const DATA_COL: u32 = 0;
|
||||
const SESSION_DATA_COL: u32 = 1;
|
||||
|
||||
const NUM_COLUMNS: u32 = 2;
|
||||
const NUM_COLUMNS: u32 = 1;
|
||||
|
||||
const TEST_CONFIG: Config =
|
||||
Config { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
|
||||
const TEST_CONFIG: Config = Config { col_approval_data: DATA_COL };
|
||||
|
||||
fn make_db() -> (DbBackend, Arc<dyn Database>) {
|
||||
let db = kvdb_memorydb::create(NUM_COLUMNS);
|
||||
|
||||
@@ -609,12 +609,10 @@ pub(crate) mod tests {
|
||||
use crate::{approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry};
|
||||
|
||||
const DATA_COL: u32 = 0;
|
||||
const SESSION_DATA_COL: u32 = 1;
|
||||
|
||||
const NUM_COLUMNS: u32 = 2;
|
||||
const NUM_COLUMNS: u32 = 1;
|
||||
|
||||
const TEST_CONFIG: DatabaseConfig =
|
||||
DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
|
||||
const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_approval_data: DATA_COL };
|
||||
#[derive(Default)]
|
||||
struct MockClock;
|
||||
|
||||
|
||||
@@ -116,8 +116,6 @@ const LOG_TARGET: &str = "parachain::approval-voting";
|
||||
pub struct Config {
|
||||
/// The column family in the DB where approval-voting data is stored.
|
||||
pub col_approval_data: u32,
|
||||
/// The of the DB where rolling session info is stored.
|
||||
pub col_session_data: u32,
|
||||
/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
|
||||
/// divisible by 500.
|
||||
pub slot_duration_millis: u64,
|
||||
@@ -357,10 +355,7 @@ impl ApprovalVotingSubsystem {
|
||||
keystore,
|
||||
slot_duration_millis: config.slot_duration_millis,
|
||||
db,
|
||||
db_config: DatabaseConfig {
|
||||
col_approval_data: config.col_approval_data,
|
||||
col_session_data: config.col_session_data,
|
||||
},
|
||||
db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
|
||||
mode: Mode::Syncing(sync_oracle),
|
||||
metrics,
|
||||
}
|
||||
@@ -369,10 +364,8 @@ impl ApprovalVotingSubsystem {
|
||||
/// Revert to the block corresponding to the specified `hash`.
|
||||
/// The operation is not allowed for blocks older than the last finalized one.
|
||||
pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
|
||||
let config = approval_db::v1::Config {
|
||||
col_approval_data: self.db_config.col_approval_data,
|
||||
col_session_data: self.db_config.col_session_data,
|
||||
};
|
||||
let config =
|
||||
approval_db::v1::Config { col_approval_data: self.db_config.col_approval_data };
|
||||
let mut backend = approval_db::v1::DbBackend::new(self.db.clone(), config);
|
||||
let mut overlay = OverlayedBackend::new(&backend);
|
||||
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::tests::test_constants::TEST_CONFIG;
|
||||
|
||||
use super::*;
|
||||
use polkadot_node_primitives::{
|
||||
approval::{
|
||||
@@ -115,12 +113,10 @@ fn make_sync_oracle(val: bool) -> (Box<dyn SyncOracle + Send>, TestSyncOracleHan
|
||||
pub mod test_constants {
|
||||
use crate::approval_db::v1::Config as DatabaseConfig;
|
||||
const DATA_COL: u32 = 0;
|
||||
const SESSION_DATA_COL: u32 = 1;
|
||||
|
||||
pub(crate) const NUM_COLUMNS: u32 = 2;
|
||||
pub(crate) const NUM_COLUMNS: u32 = 1;
|
||||
|
||||
pub(crate) const TEST_CONFIG: DatabaseConfig =
|
||||
DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
|
||||
pub(crate) const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_approval_data: DATA_COL };
|
||||
}
|
||||
|
||||
struct MockSupportsParachains;
|
||||
@@ -493,7 +489,6 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
|
||||
Config {
|
||||
col_approval_data: test_constants::TEST_CONFIG.col_approval_data,
|
||||
slot_duration_millis: SLOT_DURATION_MILLIS,
|
||||
col_session_data: TEST_CONFIG.col_session_data,
|
||||
},
|
||||
Arc::new(db),
|
||||
Arc::new(keystore),
|
||||
|
||||
@@ -15,6 +15,12 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! `V1` database for the dispute coordinator.
|
||||
//!
|
||||
//! Note that the version here differs from the actual version of the parachains
|
||||
//! database (check `CURRENT_VERSION` in `node/service/src/parachains_db/upgrade.rs`).
|
||||
//! The code in this module implements the way dispute coordinator works with
|
||||
//! the dispute data in the database. Any breaking changes here will still
|
||||
//! require a db migration (check `node/service/src/parachains_db/upgrade.rs`).
|
||||
|
||||
use polkadot_node_primitives::DisputeStatus;
|
||||
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
|
||||
@@ -206,8 +212,6 @@ fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] {
|
||||
pub struct ColumnConfiguration {
|
||||
/// The column in the key-value DB where data is stored.
|
||||
pub col_dispute_data: u32,
|
||||
/// The column in the key-value DB where session data is stored.
|
||||
pub col_session_data: u32,
|
||||
}
|
||||
|
||||
/// Tracked votes on candidates, for the purposes of dispute resolution.
|
||||
@@ -378,7 +382,7 @@ mod tests {
|
||||
let db = kvdb_memorydb::create(1);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
|
||||
let store = Arc::new(db);
|
||||
let config = ColumnConfiguration { col_dispute_data: 0, col_session_data: 1 };
|
||||
let config = ColumnConfiguration { col_dispute_data: 0 };
|
||||
DbBackend::new(store, config, Metrics::default())
|
||||
}
|
||||
|
||||
|
||||
@@ -305,13 +305,12 @@ impl Initialized {
|
||||
Ok(session_idx)
|
||||
if self.gaps_in_cache || session_idx > self.highest_session_seen =>
|
||||
{
|
||||
// If error has occurred during last session caching - fetch the whole window
|
||||
// Otherwise - cache only the new sessions
|
||||
let lower_bound = if self.gaps_in_cache {
|
||||
session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)
|
||||
} else {
|
||||
self.highest_session_seen + 1
|
||||
};
|
||||
// Fetch the last `DISPUTE_WINDOW` number of sessions unless there are no gaps in
|
||||
// cache and we are not missing too many `SessionInfo`s
|
||||
let mut lower_bound = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
|
||||
if !self.gaps_in_cache && self.highest_session_seen > lower_bound {
|
||||
lower_bound = self.highest_session_seen + 1
|
||||
}
|
||||
|
||||
// There is a new session. Perform a dummy fetch to cache it.
|
||||
for idx in lower_bound..=session_idx {
|
||||
|
||||
@@ -127,16 +127,11 @@ pub struct DisputeCoordinatorSubsystem {
|
||||
pub struct Config {
|
||||
/// The data column in the store to use for dispute data.
|
||||
pub col_dispute_data: u32,
|
||||
/// The data column in the store to use for session data.
|
||||
pub col_session_data: u32,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
fn column_config(&self) -> db::v1::ColumnConfiguration {
|
||||
db::v1::ColumnConfiguration {
|
||||
col_dispute_data: self.col_dispute_data,
|
||||
col_session_data: self.col_session_data,
|
||||
}
|
||||
db::v1::ColumnConfiguration { col_dispute_data: self.col_dispute_data }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ use polkadot_node_subsystem_util::database::Database;
|
||||
|
||||
use polkadot_node_primitives::{
|
||||
DisputeMessage, DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement,
|
||||
DISPUTE_WINDOW,
|
||||
};
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{
|
||||
@@ -214,9 +215,9 @@ impl Default for TestState {
|
||||
make_keystore(vec![Sr25519Keyring::Alice.to_seed()].into_iter()).into();
|
||||
|
||||
let db = kvdb_memorydb::create(1);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
|
||||
let db = Arc::new(db);
|
||||
let config = Config { col_dispute_data: 0, col_session_data: 1 };
|
||||
let config = Config { col_dispute_data: 0 };
|
||||
|
||||
let genesis_header = Header {
|
||||
parent_hash: Hash::zero(),
|
||||
@@ -330,9 +331,11 @@ impl TestState {
|
||||
assert_eq!(h, block_hash);
|
||||
let _ = tx.send(Ok(session));
|
||||
|
||||
let first_expected_session = session.saturating_sub(DISPUTE_WINDOW.get() - 1);
|
||||
|
||||
// Queries for session caching - see `handle_startup`
|
||||
if self.known_session.is_none() {
|
||||
for i in 0..=session {
|
||||
for i in first_expected_session..=session {
|
||||
assert_matches!(
|
||||
overseer_recv(virtual_overseer).await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
@@ -3393,3 +3396,174 @@ fn informs_chain_selection_when_dispute_concluded_against() {
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
// On startup `SessionInfo` cache should be populated
|
||||
#[test]
|
||||
fn session_info_caching_on_startup_works() {
|
||||
test_harness(|mut test_state, mut virtual_overseer| {
|
||||
Box::pin(async move {
|
||||
let session = 1;
|
||||
|
||||
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
|
||||
|
||||
test_state
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
// Underflow means that no more than `DISPUTE_WINDOW` sessions should be fetched on startup
|
||||
#[test]
|
||||
fn session_info_caching_doesnt_underflow() {
|
||||
test_harness(|mut test_state, mut virtual_overseer| {
|
||||
Box::pin(async move {
|
||||
let session = DISPUTE_WINDOW.get() + 1;
|
||||
|
||||
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
|
||||
|
||||
test_state
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
// Cached `SessionInfo` shouldn't be re-requested from the runtime
|
||||
#[test]
|
||||
fn session_info_is_requested_only_once() {
|
||||
test_harness(|mut test_state, mut virtual_overseer| {
|
||||
Box::pin(async move {
|
||||
let session = 1;
|
||||
|
||||
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
|
||||
|
||||
// This leaf activation shouldn't fetch `SessionInfo` because the session is already cached
|
||||
test_state
|
||||
.activate_leaf_at_session(
|
||||
&mut virtual_overseer,
|
||||
session,
|
||||
3,
|
||||
vec![make_candidate_included_event(make_valid_candidate_receipt())],
|
||||
)
|
||||
.await;
|
||||
|
||||
// This leaf activation should fetch `SessionInfo` because the session is new
|
||||
test_state
|
||||
.activate_leaf_at_session(
|
||||
&mut virtual_overseer,
|
||||
session + 1,
|
||||
4,
|
||||
vec![make_candidate_included_event(make_valid_candidate_receipt())],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_,
|
||||
RuntimeApiRequest::SessionInfo(session_index, tx),
|
||||
)) => {
|
||||
assert_eq!(session_index, 2);
|
||||
let _ = tx.send(Ok(Some(test_state.session_info())));
|
||||
}
|
||||
);
|
||||
test_state
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
// Big jump means the new session we see with a leaf update is at least a `DISPUTE_WINDOW` bigger than
|
||||
// the already known one. In this case The whole `DISPUTE_WINDOW` should be fetched.
|
||||
#[test]
|
||||
fn session_info_big_jump_works() {
|
||||
test_harness(|mut test_state, mut virtual_overseer| {
|
||||
Box::pin(async move {
|
||||
let session_on_startup = 1;
|
||||
|
||||
test_state.handle_resume_sync(&mut virtual_overseer, session_on_startup).await;
|
||||
|
||||
// This leaf activation shouldn't fetch `SessionInfo` because the session is already cached
|
||||
test_state
|
||||
.activate_leaf_at_session(
|
||||
&mut virtual_overseer,
|
||||
session_on_startup,
|
||||
3,
|
||||
vec![make_candidate_included_event(make_valid_candidate_receipt())],
|
||||
)
|
||||
.await;
|
||||
|
||||
let session_after_jump = session_on_startup + DISPUTE_WINDOW.get() + 10;
|
||||
// This leaf activation should cache all missing `SessionInfo`s
|
||||
test_state
|
||||
.activate_leaf_at_session(
|
||||
&mut virtual_overseer,
|
||||
session_after_jump,
|
||||
4,
|
||||
vec![make_candidate_included_event(make_valid_candidate_receipt())],
|
||||
)
|
||||
.await;
|
||||
|
||||
let first_expected_session =
|
||||
session_after_jump.saturating_sub(DISPUTE_WINDOW.get() - 1);
|
||||
for expected_idx in first_expected_session..=session_after_jump {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_,
|
||||
RuntimeApiRequest::SessionInfo(session_index, tx),
|
||||
)) => {
|
||||
assert_eq!(session_index, expected_idx);
|
||||
let _ = tx.send(Ok(Some(test_state.session_info())));
|
||||
}
|
||||
);
|
||||
}
|
||||
test_state
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
// Small jump means the new session we see with a leaf update is at less than last known one + `DISPUTE_WINDOW`. In this
|
||||
// case fetching should start from last known one + 1.
|
||||
#[test]
|
||||
fn session_info_small_jump_works() {
|
||||
test_harness(|mut test_state, mut virtual_overseer| {
|
||||
Box::pin(async move {
|
||||
let session_on_startup = 1;
|
||||
|
||||
test_state.handle_resume_sync(&mut virtual_overseer, session_on_startup).await;
|
||||
|
||||
// This leaf activation shouldn't fetch `SessionInfo` because the session is already cached
|
||||
test_state
|
||||
.activate_leaf_at_session(
|
||||
&mut virtual_overseer,
|
||||
session_on_startup,
|
||||
3,
|
||||
vec![make_candidate_included_event(make_valid_candidate_receipt())],
|
||||
)
|
||||
.await;
|
||||
|
||||
let session_after_jump = session_on_startup + DISPUTE_WINDOW.get() - 1;
|
||||
// This leaf activation should cache all missing `SessionInfo`s
|
||||
test_state
|
||||
.activate_leaf_at_session(
|
||||
&mut virtual_overseer,
|
||||
session_after_jump,
|
||||
4,
|
||||
vec![make_candidate_included_event(make_valid_candidate_receipt())],
|
||||
)
|
||||
.await;
|
||||
|
||||
let first_expected_session = session_on_startup + 1;
|
||||
for expected_idx in first_expected_session..=session_after_jump {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
|
||||
_,
|
||||
RuntimeApiRequest::SessionInfo(session_index, tx),
|
||||
)) => {
|
||||
assert_eq!(session_index, expected_idx);
|
||||
let _ = tx.send(Ok(Some(test_state.session_info())));
|
||||
}
|
||||
);
|
||||
}
|
||||
test_state
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
@@ -896,7 +896,6 @@ where
|
||||
|
||||
let approval_voting_config = ApprovalVotingConfig {
|
||||
col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data,
|
||||
col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data,
|
||||
slot_duration_millis: slot_duration.as_millis() as u64,
|
||||
};
|
||||
|
||||
@@ -920,7 +919,6 @@ where
|
||||
|
||||
let dispute_coordinator_config = DisputeCoordinatorConfig {
|
||||
col_dispute_data: parachains_db::REAL_COLUMNS.col_dispute_coordinator_data,
|
||||
col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data,
|
||||
};
|
||||
|
||||
let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams {
|
||||
@@ -1512,7 +1510,6 @@ fn revert_chain_selection(db: Arc<dyn Database>, hash: Hash) -> sp_blockchain::R
|
||||
fn revert_approval_voting(db: Arc<dyn Database>, hash: Hash) -> sp_blockchain::Result<()> {
|
||||
let config = approval_voting_subsystem::Config {
|
||||
col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data,
|
||||
col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data,
|
||||
slot_duration_millis: Default::default(),
|
||||
};
|
||||
|
||||
|
||||
@@ -36,12 +36,18 @@ pub(crate) mod columns {
|
||||
|
||||
pub mod v2 {
|
||||
pub const NUM_COLUMNS: u32 = 6;
|
||||
|
||||
#[cfg(test)]
|
||||
pub const COL_SESSION_WINDOW_DATA: u32 = 5;
|
||||
}
|
||||
|
||||
pub mod v3 {
|
||||
pub const NUM_COLUMNS: u32 = 5;
|
||||
pub const COL_AVAILABILITY_DATA: u32 = 0;
|
||||
pub const COL_AVAILABILITY_META: u32 = 1;
|
||||
pub const COL_APPROVAL_DATA: u32 = 2;
|
||||
pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
|
||||
pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
|
||||
pub const COL_SESSION_WINDOW_DATA: u32 = 5;
|
||||
|
||||
pub const ORDERED_COL: &[u32] =
|
||||
&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
|
||||
@@ -62,19 +68,16 @@ pub struct ColumnsConfig {
|
||||
pub col_chain_selection_data: u32,
|
||||
/// The column used by dispute coordinator for data.
|
||||
pub col_dispute_coordinator_data: u32,
|
||||
/// The column used for session window data.
|
||||
pub col_session_window_data: u32,
|
||||
}
|
||||
|
||||
/// The real columns used by the parachains DB.
|
||||
#[cfg(any(test, feature = "full-node"))]
|
||||
pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig {
|
||||
col_availability_data: columns::v2::COL_AVAILABILITY_DATA,
|
||||
col_availability_meta: columns::v2::COL_AVAILABILITY_META,
|
||||
col_approval_data: columns::v2::COL_APPROVAL_DATA,
|
||||
col_chain_selection_data: columns::v2::COL_CHAIN_SELECTION_DATA,
|
||||
col_dispute_coordinator_data: columns::v2::COL_DISPUTE_COORDINATOR_DATA,
|
||||
col_session_window_data: columns::v2::COL_SESSION_WINDOW_DATA,
|
||||
col_availability_data: columns::v3::COL_AVAILABILITY_DATA,
|
||||
col_availability_meta: columns::v3::COL_AVAILABILITY_META,
|
||||
col_approval_data: columns::v3::COL_APPROVAL_DATA,
|
||||
col_chain_selection_data: columns::v3::COL_CHAIN_SELECTION_DATA,
|
||||
col_dispute_coordinator_data: columns::v3::COL_DISPUTE_COORDINATOR_DATA,
|
||||
};
|
||||
|
||||
#[derive(PartialEq)]
|
||||
@@ -122,20 +125,17 @@ pub fn open_creating_rocksdb(
|
||||
|
||||
let path = root.join("parachains").join("db");
|
||||
|
||||
let mut db_config = DatabaseConfig::with_columns(columns::v2::NUM_COLUMNS);
|
||||
let mut db_config = DatabaseConfig::with_columns(columns::v3::NUM_COLUMNS);
|
||||
|
||||
let _ = db_config
|
||||
.memory_budget
|
||||
.insert(columns::v2::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
|
||||
.insert(columns::v3::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
|
||||
let _ = db_config
|
||||
.memory_budget
|
||||
.insert(columns::v2::COL_AVAILABILITY_META, cache_sizes.availability_meta);
|
||||
.insert(columns::v3::COL_AVAILABILITY_META, cache_sizes.availability_meta);
|
||||
let _ = db_config
|
||||
.memory_budget
|
||||
.insert(columns::v2::COL_APPROVAL_DATA, cache_sizes.approval_data);
|
||||
let _ = db_config
|
||||
.memory_budget
|
||||
.insert(columns::v2::COL_SESSION_WINDOW_DATA, cache_sizes.session_data);
|
||||
.insert(columns::v3::COL_APPROVAL_DATA, cache_sizes.approval_data);
|
||||
|
||||
let path_str = path
|
||||
.to_str()
|
||||
@@ -146,7 +146,7 @@ pub fn open_creating_rocksdb(
|
||||
let db = Database::open(&db_config, &path_str)?;
|
||||
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(
|
||||
db,
|
||||
columns::v2::ORDERED_COL,
|
||||
columns::v3::ORDERED_COL,
|
||||
);
|
||||
|
||||
Ok(Arc::new(db))
|
||||
@@ -166,12 +166,12 @@ pub fn open_creating_paritydb(
|
||||
std::fs::create_dir_all(&path_str)?;
|
||||
upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB)?;
|
||||
|
||||
let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_2_config(&path))
|
||||
let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_3_config(&path))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
|
||||
|
||||
let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new(
|
||||
db,
|
||||
columns::v2::ORDERED_COL,
|
||||
columns::v3::ORDERED_COL,
|
||||
);
|
||||
Ok(Arc::new(db))
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ type Version = u32;
|
||||
const VERSION_FILE_NAME: &'static str = "parachain_db_version";
|
||||
|
||||
/// Current db version.
|
||||
const CURRENT_VERSION: Version = 2;
|
||||
const CURRENT_VERSION: Version = 3;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
@@ -58,6 +58,8 @@ pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<()
|
||||
Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?,
|
||||
// 1 -> 2 migration
|
||||
Some(1) => migrate_from_version_1_to_2(db_path, db_kind)?,
|
||||
// 2 -> 3 migration
|
||||
Some(2) => migrate_from_version_2_to_3(db_path, db_kind)?,
|
||||
// Already at current version, do nothing.
|
||||
Some(CURRENT_VERSION) => (),
|
||||
// This is an arbitrary future version, we don't handle it.
|
||||
@@ -127,6 +129,18 @@ fn migrate_from_version_1_to_2(path: &Path, db_kind: DatabaseKind) -> Result<(),
|
||||
})
|
||||
}
|
||||
|
||||
fn migrate_from_version_2_to_3(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> {
|
||||
gum::info!(target: LOG_TARGET, "Migrating parachains db from version 2 to version 3 ...");
|
||||
match db_kind {
|
||||
DatabaseKind::ParityDB => paritydb_migrate_from_version_2_to_3(path),
|
||||
DatabaseKind::RocksDB => rocksdb_migrate_from_version_2_to_3(path),
|
||||
}
|
||||
.and_then(|result| {
|
||||
gum::info!(target: LOG_TARGET, "Migration complete! ");
|
||||
Ok(result)
|
||||
})
|
||||
}
|
||||
|
||||
/// Migration from version 0 to version 1:
|
||||
/// * the number of columns has changed from 3 to 5;
|
||||
fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
|
||||
@@ -160,6 +174,20 @@ fn rocksdb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rocksdb_migrate_from_version_2_to_3(path: &Path) -> Result<(), Error> {
|
||||
use kvdb_rocksdb::{Database, DatabaseConfig};
|
||||
|
||||
let db_path = path
|
||||
.to_str()
|
||||
.ok_or_else(|| super::other_io_error("Invalid database path".into()))?;
|
||||
let db_cfg = DatabaseConfig::with_columns(super::columns::v2::NUM_COLUMNS);
|
||||
let mut db = Database::open(&db_cfg, db_path)?;
|
||||
|
||||
db.remove_last_column()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// This currently clears columns which had their configs altered between versions.
|
||||
// The columns to be changed are constrained by the `allowed_columns` vector.
|
||||
fn paritydb_fix_columns(
|
||||
@@ -221,7 +249,7 @@ fn paritydb_fix_columns(
|
||||
pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options {
|
||||
let mut options =
|
||||
parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
|
||||
for i in columns::v2::ORDERED_COL {
|
||||
for i in columns::v3::ORDERED_COL {
|
||||
options.columns[*i as usize].btree_index = true;
|
||||
}
|
||||
|
||||
@@ -232,7 +260,18 @@ pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options {
|
||||
pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options {
|
||||
let mut options =
|
||||
parity_db::Options::with_columns(&path, super::columns::v2::NUM_COLUMNS as u8);
|
||||
for i in columns::v2::ORDERED_COL {
|
||||
for i in columns::v3::ORDERED_COL {
|
||||
options.columns[*i as usize].btree_index = true;
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
|
||||
/// Database configuration for version 3.
|
||||
pub(crate) fn paritydb_version_3_config(path: &Path) -> parity_db::Options {
|
||||
let mut options =
|
||||
parity_db::Options::with_columns(&path, super::columns::v3::NUM_COLUMNS as u8);
|
||||
for i in columns::v3::ORDERED_COL {
|
||||
options.columns[*i as usize].btree_index = true;
|
||||
}
|
||||
|
||||
@@ -244,8 +283,8 @@ pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options {
|
||||
pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options {
|
||||
let mut options =
|
||||
parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
|
||||
options.columns[super::columns::v2::COL_AVAILABILITY_META as usize].btree_index = true;
|
||||
options.columns[super::columns::v2::COL_CHAIN_SELECTION_DATA as usize].btree_index = true;
|
||||
options.columns[super::columns::v3::COL_AVAILABILITY_META as usize].btree_index = true;
|
||||
options.columns[super::columns::v3::COL_CHAIN_SELECTION_DATA as usize].btree_index = true;
|
||||
|
||||
options
|
||||
}
|
||||
@@ -260,7 +299,7 @@ fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
|
||||
paritydb_fix_columns(
|
||||
path,
|
||||
paritydb_version_1_config(path),
|
||||
vec![super::columns::v2::COL_DISPUTE_COORDINATOR_DATA],
|
||||
vec![super::columns::v3::COL_DISPUTE_COORDINATOR_DATA],
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
@@ -278,9 +317,20 @@ fn paritydb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Migration from version 2 to version 3:
|
||||
/// - drop the column used by `RollingSessionWindow`
|
||||
fn paritydb_migrate_from_version_2_to_3(path: &Path) -> Result<(), Error> {
|
||||
parity_db::Db::drop_last_column(&mut paritydb_version_2_config(path))
|
||||
.map_err(|e| other_io_error(format!("Error removing COL_SESSION_WINDOW_DATA {:?}", e)))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{columns::v2::*, *};
|
||||
use super::{
|
||||
columns::{v2::COL_SESSION_WINDOW_DATA, v3::*},
|
||||
*,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_paritydb_migrate_0_to_1() {
|
||||
@@ -375,7 +425,7 @@ mod tests {
|
||||
// We need to properly set db version for upgrade to work.
|
||||
fs::write(version_file_path(db_dir.path()), "1").expect("Failed to write DB version");
|
||||
{
|
||||
let db = DbAdapter::new(db, columns::v2::ORDERED_COL);
|
||||
let db = DbAdapter::new(db, columns::v3::ORDERED_COL);
|
||||
db.write(DBTransaction {
|
||||
ops: vec![DBOp::Insert {
|
||||
col: COL_DISPUTE_COORDINATOR_DATA,
|
||||
@@ -393,7 +443,7 @@ mod tests {
|
||||
|
||||
assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS);
|
||||
|
||||
let db = DbAdapter::new(db, columns::v2::ORDERED_COL);
|
||||
let db = DbAdapter::new(db, columns::v3::ORDERED_COL);
|
||||
|
||||
assert_eq!(
|
||||
db.get(COL_DISPUTE_COORDINATOR_DATA, b"1234").unwrap(),
|
||||
@@ -416,4 +466,59 @@ mod tests {
|
||||
Some("0xdeadb00b".as_bytes().to_vec())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_paritydb_migrate_2_to_3() {
|
||||
use parity_db::Db;
|
||||
|
||||
let db_dir = tempfile::tempdir().unwrap();
|
||||
let path = db_dir.path();
|
||||
let test_key = b"1337";
|
||||
|
||||
// We need to properly set db version for upgrade to work.
|
||||
fs::write(version_file_path(path), "2").expect("Failed to write DB version");
|
||||
|
||||
{
|
||||
let db = Db::open_or_create(&paritydb_version_2_config(&path)).unwrap();
|
||||
|
||||
// Write some dummy data
|
||||
db.commit(vec![(
|
||||
COL_SESSION_WINDOW_DATA as u8,
|
||||
test_key.to_vec(),
|
||||
Some(b"0xdeadb00b".to_vec()),
|
||||
)])
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(db.num_columns(), columns::v2::NUM_COLUMNS as u8);
|
||||
}
|
||||
|
||||
try_upgrade_db(&path, DatabaseKind::ParityDB).unwrap();
|
||||
|
||||
let db = Db::open(&paritydb_version_3_config(&path)).unwrap();
|
||||
|
||||
assert_eq!(db.num_columns(), columns::v3::NUM_COLUMNS as u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rocksdb_migrate_2_to_3() {
|
||||
use kvdb_rocksdb::{Database, DatabaseConfig};
|
||||
|
||||
let db_dir = tempfile::tempdir().unwrap();
|
||||
let db_path = db_dir.path().to_str().unwrap();
|
||||
let db_cfg = DatabaseConfig::with_columns(super::columns::v2::NUM_COLUMNS);
|
||||
{
|
||||
let db = Database::open(&db_cfg, db_path).unwrap();
|
||||
assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS as u32);
|
||||
}
|
||||
|
||||
// We need to properly set db version for upgrade to work.
|
||||
fs::write(version_file_path(db_dir.path()), "2").expect("Failed to write DB version");
|
||||
|
||||
try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB).unwrap();
|
||||
|
||||
let db_cfg = DatabaseConfig::with_columns(super::columns::v3::NUM_COLUMNS);
|
||||
let db = Database::open(&db_cfg, db_path).unwrap();
|
||||
|
||||
assert_eq!(db.num_columns(), super::columns::v3::NUM_COLUMNS);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,8 +65,6 @@ pub mod reexports {
|
||||
pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
|
||||
}
|
||||
|
||||
/// A rolling session window cache.
|
||||
pub mod rolling_session_window;
|
||||
/// Convenient and efficient runtime info access.
|
||||
pub mod runtime;
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user