Make rolling session more resilient in case of long finality stalls (#6106)

* Impl dynamic window size. Keep sessions for unfinalized chain

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Stretch also in contructor plus  tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix approval-voting tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* grunting: dispute coordinator tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* add session window column

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* integrate approval vote and fix tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix rolling session tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Small refactor

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* WIP, tests failing

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Fix approval voting tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix dispute-coordinator tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* remove uneeded param

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix loose ends

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* allow failure and tests for it

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix comment

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* comment fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* style fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* new col doesn't need to be ordered

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt and spellcheck

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* db persist tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Add v2 config and cols

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* DB upgrade WIP

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Fix comments

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* add todo

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* update to parity-db to "0.4.2"

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* migration complete

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* One session window size

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix merge damage

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix build errors

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* comment fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix build

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* make error more explicit

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* add comment

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* refactor conflict merge

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* rename col_data

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* add doc comment

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix build

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* migration: move all cols to v2

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
Andrei Sandu
2022-11-08 19:16:31 +02:00
committed by GitHub
parent be4b5fad47
commit c261353380
15 changed files with 944 additions and 291 deletions
@@ -90,41 +90,45 @@ impl Backend for DbBackend {
match op {
BackendWriteOp::WriteStoredBlockRange(stored_block_range) => {
tx.put_vec(
self.config.col_data,
self.config.col_approval_data,
&STORED_BLOCKS_KEY,
stored_block_range.encode(),
);
},
BackendWriteOp::DeleteStoredBlockRange => {
tx.delete(self.config.col_data, &STORED_BLOCKS_KEY);
tx.delete(self.config.col_approval_data, &STORED_BLOCKS_KEY);
},
BackendWriteOp::WriteBlocksAtHeight(h, blocks) => {
tx.put_vec(self.config.col_data, &blocks_at_height_key(h), blocks.encode());
tx.put_vec(
self.config.col_approval_data,
&blocks_at_height_key(h),
blocks.encode(),
);
},
BackendWriteOp::DeleteBlocksAtHeight(h) => {
tx.delete(self.config.col_data, &blocks_at_height_key(h));
tx.delete(self.config.col_approval_data, &blocks_at_height_key(h));
},
BackendWriteOp::WriteBlockEntry(block_entry) => {
let block_entry: BlockEntry = block_entry.into();
tx.put_vec(
self.config.col_data,
self.config.col_approval_data,
&block_entry_key(&block_entry.block_hash),
block_entry.encode(),
);
},
BackendWriteOp::DeleteBlockEntry(hash) => {
tx.delete(self.config.col_data, &block_entry_key(&hash));
tx.delete(self.config.col_approval_data, &block_entry_key(&hash));
},
BackendWriteOp::WriteCandidateEntry(candidate_entry) => {
let candidate_entry: CandidateEntry = candidate_entry.into();
tx.put_vec(
self.config.col_data,
self.config.col_approval_data,
&candidate_entry_key(&candidate_entry.candidate.hash()),
candidate_entry.encode(),
);
},
BackendWriteOp::DeleteCandidateEntry(candidate_hash) => {
tx.delete(self.config.col_data, &candidate_entry_key(&candidate_hash));
tx.delete(self.config.col_approval_data, &candidate_entry_key(&candidate_hash));
},
}
}
@@ -149,7 +153,9 @@ pub type Bitfield = BitVec<u8, BitOrderLsb0>;
#[derive(Debug, Clone, Copy)]
pub struct Config {
/// The column family in the database where data is stored.
pub col_data: u32,
pub col_approval_data: u32,
/// The column of the database where rolling session window data is stored.
pub col_session_data: u32,
}
/// Details pertaining to our assignment on a block.
@@ -243,10 +249,10 @@ pub type Result<T> = std::result::Result<T, Error>;
pub(crate) fn load_decode<D: Decode>(
store: &dyn Database,
col_data: u32,
col_approval_data: u32,
key: &[u8],
) -> Result<Option<D>> {
match store.get(col_data, key)? {
match store.get(col_approval_data, key)? {
None => Ok(None),
Some(raw) => D::decode(&mut &raw[..]).map(Some).map_err(Into::into),
}
@@ -303,7 +309,7 @@ pub fn load_stored_blocks(
store: &dyn Database,
config: &Config,
) -> SubsystemResult<Option<StoredBlockRange>> {
load_decode(store, config.col_data, STORED_BLOCKS_KEY)
load_decode(store, config.col_approval_data, STORED_BLOCKS_KEY)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
@@ -313,7 +319,7 @@ pub fn load_blocks_at_height(
config: &Config,
block_number: &BlockNumber,
) -> SubsystemResult<Vec<Hash>> {
load_decode(store, config.col_data, &blocks_at_height_key(*block_number))
load_decode(store, config.col_approval_data, &blocks_at_height_key(*block_number))
.map(|x| x.unwrap_or_default())
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
@@ -324,7 +330,7 @@ pub fn load_block_entry(
config: &Config,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>> {
load_decode(store, config.col_data, &block_entry_key(block_hash))
load_decode(store, config.col_approval_data, &block_entry_key(block_hash))
.map(|u: Option<BlockEntry>| u.map(|v| v.into()))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
@@ -335,7 +341,7 @@ pub fn load_candidate_entry(
config: &Config,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>> {
load_decode(store, config.col_data, &candidate_entry_key(candidate_hash))
load_decode(store, config.col_approval_data, &candidate_entry_key(candidate_hash))
.map(|u: Option<CandidateEntry>| u.map(|v| v.into()))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
@@ -28,9 +28,12 @@ use std::{collections::HashMap, sync::Arc};
use ::test_helpers::{dummy_candidate_receipt, dummy_candidate_receipt_bad_sig, dummy_hash};
const DATA_COL: u32 = 0;
const NUM_COLUMNS: u32 = 1;
const SESSION_DATA_COL: u32 = 1;
const TEST_CONFIG: Config = Config { col_data: DATA_COL };
const NUM_COLUMNS: u32 = 2;
const TEST_CONFIG: Config =
Config { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
fn make_db() -> (DbBackend, Arc<dyn Database>) {
let db = kvdb_memorydb::create(NUM_COLUMNS);
@@ -632,14 +632,15 @@ pub(crate) mod tests {
pub(crate) use sp_runtime::{Digest, DigestItem};
use std::{pin::Pin, sync::Arc};
use crate::{
approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry, APPROVAL_SESSIONS,
};
use crate::{approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry};
const DATA_COL: u32 = 0;
const NUM_COLUMNS: u32 = 1;
const SESSION_DATA_COL: u32 = 1;
const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_data: DATA_COL };
const NUM_COLUMNS: u32 = 2;
const TEST_CONFIG: DatabaseConfig =
DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
#[derive(Default)]
struct MockClock;
@@ -654,22 +655,23 @@ pub(crate) mod tests {
}
fn blank_state() -> State {
let db = kvdb_memorydb::create(NUM_COLUMNS);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
let db: Arc<dyn Database> = Arc::new(db);
State {
session_window: None,
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
clock: Box::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria),
db,
db_config: TEST_CONFIG,
}
}
fn single_session_state(index: SessionIndex, info: SessionInfo) -> State {
State {
session_window: Some(RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
index,
vec![info],
)),
session_window: Some(RollingSessionWindow::with_session_info(index, vec![info])),
..blank_state()
}
}
@@ -782,11 +784,8 @@ pub(crate) mod tests {
.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
.collect::<Vec<_>>();
let session_window = RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
let session_window =
RollingSessionWindow::with_session_info(session, vec![session_info]);
let header = header.clone();
Box::pin(async move {
@@ -891,11 +890,8 @@ pub(crate) mod tests {
.collect::<Vec<_>>();
let test_fut = {
let session_window = RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
let session_window =
RollingSessionWindow::with_session_info(session, vec![session_info]);
let header = header.clone();
Box::pin(async move {
@@ -1089,11 +1085,8 @@ pub(crate) mod tests {
.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
.collect::<Vec<_>>();
let session_window = Some(RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
));
let session_window =
Some(RollingSessionWindow::with_session_info(session, vec![session_info]));
let header = header.clone();
Box::pin(async move {
@@ -1304,38 +1297,6 @@ pub(crate) mod tests {
}
);
// Caching of sesssions needs sessoion of first unfinalied block.
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, header.number);
let _ = s_tx.send(Ok(Some(header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, header.hash());
let _ = s_tx.send(Ok(session));
}
);
// determine_new_blocks exits early as the parent_hash is in the DB
assert_matches!(
+29 -10
View File
@@ -44,8 +44,7 @@ use polkadot_node_subsystem_util::{
database::Database,
metrics::{self, prometheus},
rolling_session_window::{
new_session_window_size, RollingSessionWindow, SessionWindowSize, SessionWindowUpdate,
SessionsUnavailable,
DatabaseParams, RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
},
TimeoutExt,
};
@@ -97,8 +96,6 @@ use crate::{
#[cfg(test)]
mod tests;
pub const APPROVAL_SESSIONS: SessionWindowSize = new_session_window_size!(6);
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
/// How long are we willing to wait for approval signatures?
///
@@ -118,7 +115,9 @@ const LOG_TARGET: &str = "parachain::approval-voting";
#[derive(Debug, Clone)]
pub struct Config {
/// The column family in the DB where approval-voting data is stored.
pub col_data: u32,
pub col_approval_data: u32,
/// The of the DB where rolling session info is stored.
pub col_session_data: u32,
/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
/// divisible by 500.
pub slot_duration_millis: u64,
@@ -358,7 +357,10 @@ impl ApprovalVotingSubsystem {
keystore,
slot_duration_millis: config.slot_duration_millis,
db,
db_config: DatabaseConfig { col_data: config.col_data },
db_config: DatabaseConfig {
col_approval_data: config.col_approval_data,
col_session_data: config.col_session_data,
},
mode: Mode::Syncing(sync_oracle),
metrics,
}
@@ -367,7 +369,10 @@ impl ApprovalVotingSubsystem {
/// Revert to the block corresponding to the specified `hash`.
/// The operation is not allowed for blocks older than the last finalized one.
pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> {
let config = approval_db::v1::Config { col_data: self.db_config.col_data };
let config = approval_db::v1::Config {
col_approval_data: self.db_config.col_approval_data,
col_session_data: self.db_config.col_session_data,
};
let mut backend = approval_db::v1::DbBackend::new(self.db.clone(), config);
let mut overlay = OverlayedBackend::new(&backend);
@@ -634,6 +639,9 @@ struct State {
slot_duration_millis: u64,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
// Require for `RollingSessionWindow`.
db_config: DatabaseConfig,
db: Arc<dyn Database>,
}
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
@@ -655,8 +663,17 @@ impl State {
match session_window {
None => {
let sender = ctx.sender().clone();
self.session_window =
Some(RollingSessionWindow::new(sender, APPROVAL_SESSIONS, head).await?);
self.session_window = Some(
RollingSessionWindow::new(
sender,
head,
DatabaseParams {
db: self.db.clone(),
db_column: self.db_config.col_session_data,
},
)
.await?,
);
Ok(None)
},
Some(mut session_window) => {
@@ -751,7 +768,7 @@ async fn run<B, Context>(
where
B: Backend,
{
if let Err(err) = db_sanity_check(subsystem.db, subsystem.db_config) {
if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config.clone()) {
gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check");
}
@@ -761,6 +778,8 @@ where
slot_duration_millis: subsystem.slot_duration_millis,
clock,
assignment_criteria,
db_config: subsystem.db_config,
db: subsystem.db,
};
let mut wakeups = Wakeups::default();
+40 -34
View File
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::tests::test_constants::TEST_CONFIG;
use super::*;
use polkadot_node_primitives::{
approval::{
@@ -111,9 +113,12 @@ fn make_sync_oracle(val: bool) -> (Box<dyn SyncOracle + Send>, TestSyncOracleHan
pub mod test_constants {
use crate::approval_db::v1::Config as DatabaseConfig;
const DATA_COL: u32 = 0;
pub(crate) const NUM_COLUMNS: u32 = 1;
const SESSION_DATA_COL: u32 = 1;
pub(crate) const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_data: DATA_COL };
pub(crate) const NUM_COLUMNS: u32 = 2;
pub(crate) const TEST_CONFIG: DatabaseConfig =
DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL };
}
struct MockSupportsParachains;
@@ -487,8 +492,9 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
context,
ApprovalVotingSubsystem::with_config(
Config {
col_data: test_constants::TEST_CONFIG.col_data,
col_approval_data: test_constants::TEST_CONFIG.col_approval_data,
slot_duration_millis: SLOT_DURATION_MILLIS,
col_session_data: TEST_CONFIG.col_session_data,
},
Arc::new(db),
Arc::new(keystore),
@@ -810,38 +816,38 @@ async fn import_block(
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(number));
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, number);
let _ = s_tx.send(Ok(Some(hashes[number as usize].0)));
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hashes[number as usize].0);
let _ = s_tx.send(Ok(number.into()));
}
);
if !fork {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(number));
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, number);
let _ = s_tx.send(Ok(Some(hashes[number as usize].0)));
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hashes[number as usize].0);
let _ = s_tx.send(Ok(number.into()));
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(