Approval voting overlay db (#3366)

* node/approval-voting: Introduce Backend trait and Overlaybackend

This commit introduces a Backend trait and attempts to move away
from the Action model via an OverlayBackend as in the ChainSelection
subsystem.

* node/approval-voting: Add WriteOps for StoredBlockRange and BlocksAtHeight

* node/approval-voting: Add load_all_blocks to overlay

* node/approval-voting: Get all module tests to pass.

This commit modifies all tests to ensure tests are passing.

* node/approval-voting: Address oversights in the previous commit

This commit addresses some oversights in the prior commit.

1. Inner errors in backend.write were swallowed
2. One-off write functions removed to avoid useless abstraction
3. Touch-ups in general

* node/approval-voting: Move from TestDB to dyn KeyValueDB

This commit removes the TestDB from tests.rs and replaces it with
an in-memory kvdb.

* node/approval-voting: Address feedback

* node/approval-voting: Add license to ops.rs

* node/approval-voting: Address second-pass feedback

* Add TODO

* node/approval-voting: Bump spec_version

* node/approval-voting: Address final comments.
This commit is contained in:
Lldenaurois
2021-07-08 11:00:57 -04:00
committed by GitHub
parent de445adb6d
commit 7313e485d0
12 changed files with 1445 additions and 1145 deletions
@@ -17,6 +17,7 @@
//! Version 1 of the DB schema.
use kvdb::{DBTransaction, KeyValueDB};
use polkadot_node_subsystem::{SubsystemResult, SubsystemError};
use polkadot_node_primitives::approval::{DelayTranche, AssignmentCert};
use polkadot_primitives::v1::{
ValidatorIndex, GroupIndex, CandidateReceipt, SessionIndex, CoreIndex,
@@ -25,22 +26,141 @@ use polkadot_primitives::v1::{
use sp_consensus_slots::Slot;
use parity_scale_codec::{Encode, Decode};
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry;
use std::collections::BTreeMap;
use std::sync::Arc;
use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0};
use crate::backend::{Backend, BackendWriteOp};
use crate::persisted_entries;
const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks";
#[cfg(test)]
pub mod tests;
/// DbBackend is a concrete implementation of the higher-level Backend trait
pub struct DbBackend {
inner: Arc<dyn KeyValueDB>,
config: Config,
}
impl DbBackend {
/// Create a new [`DbBackend`] with the supplied key-value store and
/// config.
pub fn new(db: Arc<dyn KeyValueDB>, config: Config) -> Self {
DbBackend {
inner: db,
config,
}
}
}
impl Backend for DbBackend {
fn load_block_entry(
&self,
block_hash: &Hash,
) -> SubsystemResult<Option<persisted_entries::BlockEntry>> {
load_block_entry(&*self.inner, &self.config, block_hash)
.map(|e| e.map(Into::into))
}
fn load_candidate_entry(
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<persisted_entries::CandidateEntry>> {
load_candidate_entry(&*self.inner, &self.config, candidate_hash)
.map(|e| e.map(Into::into))
}
fn load_blocks_at_height(
&self,
block_height: &BlockNumber
) -> SubsystemResult<Vec<Hash>> {
load_blocks_at_height(&*self.inner, &self.config, block_height)
}
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
load_all_blocks(&*self.inner, &self.config)
}
fn load_stored_blocks(&self) -> SubsystemResult<Option<StoredBlockRange>> {
load_stored_blocks(&*self.inner, &self.config)
}
/// Atomically write the list of operations, with later operations taking precedence over prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
where I: IntoIterator<Item = BackendWriteOp>
{
let mut tx = DBTransaction::new();
for op in ops {
match op {
BackendWriteOp::WriteStoredBlockRange(stored_block_range) => {
tx.put_vec(
self.config.col_data,
&STORED_BLOCKS_KEY,
stored_block_range.encode(),
);
}
BackendWriteOp::WriteBlocksAtHeight(h, blocks) => {
tx.put_vec(
self.config.col_data,
&blocks_at_height_key(h),
blocks.encode(),
);
}
BackendWriteOp::DeleteBlocksAtHeight(h) => {
tx.delete(
self.config.col_data,
&blocks_at_height_key(h),
);
}
BackendWriteOp::WriteBlockEntry(block_entry) => {
let block_entry: BlockEntry = block_entry.into();
tx.put_vec(
self.config.col_data,
&block_entry_key(&block_entry.block_hash),
block_entry.encode(),
);
}
BackendWriteOp::DeleteBlockEntry(hash) => {
tx.delete(
self.config.col_data,
&block_entry_key(&hash),
);
}
BackendWriteOp::WriteCandidateEntry(candidate_entry) => {
let candidate_entry: CandidateEntry = candidate_entry.into();
tx.put_vec(
self.config.col_data,
&candidate_entry_key(&candidate_entry.candidate.hash()),
candidate_entry.encode(),
);
}
BackendWriteOp::DeleteCandidateEntry(candidate_hash) => {
tx.delete(
self.config.col_data,
&candidate_entry_key(&candidate_hash),
);
}
}
}
self.inner.write(tx).map_err(|e| e.into())
}
}
/// A range from earliest..last block number stored within the DB.
#[derive(Encode, Decode, Debug, Clone, PartialEq)]
pub struct StoredBlockRange(pub BlockNumber, pub BlockNumber);
// slot_duration * 2 + DelayTranche gives the number of delay tranches since the
// unix epoch.
#[derive(Encode, Decode, Clone, Copy, Debug, PartialEq)]
pub struct Tick(u64);
/// Convenience type definition
pub type Bitfield = BitVec<BitOrderLsb0, u8>;
const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks";
/// The database config.
#[derive(Debug, Clone, Copy)]
pub struct Config {
@@ -113,10 +233,6 @@ pub struct BlockEntry {
pub children: Vec<Hash>,
}
/// A range from earliest..last block number stored within the DB.
#[derive(Encode, Decode, Debug, Clone, PartialEq)]
pub struct StoredBlockRange(BlockNumber, BlockNumber);
impl From<crate::Tick> for Tick {
fn from(tick: crate::Tick) -> Tick {
Tick(tick)
@@ -141,163 +257,7 @@ impl std::error::Error for Error {}
/// Result alias for DB errors.
pub type Result<T> = std::result::Result<T, Error>;
/// Canonicalize some particular block, pruning everything before it and
/// pruning any competing branches at the same height.
pub(crate) fn canonicalize(
store: &dyn KeyValueDB,
config: &Config,
canon_number: BlockNumber,
canon_hash: Hash,
)
-> Result<()>
{
let range = match load_stored_blocks(store, config)? {
None => return Ok(()),
Some(range) => if range.0 >= canon_number {
return Ok(())
} else {
range
},
};
let mut transaction = DBTransaction::new();
// Storing all candidates in memory is potentially heavy, but should be fine
// as long as finality doesn't stall for a long while. We could optimize this
// by keeping only the metadata about which blocks reference each candidate.
let mut visited_candidates = HashMap::new();
// All the block heights we visited but didn't necessarily delete everything from.
let mut visited_heights = HashMap::new();
let visit_and_remove_block_entry = |
block_hash: Hash,
transaction: &mut DBTransaction,
visited_candidates: &mut HashMap<CandidateHash, CandidateEntry>,
| -> Result<Vec<Hash>> {
let block_entry = match load_block_entry(store, config, &block_hash)? {
None => return Ok(Vec::new()),
Some(b) => b,
};
transaction.delete(config.col_data, &block_entry_key(&block_hash)[..]);
for &(_, ref candidate_hash) in &block_entry.candidates {
let candidate = match visited_candidates.entry(*candidate_hash) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => {
e.insert(match load_candidate_entry(store, config, candidate_hash)? {
None => continue, // Should not happen except for corrupt DB
Some(c) => c,
})
}
};
candidate.block_assignments.remove(&block_hash);
}
Ok(block_entry.children)
};
// First visit everything before the height.
for i in range.0..canon_number {
let at_height = load_blocks_at_height(store, config, i)?;
transaction.delete(config.col_data, &blocks_at_height_key(i)[..]);
for b in at_height {
let _ = visit_and_remove_block_entry(
b,
&mut transaction,
&mut visited_candidates,
)?;
}
}
// Then visit everything at the height.
let pruned_branches = {
let at_height = load_blocks_at_height(store, config, canon_number)?;
transaction.delete(config.col_data, &blocks_at_height_key(canon_number));
// Note that while there may be branches descending from blocks at earlier heights,
// we have already covered them by removing everything at earlier heights.
let mut pruned_branches = Vec::new();
for b in at_height {
let children = visit_and_remove_block_entry(
b,
&mut transaction,
&mut visited_candidates,
)?;
if b != canon_hash {
pruned_branches.extend(children);
}
}
pruned_branches
};
// Follow all children of non-canonicalized blocks.
{
let mut frontier: Vec<_> = pruned_branches.into_iter().map(|h| (canon_number + 1, h)).collect();
while let Some((height, next_child)) = frontier.pop() {
let children = visit_and_remove_block_entry(
next_child,
&mut transaction,
&mut visited_candidates,
)?;
// extend the frontier of branches to include the given height.
frontier.extend(children.into_iter().map(|h| (height + 1, h)));
// visit the at-height key for this deleted block's height.
let at_height = match visited_heights.entry(height) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => e.insert(load_blocks_at_height(store, config, height)?),
};
if let Some(i) = at_height.iter().position(|x| x == &next_child) {
at_height.remove(i);
}
}
}
// Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`.
for (candidate_hash, candidate) in visited_candidates {
if candidate.block_assignments.is_empty() {
transaction.delete(config.col_data, &candidate_entry_key(&candidate_hash)[..]);
} else {
transaction.put_vec(
config.col_data,
&candidate_entry_key(&candidate_hash)[..],
candidate.encode(),
);
}
}
// Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`.
for (h, at) in visited_heights {
if at.is_empty() {
transaction.delete(config.col_data, &blocks_at_height_key(h)[..]);
} else {
transaction.put_vec(config.col_data, &blocks_at_height_key(h), at.encode());
}
}
// due to the fork pruning, this range actually might go too far above where our actual highest block is,
// if a relatively short fork is canonicalized.
let new_range = StoredBlockRange(
canon_number + 1,
std::cmp::max(range.1, canon_number + 2),
).encode();
transaction.put_vec(config.col_data, &STORED_BLOCKS_KEY[..], new_range);
// Update the values on-disk.
store.write(transaction).map_err(Into::into)
}
fn load_decode<D: Decode>(store: &dyn KeyValueDB, col_data: u32, key: &[u8])
-> Result<Option<D>>
pub(crate) fn load_decode<D: Decode>(store: &dyn KeyValueDB, col_data: u32, key: &[u8]) -> Result<Option<D>>
{
match store.get(col_data, key)? {
None => Ok(None),
@@ -307,282 +267,8 @@ fn load_decode<D: Decode>(store: &dyn KeyValueDB, col_data: u32, key: &[u8])
}
}
/// Information about a new candidate necessary to instantiate the requisite
/// candidate and approval entries.
#[derive(Clone)]
pub(crate) struct NewCandidateInfo {
pub candidate: CandidateReceipt,
pub backing_group: GroupIndex,
pub our_assignment: Option<OurAssignment>,
}
/// Record a new block entry.
///
/// This will update the blocks-at-height mapping, the stored block range, if necessary,
/// and add block and candidate entries. It will also add approval entries to existing
/// candidate entries and add this as a child of any block entry corresponding to the
/// parent hash.
///
/// Has no effect if there is already an entry for the block or `candidate_info` returns
/// `None` for any of the candidates referenced by the block entry. In these cases,
/// no information about new candidates will be referred to by this function.
pub(crate) fn add_block_entry(
store: &dyn KeyValueDB,
config: &Config,
entry: BlockEntry,
n_validators: usize,
candidate_info: impl Fn(&CandidateHash) -> Option<NewCandidateInfo>,
) -> Result<Vec<(CandidateHash, CandidateEntry)>> {
let mut transaction = DBTransaction::new();
let session = entry.session;
let parent_hash = entry.parent_hash;
let number = entry.block_number;
// Update the stored block range.
{
let new_range = match load_stored_blocks(store, config)? {
None => Some(StoredBlockRange(number, number + 1)),
Some(range) => if range.1 <= number {
Some(StoredBlockRange(range.0, number + 1))
} else {
None
}
};
new_range.map(|n| transaction.put_vec(config.col_data, &STORED_BLOCKS_KEY[..], n.encode()))
};
// Update the blocks at height meta key.
{
let mut blocks_at_height = load_blocks_at_height(store, config, number)?;
if blocks_at_height.contains(&entry.block_hash) {
// seems we already have a block entry for this block. nothing to do here.
return Ok(Vec::new())
}
blocks_at_height.push(entry.block_hash);
transaction.put_vec(config.col_data, &blocks_at_height_key(number)[..], blocks_at_height.encode())
};
let mut candidate_entries = Vec::with_capacity(entry.candidates.len());
// read and write all updated entries.
{
for &(_, ref candidate_hash) in &entry.candidates {
let NewCandidateInfo {
candidate,
backing_group,
our_assignment,
} = match candidate_info(candidate_hash) {
None => return Ok(Vec::new()),
Some(info) => info,
};
let mut candidate_entry = load_candidate_entry(store, config, &candidate_hash)?
.unwrap_or_else(move || CandidateEntry {
candidate,
session,
block_assignments: BTreeMap::new(),
approvals: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators],
});
candidate_entry.block_assignments.insert(
entry.block_hash,
ApprovalEntry {
tranches: Vec::new(),
backing_group,
our_assignment,
our_approval_sig: None,
assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators],
approved: false,
}
);
transaction.put_vec(
config.col_data,
&candidate_entry_key(&candidate_hash)[..],
candidate_entry.encode(),
);
candidate_entries.push((*candidate_hash, candidate_entry));
}
};
// Update the child index for the parent.
load_block_entry(store, config, &parent_hash)?.map(|mut e| {
e.children.push(entry.block_hash);
transaction.put_vec(config.col_data, &block_entry_key(&parent_hash)[..], e.encode())
});
// Put the new block entry in.
transaction.put_vec(config.col_data, &block_entry_key(&entry.block_hash)[..], entry.encode());
store.write(transaction)?;
Ok(candidate_entries)
}
/// Forcibly approve all candidates included at up to the given relay-chain height in the indicated
/// chain.
///
/// Returns a list of block hashes that were not approved and are now.
pub fn force_approve(
store: &dyn KeyValueDB,
db_config: Config,
chain_head: Hash,
up_to: BlockNumber,
) -> Result<Vec<Hash>> {
enum State {
WalkTo,
Approving,
}
let mut approved_hashes = Vec::new();
let mut cur_hash = chain_head;
let mut state = State::WalkTo;
let mut tx = Transaction::new(db_config);
// iterate back to the `up_to` block, and then iterate backwards until all blocks
// are updated.
while let Some(mut entry) = load_block_entry(store, &db_config, &cur_hash)? {
if entry.block_number <= up_to {
state = State::Approving;
}
cur_hash = entry.parent_hash;
match state {
State::WalkTo => {},
State::Approving => {
let is_approved = entry.approved_bitfield.count_ones()
== entry.approved_bitfield.len();
if !is_approved {
entry.approved_bitfield.iter_mut().for_each(|mut b| *b = true);
approved_hashes.push(entry.block_hash);
tx.put_block_entry(entry);
}
}
}
}
tx.write(store)?;
Ok(approved_hashes)
}
/// Return all blocks which have entries in the DB, ascending, by height.
pub(crate) fn load_all_blocks(store: &dyn KeyValueDB, config: &Config) -> Result<Vec<Hash>> {
let stored_blocks = load_stored_blocks(store, config)?;
let mut hashes = Vec::new();
for height in stored_blocks.into_iter().flat_map(|s| s.0..s.1) {
hashes.extend(load_blocks_at_height(store, config, height)?);
}
Ok(hashes)
}
// An atomic transaction of multiple candidate or block entries.
#[must_use = "Transactions do nothing unless written to a DB"]
pub struct Transaction {
block_entries: HashMap<Hash, BlockEntry>,
candidate_entries: HashMap<CandidateHash, CandidateEntry>,
config: Config,
}
impl Transaction {
pub(crate) fn new(config: Config) -> Self {
Transaction {
block_entries: HashMap::default(),
candidate_entries: HashMap::default(),
config,
}
}
/// Put a block entry in the transaction, overwriting any other with the
/// same hash.
pub(crate) fn put_block_entry(&mut self, entry: BlockEntry) {
let hash = entry.block_hash;
let _ = self.block_entries.insert(hash, entry);
}
/// Put a candidate entry in the transaction, overwriting any other with the
/// same hash.
pub(crate) fn put_candidate_entry(&mut self, hash: CandidateHash, entry: CandidateEntry) {
let _ = self.candidate_entries.insert(hash, entry);
}
/// Returns true if the transaction contains no actions
pub(crate) fn is_empty(&self) -> bool {
self.block_entries.is_empty() && self.candidate_entries.is_empty()
}
/// Write the contents of the transaction, atomically, to the DB.
pub(crate) fn write(self, db: &dyn KeyValueDB) -> Result<()> {
if self.is_empty() {
return Ok(())
}
let mut db_transaction = DBTransaction::new();
for (hash, entry) in self.block_entries {
let k = block_entry_key(&hash);
let v = entry.encode();
db_transaction.put_vec(self.config.col_data, &k, v);
}
for (hash, entry) in self.candidate_entries {
let k = candidate_entry_key(&hash);
let v = entry.encode();
db_transaction.put_vec(self.config.col_data, &k, v);
}
db.write(db_transaction).map_err(Into::into)
}
}
/// Load the stored-blocks key from the state.
fn load_stored_blocks(store: &dyn KeyValueDB, config: &Config)
-> Result<Option<StoredBlockRange>>
{
load_decode(store, config.col_data, STORED_BLOCKS_KEY)
}
/// Load a blocks-at-height entry for a given block number.
pub(crate) fn load_blocks_at_height(
store: &dyn KeyValueDB,
config: &Config,
block_number: BlockNumber,
)
-> Result<Vec<Hash>> {
load_decode(store, config.col_data, &blocks_at_height_key(block_number))
.map(|x| x.unwrap_or_default())
}
/// Load a block entry from the aux store.
pub(crate) fn load_block_entry(store: &dyn KeyValueDB, config: &Config, block_hash: &Hash)
-> Result<Option<BlockEntry>>
{
load_decode(store, config.col_data, &block_entry_key(block_hash))
}
/// Load a candidate entry from the aux store.
pub(crate) fn load_candidate_entry(
store: &dyn KeyValueDB,
config: &Config,
candidate_hash: &CandidateHash,
)
-> Result<Option<CandidateEntry>>
{
load_decode(store, config.col_data, &candidate_entry_key(candidate_hash))
}
/// The key a given block entry is stored under.
fn block_entry_key(block_hash: &Hash) -> [u8; 46] {
pub(crate) fn block_entry_key(block_hash: &Hash) -> [u8; 46] {
const BLOCK_ENTRY_PREFIX: [u8; 14] = *b"Approvals_blck";
let mut key = [0u8; 14 + 32];
@@ -593,7 +279,7 @@ fn block_entry_key(block_hash: &Hash) -> [u8; 46] {
}
/// The key a given candidate entry is stored under.
fn candidate_entry_key(candidate_hash: &CandidateHash) -> [u8; 46] {
pub(crate) fn candidate_entry_key(candidate_hash: &CandidateHash) -> [u8; 46] {
const CANDIDATE_ENTRY_PREFIX: [u8; 14] = *b"Approvals_cand";
let mut key = [0u8; 14 + 32];
@@ -604,7 +290,7 @@ fn candidate_entry_key(candidate_hash: &CandidateHash) -> [u8; 46] {
}
/// The key a set of block hashes corresponding to a block number is stored under.
fn blocks_at_height_key(block_number: BlockNumber) -> [u8; 16] {
pub(crate) fn blocks_at_height_key(block_number: BlockNumber) -> [u8; 16] {
const BLOCKS_AT_HEIGHT_PREFIX: [u8; 12] = *b"Approvals_at";
let mut key = [0u8; 12 + 4];
@@ -613,3 +299,59 @@ fn blocks_at_height_key(block_number: BlockNumber) -> [u8; 16] {
key
}
/// Return all blocks which have entries in the DB, ascending, by height.
pub fn load_all_blocks(store: &dyn KeyValueDB, config: &Config) -> SubsystemResult<Vec<Hash>> {
let mut hashes = Vec::new();
if let Some(stored_blocks) = load_stored_blocks(store, config)? {
for height in stored_blocks.0..stored_blocks.1 {
let blocks = load_blocks_at_height(store, config, &height)?;
hashes.extend(blocks);
}
}
Ok(hashes)
}
/// Load the stored-blocks key from the state.
pub fn load_stored_blocks(
store: &dyn KeyValueDB,
config: &Config,
) -> SubsystemResult<Option<StoredBlockRange>> {
load_decode(store, config.col_data, STORED_BLOCKS_KEY)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
/// Load a blocks-at-height entry for a given block number.
pub fn load_blocks_at_height(
store: &dyn KeyValueDB,
config: &Config,
block_number: &BlockNumber,
) -> SubsystemResult<Vec<Hash>> {
load_decode(store, config.col_data, &blocks_at_height_key(*block_number))
.map(|x| x.unwrap_or_default())
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
/// Load a block entry from the aux store.
pub fn load_block_entry(
store: &dyn KeyValueDB,
config: &Config,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>> {
load_decode(store, config.col_data, &block_entry_key(block_hash))
.map(|u: Option<BlockEntry>| u.map(|v| v.into()))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
/// Load a candidate entry from the aux store.
pub fn load_candidate_entry(
store: &dyn KeyValueDB,
config: &Config,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>> {
load_decode(store, config.col_data, &candidate_entry_key(candidate_hash))
.map(|u: Option<CandidateEntry>| u.map(|v| v.into()))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
@@ -17,7 +17,13 @@
//! Tests for the aux-schema of approval voting.
use super::*;
use std::sync::Arc;
use std::collections::HashMap;
use polkadot_primitives::v1::Id as ParaId;
use kvdb::KeyValueDB;
use super::{DbBackend, StoredBlockRange};
use crate::backend::{Backend, OverlayedBackend};
use crate::ops::{NewCandidateInfo, add_block_entry, force_approve, canonicalize};
const DATA_COL: u32 = 0;
const NUM_COLUMNS: u32 = 1;
@@ -26,36 +32,9 @@ const TEST_CONFIG: Config = Config {
col_data: DATA_COL,
};
pub(crate) fn write_stored_blocks(tx: &mut DBTransaction, range: StoredBlockRange) {
tx.put_vec(
DATA_COL,
&STORED_BLOCKS_KEY[..],
range.encode(),
);
}
pub(crate) fn write_blocks_at_height(tx: &mut DBTransaction, height: BlockNumber, blocks: &[Hash]) {
tx.put_vec(
DATA_COL,
&blocks_at_height_key(height)[..],
blocks.encode(),
);
}
pub(crate) fn write_block_entry(tx: &mut DBTransaction, block_hash: &Hash, entry: &BlockEntry) {
tx.put_vec(
DATA_COL,
&block_entry_key(block_hash)[..],
entry.encode(),
);
}
pub(crate) fn write_candidate_entry(tx: &mut DBTransaction, candidate_hash: &CandidateHash, entry: &CandidateEntry) {
tx.put_vec(
DATA_COL,
&candidate_entry_key(candidate_hash)[..],
entry.encode(),
);
fn make_db() -> (DbBackend, Arc<dyn KeyValueDB>) {
let db_writer: Arc<dyn KeyValueDB> = Arc::new(kvdb_memorydb::create(NUM_COLUMNS));
(DbBackend::new(db_writer.clone(), TEST_CONFIG), db_writer)
}
fn make_bitvec(len: usize) -> BitVec<BitOrderLsb0, u8> {
@@ -92,11 +71,11 @@ fn make_candidate(para_id: ParaId, relay_parent: Hash) -> CandidateReceipt {
#[test]
fn read_write() {
let store = kvdb_memorydb::create(NUM_COLUMNS);
let (mut db, store) = make_db();
let hash_a = Hash::repeat_byte(1);
let hash_b = Hash::repeat_byte(2);
let candidate_hash = CandidateHash(Hash::repeat_byte(3));
let candidate_hash = CandidateReceipt::<Hash>::default().hash();
let range = StoredBlockRange(10, 20);
let at_height = vec![hash_a, hash_b];
@@ -124,53 +103,48 @@ fn read_write() {
approvals: Default::default(),
};
let mut tx = DBTransaction::new();
let mut overlay_db = OverlayedBackend::new(&db);
overlay_db.write_stored_block_range(range.clone());
overlay_db.write_blocks_at_height(1, at_height.clone());
overlay_db.write_block_entry(block_entry.clone().into());
overlay_db.write_candidate_entry(candidate_entry.clone().into());
write_stored_blocks(&mut tx, range.clone());
write_blocks_at_height(&mut tx, 1, &at_height);
write_block_entry(&mut tx, &hash_a, &block_entry);
write_candidate_entry(&mut tx, &candidate_hash, &candidate_entry);
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
store.write(tx).unwrap();
assert_eq!(load_stored_blocks(&store, &TEST_CONFIG).unwrap(), Some(range));
assert_eq!(load_blocks_at_height(&store, &TEST_CONFIG, 1).unwrap(), at_height);
assert_eq!(load_block_entry(&store, &TEST_CONFIG, &hash_a).unwrap(), Some(block_entry));
assert_eq!(load_stored_blocks(store.as_ref(), &TEST_CONFIG).unwrap(), Some(range));
assert_eq!(load_blocks_at_height(store.as_ref(), &TEST_CONFIG, &1).unwrap(), at_height);
assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &hash_a).unwrap(), Some(block_entry.into()));
assert_eq!(
load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash).unwrap(),
Some(candidate_entry),
load_candidate_entry(store.as_ref(), &TEST_CONFIG, &candidate_hash).unwrap(),
Some(candidate_entry.into()),
);
let delete_keys = vec![
STORED_BLOCKS_KEY.to_vec(),
blocks_at_height_key(1).to_vec(),
block_entry_key(&hash_a).to_vec(),
candidate_entry_key(&candidate_hash).to_vec(),
];
let mut overlay_db = OverlayedBackend::new(&db);
overlay_db.delete_blocks_at_height(1);
overlay_db.delete_block_entry(&hash_a);
overlay_db.delete_candidate_entry(&candidate_hash);
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
let mut tx = DBTransaction::new();
for key in delete_keys {
tx.delete(DATA_COL, &key[..]);
}
store.write(tx).unwrap();
assert!(load_stored_blocks(&store, &TEST_CONFIG).unwrap().is_none());
assert!(load_blocks_at_height(&store, &TEST_CONFIG, 1).unwrap().is_empty());
assert!(load_block_entry(&store, &TEST_CONFIG, &hash_a).unwrap().is_none());
assert!(load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash).unwrap().is_none());
assert!(load_blocks_at_height(store.as_ref(), &TEST_CONFIG, &1).unwrap().is_empty());
assert!(load_block_entry(store.as_ref(), &TEST_CONFIG, &hash_a).unwrap().is_none());
assert!(load_candidate_entry(store.as_ref(), &TEST_CONFIG, &candidate_hash).unwrap().is_none());
}
#[test]
fn add_block_entry_works() {
let store = kvdb_memorydb::create(NUM_COLUMNS);
let (mut db, store) = make_db();
let parent_hash = Hash::repeat_byte(1);
let block_hash_a = Hash::repeat_byte(2);
let block_hash_b = Hash::repeat_byte(69);
let candidate_hash_a = CandidateHash(Hash::repeat_byte(3));
let candidate_hash_b = CandidateHash(Hash::repeat_byte(4));
let candidate_receipt_a = make_candidate(1.into(), parent_hash);
let candidate_receipt_b = make_candidate(2.into(), parent_hash);
let candidate_hash_a = candidate_receipt_a.hash();
let candidate_hash_b = candidate_receipt_b.hash();
let block_number = 10;
@@ -191,49 +165,53 @@ fn add_block_entry_works() {
let n_validators = 10;
let mut new_candidate_info = HashMap::new();
new_candidate_info.insert(candidate_hash_a, NewCandidateInfo {
candidate: make_candidate(1.into(), parent_hash),
backing_group: GroupIndex(0),
our_assignment: None,
});
new_candidate_info.insert(candidate_hash_a, NewCandidateInfo::new(
candidate_receipt_a,
GroupIndex(0),
None,
));
let mut overlay_db = OverlayedBackend::new(&db);
add_block_entry(
&store,
&TEST_CONFIG,
block_entry_a.clone(),
&mut overlay_db,
block_entry_a.clone().into(),
n_validators,
|h| new_candidate_info.get(h).map(|x| x.clone()),
).unwrap();
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
new_candidate_info.insert(candidate_hash_b, NewCandidateInfo {
candidate: make_candidate(2.into(), parent_hash),
backing_group: GroupIndex(1),
our_assignment: None,
});
new_candidate_info.insert(candidate_hash_b, NewCandidateInfo::new(
candidate_receipt_b,
GroupIndex(1),
None,
));
let mut overlay_db = OverlayedBackend::new(&db);
add_block_entry(
&store,
&TEST_CONFIG,
block_entry_b.clone(),
&mut overlay_db,
block_entry_b.clone().into(),
n_validators,
|h| new_candidate_info.get(h).map(|x| x.clone()),
).unwrap();
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a));
assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b));
assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a.into()));
assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b.into()));
let candidate_entry_a = load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash_a)
let candidate_entry_a = load_candidate_entry(store.as_ref(), &TEST_CONFIG, &candidate_hash_a)
.unwrap().unwrap();
assert_eq!(candidate_entry_a.block_assignments.keys().collect::<Vec<_>>(), vec![&block_hash_a, &block_hash_b]);
let candidate_entry_b = load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash_b)
let candidate_entry_b = load_candidate_entry(store.as_ref(), &TEST_CONFIG, &candidate_hash_b)
.unwrap().unwrap();
assert_eq!(candidate_entry_b.block_assignments.keys().collect::<Vec<_>>(), vec![&block_hash_b]);
}
#[test]
fn add_block_entry_adds_child() {
let store = kvdb_memorydb::create(NUM_COLUMNS);
let (mut db, store) = make_db();
let parent_hash = Hash::repeat_byte(1);
let block_hash_a = Hash::repeat_byte(2);
@@ -255,31 +233,33 @@ fn add_block_entry_adds_child() {
let n_validators = 10;
let mut overlay_db = OverlayedBackend::new(&db);
add_block_entry(
&store,
&TEST_CONFIG,
block_entry_a.clone(),
&mut overlay_db,
block_entry_a.clone().into(),
n_validators,
|_| None,
).unwrap();
add_block_entry(
&store,
&TEST_CONFIG,
block_entry_b.clone(),
&mut overlay_db,
block_entry_b.clone().into(),
n_validators,
|_| None,
).unwrap();
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
block_entry_a.children.push(block_hash_b);
assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a));
assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b));
assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a.into()));
assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b.into()));
}
#[test]
fn canonicalize_works() {
let store = kvdb_memorydb::create(NUM_COLUMNS);
let (mut db, store) = make_db();
// -> B1 -> C1 -> D1
// A -> B2 -> C2 -> D2
@@ -296,9 +276,10 @@ fn canonicalize_works() {
let n_validators = 10;
let mut tx = DBTransaction::new();
write_stored_blocks(&mut tx, StoredBlockRange(1, 5));
store.write(tx).unwrap();
let mut overlay_db = OverlayedBackend::new(&db);
overlay_db.write_stored_block_range(StoredBlockRange(1, 5));
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
let genesis = Hash::repeat_byte(0);
@@ -310,11 +291,17 @@ fn canonicalize_works() {
let block_hash_d1 = Hash::repeat_byte(6);
let block_hash_d2 = Hash::repeat_byte(7);
let cand_hash_1 = CandidateHash(Hash::repeat_byte(10));
let cand_hash_2 = CandidateHash(Hash::repeat_byte(11));
let cand_hash_3 = CandidateHash(Hash::repeat_byte(12));
let cand_hash_4 = CandidateHash(Hash::repeat_byte(13));
let cand_hash_5 = CandidateHash(Hash::repeat_byte(15));
let candidate_receipt_genesis = make_candidate(1.into(), genesis);
let candidate_receipt_a = make_candidate(2.into(), block_hash_a);
let candidate_receipt_b = make_candidate(3.into(), block_hash_a);
let candidate_receipt_b1 = make_candidate(4.into(), block_hash_b1);
let candidate_receipt_c1 = make_candidate(5.into(), block_hash_c1);
let cand_hash_1 = candidate_receipt_genesis.hash();
let cand_hash_2 = candidate_receipt_a.hash();
let cand_hash_3 = candidate_receipt_b.hash();
let cand_hash_4 = candidate_receipt_b1.hash();
let cand_hash_5 = candidate_receipt_c1.hash();
let block_entry_a = make_block_entry(block_hash_a, genesis, 1, Vec::new());
let block_entry_b1 = make_block_entry(block_hash_b1, block_hash_a, 2, Vec::new());
@@ -344,38 +331,37 @@ fn canonicalize_works() {
vec![(CoreIndex(0), cand_hash_5)],
);
let candidate_info = {
let mut candidate_info = HashMap::new();
candidate_info.insert(cand_hash_1, NewCandidateInfo {
candidate: make_candidate(1.into(), genesis),
backing_group: GroupIndex(1),
our_assignment: None,
});
candidate_info.insert(cand_hash_1, NewCandidateInfo::new(
candidate_receipt_genesis,
GroupIndex(1),
None,
));
candidate_info.insert(cand_hash_2, NewCandidateInfo {
candidate: make_candidate(2.into(), block_hash_a),
backing_group: GroupIndex(2),
our_assignment: None,
});
candidate_info.insert(cand_hash_2, NewCandidateInfo::new(
candidate_receipt_a,
GroupIndex(2),
None,
));
candidate_info.insert(cand_hash_3, NewCandidateInfo {
candidate: make_candidate(3.into(), block_hash_a),
backing_group: GroupIndex(3),
our_assignment: None,
});
candidate_info.insert(cand_hash_3, NewCandidateInfo::new(
candidate_receipt_b,
GroupIndex(3),
None,
));
candidate_info.insert(cand_hash_4, NewCandidateInfo {
candidate: make_candidate(4.into(), block_hash_b1),
backing_group: GroupIndex(4),
our_assignment: None,
});
candidate_info.insert(cand_hash_4, NewCandidateInfo::new(
candidate_receipt_b1,
GroupIndex(4),
None,
));
candidate_info.insert(cand_hash_5, NewCandidateInfo {
candidate: make_candidate(5.into(), block_hash_c1),
backing_group: GroupIndex(5),
our_assignment: None,
});
candidate_info.insert(cand_hash_5, NewCandidateInfo::new(
candidate_receipt_c1,
GroupIndex(5),
None,
));
candidate_info
};
@@ -391,25 +377,27 @@ fn canonicalize_works() {
block_entry_d2.clone(),
];
let mut overlay_db = OverlayedBackend::new(&db);
for block_entry in blocks {
add_block_entry(
&store,
&TEST_CONFIG,
block_entry,
&mut overlay_db,
block_entry.into(),
n_validators,
|h| candidate_info.get(h).map(|x| x.clone()),
).unwrap();
}
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
let check_candidates_in_store = |expected: Vec<(CandidateHash, Option<Vec<_>>)>| {
for (c_hash, in_blocks) in expected {
let (entry, in_blocks) = match in_blocks {
None => {
assert!(load_candidate_entry(&store, &TEST_CONFIG, &c_hash).unwrap().is_none());
assert!(load_candidate_entry(store.as_ref(), &TEST_CONFIG, &c_hash).unwrap().is_none());
continue
}
Some(i) => (
load_candidate_entry(&store, &TEST_CONFIG, &c_hash).unwrap().unwrap(),
load_candidate_entry(store.as_ref(), &TEST_CONFIG, &c_hash).unwrap().unwrap(),
i,
),
};
@@ -426,11 +414,11 @@ fn canonicalize_works() {
for (hash, with_candidates) in expected {
let (entry, with_candidates) = match with_candidates {
None => {
assert!(load_block_entry(&store, &TEST_CONFIG, &hash).unwrap().is_none());
assert!(load_block_entry(store.as_ref(), &TEST_CONFIG, &hash).unwrap().is_none());
continue
}
Some(i) => (
load_block_entry(&store, &TEST_CONFIG, &hash).unwrap().unwrap(),
load_block_entry(store.as_ref(), &TEST_CONFIG, &hash).unwrap().unwrap(),
i,
),
};
@@ -461,9 +449,12 @@ fn canonicalize_works() {
(block_hash_d2, Some(vec![cand_hash_5])),
]);
canonicalize(&store, &TEST_CONFIG, 3, block_hash_c1).unwrap();
let mut overlay_db = OverlayedBackend::new(&db);
canonicalize(&mut overlay_db, 3, block_hash_c1).unwrap();
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
assert_eq!(load_stored_blocks(&store, &TEST_CONFIG).unwrap().unwrap(), StoredBlockRange(4, 5));
assert_eq!(load_stored_blocks(store.as_ref(), &TEST_CONFIG).unwrap().unwrap(), StoredBlockRange(4, 5));
check_candidates_in_store(vec![
(cand_hash_1, None),
@@ -486,22 +477,23 @@ fn canonicalize_works() {
#[test]
fn force_approve_works() {
let store = kvdb_memorydb::create(NUM_COLUMNS);
let (mut db, store) = make_db();
let n_validators = 10;
let mut tx = DBTransaction::new();
write_stored_blocks(&mut tx, StoredBlockRange(1, 4));
store.write(tx).unwrap();
let mut overlay_db = OverlayedBackend::new(&db);
overlay_db.write_stored_block_range(StoredBlockRange(1, 4));
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
let candidate_hash = CandidateHash(Hash::repeat_byte(42));
let single_candidate_vec = vec![(CoreIndex(0), candidate_hash)];
let candidate_info = {
let mut candidate_info = HashMap::new();
candidate_info.insert(candidate_hash, NewCandidateInfo {
candidate: make_candidate(1.into(), Default::default()),
backing_group: GroupIndex(1),
our_assignment: None,
});
candidate_info.insert(candidate_hash, NewCandidateInfo::new(
make_candidate(1.into(), Default::default()),
GroupIndex(1),
None,
));
candidate_info
};
@@ -524,35 +516,36 @@ fn force_approve_works() {
block_entry_d.clone(),
];
let mut overlay_db = OverlayedBackend::new(&db);
for block_entry in blocks {
add_block_entry(
&store,
&TEST_CONFIG,
block_entry,
&mut overlay_db,
block_entry.into(),
n_validators,
|h| candidate_info.get(h).map(|x| x.clone()),
).unwrap();
}
let approved_hashes = force_approve(&store, TEST_CONFIG, block_hash_d, 2).unwrap();
let approved_hashes = force_approve(&mut overlay_db, block_hash_d, 2).unwrap();
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
assert!(load_block_entry(
&store,
store.as_ref(),
&TEST_CONFIG,
&block_hash_a,
).unwrap().unwrap().approved_bitfield.all());
assert!(load_block_entry(
&store,
store.as_ref(),
&TEST_CONFIG,
&block_hash_b,
).unwrap().unwrap().approved_bitfield.all());
assert!(load_block_entry(
&store,
store.as_ref(),
&TEST_CONFIG,
&block_hash_c,
).unwrap().unwrap().approved_bitfield.not_any());
assert!(load_block_entry(
&store,
store.as_ref(),
&TEST_CONFIG,
&block_hash_d,
).unwrap().unwrap().approved_bitfield.not_any());
@@ -564,7 +557,7 @@ fn force_approve_works() {
#[test]
fn load_all_blocks_works() {
let store = kvdb_memorydb::create(NUM_COLUMNS);
let (mut db, store) = make_db();
let parent_hash = Hash::repeat_byte(1);
let block_hash_a = Hash::repeat_byte(2);
@@ -596,34 +589,35 @@ fn load_all_blocks_works() {
let n_validators = 10;
let mut overlay_db = OverlayedBackend::new(&db);
add_block_entry(
&store,
&TEST_CONFIG,
block_entry_a.clone(),
&mut overlay_db,
block_entry_a.clone().into(),
n_validators,
|_| None
).unwrap();
// add C before B to test sorting.
add_block_entry(
&store,
&TEST_CONFIG,
block_entry_c.clone(),
&mut overlay_db,
block_entry_c.clone().into(),
n_validators,
|_| None
).unwrap();
add_block_entry(
&store,
&TEST_CONFIG,
block_entry_b.clone(),
&mut overlay_db,
block_entry_b.clone().into(),
n_validators,
|_| None
).unwrap();
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
assert_eq!(
load_all_blocks(
&store,
store.as_ref(),
&TEST_CONFIG
).unwrap(),
vec![block_hash_a, block_hash_b, block_hash_c],
@@ -0,0 +1,194 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! An abstraction over storage used by the chain selection subsystem.
//!
//! This provides both a [`Backend`] trait and an [`OverlayedBackend`]
//! struct which allows in-memory changes to be applied on top of a
//! [`Backend`], maintaining consistency between queries and temporary writes,
//! before any commit to the underlying storage is made.
use polkadot_node_subsystem::{SubsystemResult};
use polkadot_primitives::v1::{BlockNumber, CandidateHash, Hash};
use std::collections::HashMap;
use super::approval_db::v1::StoredBlockRange;
use super::persisted_entries::{BlockEntry, CandidateEntry};
#[derive(Debug)]
pub enum BackendWriteOp {
WriteStoredBlockRange(StoredBlockRange),
WriteBlocksAtHeight(BlockNumber, Vec<Hash>),
WriteBlockEntry(BlockEntry),
WriteCandidateEntry(CandidateEntry),
DeleteBlocksAtHeight(BlockNumber),
DeleteBlockEntry(Hash),
DeleteCandidateEntry(CandidateHash),
}
/// An abstraction over backend storage for the logic of this subsystem.
pub trait Backend {
/// Load a block entry from the DB.
fn load_block_entry(&self, hash: &Hash) -> SubsystemResult<Option<BlockEntry>>;
/// Load a candidate entry from the DB.
fn load_candidate_entry(&self, candidate_hash: &CandidateHash) -> SubsystemResult<Option<CandidateEntry>>;
/// Load all blocks at a specific height.
fn load_blocks_at_height(&self, height: &BlockNumber) -> SubsystemResult<Vec<Hash>>;
/// Load all block from the DB.
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>>;
/// Load stored block range form the DB.
fn load_stored_blocks(&self) -> SubsystemResult<Option<StoredBlockRange>>;
/// Atomically write the list of operations, with later operations taking precedence over prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
where I: IntoIterator<Item = BackendWriteOp>;
}
/// An in-memory overlay over the backend.
///
/// This maintains read-only access to the underlying backend, but can be
/// converted into a set of write operations which will, when written to
/// the underlying backend, give the same view as the state of the overlay.
pub struct OverlayedBackend<'a, B: 'a> {
inner: &'a B,
// `None` means unchanged
stored_block_range: Option<StoredBlockRange>,
// `None` means 'deleted', missing means query inner.
blocks_at_height: HashMap<BlockNumber, Option<Vec<Hash>>>,
// `None` means 'deleted', missing means query inner.
block_entries: HashMap<Hash, Option<BlockEntry>>,
// `None` means 'deleted', missing means query inner.
candidate_entries: HashMap<CandidateHash, Option<CandidateEntry>>,
}
impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> {
pub fn new(backend: &'a B) -> Self {
OverlayedBackend {
inner: backend,
stored_block_range: None,
blocks_at_height: HashMap::new(),
block_entries: HashMap::new(),
candidate_entries: HashMap::new(),
}
}
pub fn is_empty(&self) -> bool {
self.block_entries.is_empty() &&
self.candidate_entries.is_empty() &&
self.blocks_at_height.is_empty() &&
self.stored_block_range.is_none()
}
pub fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
let mut hashes = Vec::new();
if let Some(stored_blocks) = self.load_stored_blocks()? {
for height in stored_blocks.0..stored_blocks.1 {
hashes.extend(self.load_blocks_at_height(&height)?);
}
}
Ok(hashes)
}
pub fn load_stored_blocks(&self) -> SubsystemResult<Option<StoredBlockRange>> {
if let Some(val) = self.stored_block_range.clone() {
return Ok(Some(val))
}
self.inner.load_stored_blocks()
}
pub fn load_blocks_at_height(&self, height: &BlockNumber) -> SubsystemResult<Vec<Hash>> {
if let Some(val) = self.blocks_at_height.get(&height) {
return Ok(val.clone().unwrap_or_default())
}
self.inner.load_blocks_at_height(height)
}
pub fn load_block_entry(&self, hash: &Hash) -> SubsystemResult<Option<BlockEntry>> {
if let Some(val) = self.block_entries.get(&hash) {
return Ok(val.clone())
}
self.inner.load_block_entry(hash)
}
pub fn load_candidate_entry(&self, candidate_hash: &CandidateHash) -> SubsystemResult<Option<CandidateEntry>> {
if let Some(val) = self.candidate_entries.get(&candidate_hash) {
return Ok(val.clone())
}
self.inner.load_candidate_entry(candidate_hash)
}
// The assumption is that stored block range is only None on initialization.
// Therefore, there is no need to delete_stored_block_range.
pub fn write_stored_block_range(&mut self, range: StoredBlockRange) {
self.stored_block_range = Some(range);
}
pub fn write_blocks_at_height(&mut self, height: BlockNumber, blocks: Vec<Hash>) {
self.blocks_at_height.insert(height, Some(blocks));
}
pub fn delete_blocks_at_height(&mut self, height: BlockNumber) {
self.blocks_at_height.insert(height, None);
}
pub fn write_block_entry(&mut self, entry: BlockEntry) {
self.block_entries.insert(entry.block_hash(), Some(entry));
}
pub fn delete_block_entry(&mut self, hash: &Hash) {
self.block_entries.insert(*hash, None);
}
pub fn write_candidate_entry(&mut self, entry: CandidateEntry) {
self.candidate_entries.insert(entry.candidate_receipt().hash(), Some(entry));
}
pub fn delete_candidate_entry(&mut self, hash: &CandidateHash) {
self.candidate_entries.insert(*hash, None);
}
/// Transform this backend into a set of write-ops to be written to the
/// inner backend.
pub fn into_write_ops(self) -> impl Iterator<Item = BackendWriteOp> {
let blocks_at_height_ops = self.blocks_at_height.into_iter().map(|(h, v)| match v {
Some(v) => BackendWriteOp::WriteBlocksAtHeight(h, v),
None => BackendWriteOp::DeleteBlocksAtHeight(h),
});
let block_entry_ops = self.block_entries.into_iter().map(|(h, v)| match v {
Some(v) => BackendWriteOp::WriteBlockEntry(v),
None => BackendWriteOp::DeleteBlockEntry(h),
});
let candidate_entry_ops = self.candidate_entries.into_iter().map(|(h, v)| match v {
Some(v) => BackendWriteOp::WriteCandidateEntry(v),
None => BackendWriteOp::DeleteCandidateEntry(h),
});
self.stored_block_range
.map(|v| BackendWriteOp::WriteStoredBlockRange(v))
.into_iter()
.chain(blocks_at_height_ops)
.chain(block_entry_ops)
.chain(candidate_entry_ops)
}
}
@@ -49,7 +49,6 @@ use polkadot_node_primitives::approval::{
use polkadot_node_jaeger as jaeger;
use sc_keystore::LocalKeystore;
use sp_consensus_slots::Slot;
use kvdb::KeyValueDB;
use futures::prelude::*;
use futures::channel::oneshot;
@@ -58,12 +57,13 @@ use bitvec::order::Lsb0 as BitOrderLsb0;
use std::collections::HashMap;
use std::convert::TryFrom;
use crate::approval_db::{self, v1::Config as DatabaseConfig};
use super::approval_db::v1;
use crate::backend::{Backend, OverlayedBackend};
use crate::persisted_entries::CandidateEntry;
use crate::criteria::{AssignmentCriteria, OurAssignment};
use crate::time::{slot_number_to_tick, Tick};
use super::{LOG_TARGET, State, DBReader};
use super::{LOG_TARGET, State};
struct ImportedBlockInfo {
included_candidates: Vec<(CandidateHash, CandidateReceipt, CoreIndex, GroupIndex)>,
@@ -284,11 +284,10 @@ pub struct BlockImportedCandidates {
/// * and return information about all candidates imported under each block.
///
/// It is the responsibility of the caller to schedule wakeups for each block.
pub(crate) async fn handle_new_head(
pub(crate) async fn handle_new_head<'a>(
ctx: &mut impl SubsystemContext,
state: &mut State<impl DBReader>,
db_writer: &dyn KeyValueDB,
db_config: DatabaseConfig,
state: &mut State,
db: &mut OverlayedBackend<'a, impl Backend>,
head: Hash,
finalized_number: &Option<BlockNumber>,
) -> SubsystemResult<Vec<BlockImportedCandidates>> {
@@ -345,7 +344,7 @@ pub(crate) async fn handle_new_head(
let new_blocks = determine_new_blocks(
ctx.sender(),
|h| state.db.load_block_entry(h).map(|e| e.is_some()),
|h| db.load_block_entry(h).map(|e| e.is_some()),
head,
&header,
lower_bound_number,
@@ -473,7 +472,7 @@ pub(crate) async fn handle_new_head(
ctx.send_message(ChainSelectionMessage::Approved(block_hash).into()).await;
}
let block_entry = approval_db::v1::BlockEntry {
let block_entry = v1::BlockEntry {
block_hash,
parent_hash: block_header.parent_hash,
block_number: block_header.number,
@@ -494,12 +493,7 @@ pub(crate) async fn handle_new_head(
"Enacting force-approve",
);
let approved_hashes = approval_db::v1::force_approve(
db_writer,
db_config,
block_hash,
up_to,
)
let approved_hashes = crate::ops::force_approve(db, block_hash, up_to)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
// Notify chain-selection of all approved hashes.
@@ -515,18 +509,17 @@ pub(crate) async fn handle_new_head(
"Writing BlockEntry",
);
let candidate_entries = approval_db::v1::add_block_entry(
db_writer,
&db_config,
block_entry,
let candidate_entries = crate::ops::add_block_entry(
db,
block_entry.into(),
n_validators,
|candidate_hash| {
included_candidates.iter().find(|(hash, _, _, _)| candidate_hash == hash)
.map(|(_, receipt, core, backing_group)| approval_db::v1::NewCandidateInfo {
candidate: receipt.clone(),
backing_group: *backing_group,
our_assignment: assignments.get(core).map(|a| a.clone().into()),
})
.map(|(_, receipt, core, backing_group)| super::ops::NewCandidateInfo::new(
receipt.clone(),
*backing_group,
assignments.get(core).map(|a| a.clone().into()),
))
}
).map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
approval_meta.push(BlockApprovalMeta {
@@ -566,6 +559,7 @@ pub(crate) async fn handle_new_head(
#[cfg(test)]
mod tests {
use super::*;
use crate::approval_db::v1::DbBackend;
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_primitives::approval::{VRFOutput, VRFProof};
use polkadot_primitives::v1::{SessionInfo, ValidatorIndex};
@@ -580,8 +574,12 @@ mod tests {
use assert_matches::assert_matches;
use merlin::Transcript;
use std::{pin::Pin, sync::Arc};
use kvdb::KeyValueDB;
use crate::{APPROVAL_SESSIONS, criteria, BlockEntry};
use crate::{
APPROVAL_SESSIONS, criteria, BlockEntry,
approval_db::v1::Config as DatabaseConfig,
};
const DATA_COL: u32 = 0;
const NUM_COLUMNS: u32 = 1;
@@ -589,37 +587,6 @@ mod tests {
const TEST_CONFIG: DatabaseConfig = DatabaseConfig {
col_data: DATA_COL,
};
#[derive(Default)]
struct TestDB {
block_entries: HashMap<Hash, BlockEntry>,
candidate_entries: HashMap<CandidateHash, CandidateEntry>,
}
impl DBReader for TestDB {
fn load_block_entry(
&self,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>> {
Ok(self.block_entries.get(block_hash).map(|c| c.clone()))
}
fn load_candidate_entry(
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>> {
Ok(self.candidate_entries.get(candidate_hash).map(|c| c.clone()))
}
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
let mut hashes: Vec<_> = self.block_entries.keys().cloned().collect();
hashes.sort_by_key(|k| self.block_entries.get(k).unwrap().block_number());
Ok(hashes)
}
}
#[derive(Default)]
struct MockClock;
@@ -635,20 +602,17 @@ mod tests {
}
}
fn blank_state() -> State<TestDB> {
fn blank_state() -> State {
State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
db: TestDB::default(),
clock: Box::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria),
}
}
fn single_session_state(index: SessionIndex, info: SessionInfo)
-> State<TestDB>
{
fn single_session_state(index: SessionIndex, info: SessionInfo) -> State {
State {
session_window: RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
@@ -1162,6 +1126,10 @@ mod tests {
#[test]
fn insta_approval_works() {
let db_writer: Arc<dyn KeyValueDB> = Arc::new(kvdb_memorydb::create(NUM_COLUMNS));
let mut db = DbBackend::new(db_writer.clone(), TEST_CONFIG);
let mut overlay_db = OverlayedBackend::new(&db);
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
@@ -1221,9 +1189,8 @@ mod tests {
.collect::<Vec<_>>();
let mut state = single_session_state(session, session_info);
state.db.block_entries.insert(
parent_hash.clone(),
crate::approval_db::v1::BlockEntry {
overlay_db.write_block_entry(
v1::BlockEntry {
block_hash: parent_hash.clone(),
parent_hash: Default::default(),
block_number: 4,
@@ -1233,22 +1200,26 @@ mod tests {
candidates: Vec::new(),
approved_bitfield: Default::default(),
children: Vec::new(),
}.into(),
}.into()
);
let db_writer = kvdb_memorydb::create(NUM_COLUMNS);
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
let test_fut = {
Box::pin(async move {
let mut overlay_db = OverlayedBackend::new(&db);
let result = handle_new_head(
&mut ctx,
&mut state,
&db_writer,
TEST_CONFIG,
&mut overlay_db,
hash,
&Some(1),
).await.unwrap();
let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
assert_eq!(result.len(), 1);
let candidates = &result[0].imported_candidates;
assert_eq!(candidates.len(), 2);
@@ -1256,8 +1227,8 @@ mod tests {
assert_eq!(candidates[1].1.approvals().len(), 6);
// the first candidate should be insta-approved
// the second should not
let entry: BlockEntry = crate::approval_db::v1::load_block_entry(
&db_writer,
let entry: BlockEntry = v1::load_block_entry(
db_writer.as_ref(),
&TEST_CONFIG,
&hash,
)
+75 -141
View File
@@ -72,12 +72,15 @@ use time::{slot_number_to_tick, Tick, Clock, ClockExt, SystemClock};
mod approval_checking;
mod approval_db;
mod backend;
mod criteria;
mod import;
mod ops;
mod time;
mod persisted_entries;
use crate::approval_db::v1::Config as DatabaseConfig;
use crate::approval_db::v1::{DbBackend, Config as DatabaseConfig};
use crate::backend::{Backend, OverlayedBackend};
#[cfg(test)]
mod tests;
@@ -330,11 +333,13 @@ impl<C> Subsystem<C> for ApprovalVotingSubsystem
where C: SubsystemContext<Message = ApprovalVotingMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
let future = run::<C>(
let backend = DbBackend::new(self.db.clone(), self.db_config);
let future = run::<DbBackend, C>(
ctx,
self,
Box::new(SystemClock),
Box::new(RealAssignmentCriteria),
backend,
)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
.boxed();
@@ -461,72 +466,6 @@ impl Wakeups {
}
}
/// A read-only handle to a database.
trait DBReader {
fn load_block_entry(
&self,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>>;
fn load_candidate_entry(
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>>;
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>>;
}
// This is a submodule to enforce opacity of the inner DB type.
mod approval_db_v1_reader {
use super::{
DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry,
SubsystemResult, SubsystemError, DatabaseConfig, approval_db,
};
/// A DB reader that uses the approval-db V1 under the hood.
pub(super) struct ApprovalDBV1Reader<T> {
inner: T,
config: DatabaseConfig,
}
impl<T> ApprovalDBV1Reader<T> {
pub(super) fn new(inner: T, config: DatabaseConfig) -> Self {
ApprovalDBV1Reader {
inner,
config,
}
}
}
impl<'a, T: 'a> DBReader for ApprovalDBV1Reader<T>
where T: std::ops::Deref<Target=(dyn KeyValueDB + 'a)>
{
fn load_block_entry(
&self,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>> {
approval_db::v1::load_block_entry(&*self.inner, &self.config, block_hash)
.map(|e| e.map(Into::into))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
fn load_candidate_entry(
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>> {
approval_db::v1::load_candidate_entry(&*self.inner, &self.config, candidate_hash)
.map(|e| e.map(Into::into))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
approval_db::v1::load_all_blocks(&*self.inner, &self.config)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
}
}
use approval_db_v1_reader::ApprovalDBV1Reader;
struct ApprovalStatus {
required_tranches: RequiredTranches,
tranche_now: DelayTranche,
@@ -636,16 +575,15 @@ impl CurrentlyCheckingSet {
}
}
struct State<T> {
struct State {
session_window: RollingSessionWindow,
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
db: T,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
}
impl<T> State<T> {
impl State {
fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
self.session_window.session_info(i)
}
@@ -705,8 +643,6 @@ enum Action {
candidate_hash: CandidateHash,
tick: Tick,
},
WriteBlockEntry(BlockEntry),
WriteCandidateEntry(CandidateHash, CandidateEntry),
LaunchApproval {
candidate_hash: CandidateHash,
indirect_cert: IndirectAssignmentCert,
@@ -723,19 +659,21 @@ enum Action {
Conclude,
}
async fn run<C>(
async fn run<B, C>(
mut ctx: C,
mut subsystem: ApprovalVotingSubsystem,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
mut backend: B,
) -> SubsystemResult<()>
where C: SubsystemContext<Message = ApprovalVotingMessage>
where
C: SubsystemContext<Message = ApprovalVotingMessage>,
B: Backend,
{
let mut state = State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
keystore: subsystem.keystore,
slot_duration_millis: subsystem.slot_duration_millis,
db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()),
clock,
assignment_criteria,
};
@@ -746,14 +684,14 @@ async fn run<C>(
let mut last_finalized_height: Option<BlockNumber> = None;
let db_writer = &*subsystem.db;
loop {
let mut overlayed_db = OverlayedBackend::new(&backend);
let actions = futures::select! {
(tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
subsystem.metrics.on_wakeup();
process_wakeup(
&mut state,
&mut overlayed_db,
woken_block,
woken_candidate,
tick,
@@ -763,9 +701,8 @@ async fn run<C>(
let mut actions = handle_from_overseer(
&mut ctx,
&mut state,
&mut overlayed_db,
&subsystem.metrics,
db_writer,
subsystem.db_config,
next_msg?,
&mut last_finalized_height,
&mut wakeups,
@@ -814,17 +751,23 @@ async fn run<C>(
if handle_actions(
&mut ctx,
&mut state,
&mut overlayed_db,
&subsystem.metrics,
&mut wakeups,
&mut currently_checking_set,
&mut approvals_cache,
db_writer,
subsystem.db_config,
&mut subsystem.mode,
actions,
).await? {
break;
}
if !overlayed_db.is_empty() {
let _timer = subsystem.metrics.time_db_transaction();
let ops = overlayed_db.into_write_ops();
backend.write(ops)?;
}
}
Ok(())
@@ -851,17 +794,15 @@ async fn run<C>(
// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
ctx: &mut impl SubsystemContext,
state: &mut State<impl DBReader>,
state: &mut State,
overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
wakeups: &mut Wakeups,
currently_checking_set: &mut CurrentlyCheckingSet,
approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
db: &dyn KeyValueDB,
db_config: DatabaseConfig,
mode: &mut Mode,
actions: Vec<Action>,
) -> SubsystemResult<bool> {
let mut transaction = approval_db::v1::Transaction::new(db_config);
let mut conclude = false;
let mut actions_iter = actions.into_iter();
@@ -872,15 +813,7 @@ async fn handle_actions(
block_number,
candidate_hash,
tick,
} => {
wakeups.schedule(block_hash, block_number, candidate_hash, tick)
}
Action::WriteBlockEntry(block_entry) => {
transaction.put_block_entry(block_entry.into());
}
Action::WriteCandidateEntry(candidate_hash, candidate_entry) => {
transaction.put_candidate_entry(candidate_hash, candidate_entry.into());
}
} => wakeups.schedule(block_hash, block_number, candidate_hash, tick),
Action::IssueApproval(candidate_hash, approval_request) => {
let mut sender = ctx.sender().clone();
// Note that the IssueApproval action will create additional
@@ -897,6 +830,7 @@ async fn handle_actions(
let next_actions: Vec<Action> = issue_approval(
&mut sender,
state,
overlayed_db,
metrics,
candidate_hash,
approval_request,
@@ -969,9 +903,7 @@ async fn handle_actions(
Action::BecomeActive => {
*mode = Mode::Active;
let messages = distribution_messages_for_activation(
ApprovalDBV1Reader::new(db, db_config)
)?;
let messages = distribution_messages_for_activation(overlayed_db)?;
ctx.send_messages(messages.into_iter().map(Into::into)).await;
}
@@ -979,20 +911,13 @@ async fn handle_actions(
}
}
if !transaction.is_empty() {
let _timer = metrics.time_db_transaction();
transaction.write(db)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
}
Ok(conclude)
}
fn distribution_messages_for_activation<'a>(
db: impl DBReader + 'a,
fn distribution_messages_for_activation(
db: &OverlayedBackend<'_, impl Backend>,
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
let all_blocks = db.load_all_blocks()?;
let all_blocks: Vec<Hash> = db.load_all_blocks()?;
let mut approval_meta = Vec::with_capacity(all_blocks.len());
let mut messages = Vec::new();
@@ -1089,10 +1014,9 @@ fn distribution_messages_for_activation<'a>(
// Handle an incoming signal from the overseer. Returns true if execution should conclude.
async fn handle_from_overseer(
ctx: &mut impl SubsystemContext,
state: &mut State<impl DBReader>,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
db_writer: &dyn KeyValueDB,
db_config: DatabaseConfig,
x: FromOverseer<ApprovalVotingMessage>,
last_finalized_height: &mut Option<BlockNumber>,
wakeups: &mut Wakeups,
@@ -1107,8 +1031,7 @@ async fn handle_from_overseer(
match import::handle_new_head(
ctx,
state,
db_writer,
db_config,
db,
head,
&*last_finalized_height,
).await {
@@ -1162,7 +1085,7 @@ async fn handle_from_overseer(
FromOverseer::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
*last_finalized_height = Some(block_number);
approval_db::v1::canonicalize(db_writer, &db_config, block_number, block_hash)
crate::ops::canonicalize(db, block_number, block_hash)
.map_err(|e| SubsystemError::with_origin("db", e))?;
wakeups.prune_finalized_wakeups(block_number);
@@ -1175,15 +1098,16 @@ async fn handle_from_overseer(
FromOverseer::Communication { msg } => match msg {
ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => {
let (check_outcome, actions)
= check_and_import_assignment(state, a, claimed_core)?;
= check_and_import_assignment(state, db, a, claimed_core)?;
let _ = res.send(check_outcome);
actions
}
ApprovalVotingMessage::CheckAndImportApproval(a, res) => {
check_and_import_approval(state, metrics, a, |r| { let _ = res.send(r); })?.0
check_and_import_approval(state, db, metrics, a, |r| { let _ = res.send(r); })?.0
}
ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res ) => {
match handle_approved_ancestor(ctx, &state.db, target, lower_bound, wakeups).await {
match handle_approved_ancestor(ctx, db, target, lower_bound, wakeups).await {
Ok(v) => {
let _ = res.send(v);
}
@@ -1203,7 +1127,7 @@ async fn handle_from_overseer(
async fn handle_approved_ancestor(
ctx: &mut impl SubsystemContext,
db: &impl DBReader,
db: &OverlayedBackend<'_, impl Backend>,
target: Hash,
lower_bound: BlockNumber,
wakeups: &Wakeups,
@@ -1491,14 +1415,15 @@ fn schedule_wakeup_action(
}
fn check_and_import_assignment(
state: &State<impl DBReader>,
state: &State,
db: &mut OverlayedBackend<'_, impl Backend>,
assignment: IndirectAssignmentCert,
candidate_index: CandidateIndex,
) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)> {
const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
let tick_now = state.clock.tick_now();
let block_entry = match state.db.load_block_entry(&assignment.block_hash)? {
let block_entry = match db.load_block_entry(&assignment.block_hash)? {
Some(b) => b,
None => return Ok((AssignmentCheckResult::Bad(
AssignmentCheckError::UnknownBlock(assignment.block_hash),
@@ -1523,7 +1448,7 @@ fn check_and_import_assignment(
), Vec::new())), // no candidate at core.
};
let mut candidate_entry = match state.db.load_candidate_entry(&assigned_candidate_hash)? {
let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
Some(c) => c,
None => {
return Ok((AssignmentCheckResult::Bad(
@@ -1605,13 +1530,14 @@ fn check_and_import_assignment(
}
// We also write the candidate entry as it now contains the new candidate.
actions.push(Action::WriteCandidateEntry(assigned_candidate_hash, candidate_entry));
db.write_candidate_entry(candidate_entry.into());
Ok((res, actions))
}
fn check_and_import_approval<T>(
state: &State<impl DBReader>,
state: &State,
db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
approval: IndirectSignedApprovalVote,
with_response: impl FnOnce(ApprovalCheckResult) -> T,
@@ -1623,7 +1549,7 @@ fn check_and_import_approval<T>(
} }
}
let block_entry = match state.db.load_block_entry(&approval.block_hash)? {
let block_entry = match db.load_block_entry(&approval.block_hash)? {
Some(b) => b,
None => {
respond_early!(ApprovalCheckResult::Bad(
@@ -1666,7 +1592,7 @@ fn check_and_import_approval<T>(
))
}
let candidate_entry = match state.db.load_candidate_entry(&approved_candidate_hash)? {
let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? {
Some(c) => c,
None => {
respond_early!(ApprovalCheckResult::Bad(
@@ -1704,6 +1630,7 @@ fn check_and_import_approval<T>(
let actions = import_checked_approval(
state,
db,
&metrics,
block_entry,
approved_candidate_hash,
@@ -1738,7 +1665,8 @@ impl ApprovalSource {
// validator on the candidate and block. This updates the block entry and candidate entry as
// necessary and schedules any further wakeups.
fn import_checked_approval(
state: &State<impl DBReader>,
state: &State,
db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
mut block_entry: BlockEntry,
candidate_hash: CandidateHash,
@@ -1812,7 +1740,7 @@ fn import_checked_approval(
actions.push(Action::NoteApprovedInChainSelection(block_hash));
}
actions.push(Action::WriteBlockEntry(block_entry));
db.write_block_entry(block_entry.into());
}
(is_approved, status)
@@ -1856,11 +1784,11 @@ fn import_checked_approval(
//
// 1. The source is remote, as we don't store anything new in the approval entry.
// 2. The candidate is not newly approved, as we haven't altered the approval entry's
// approved flag with `mark_approved` above.
// approved flag with `mark_approved` above.
// 3. The source had already approved the candidate, as we haven't altered the bitfield.
if !source.is_remote() || newly_approved || !already_approved_by {
// In all other cases, we need to write the candidate entry.
actions.push(Action::WriteCandidateEntry(candidate_hash, candidate_entry));
db.write_candidate_entry(candidate_entry);
}
}
@@ -1904,7 +1832,8 @@ fn should_trigger_assignment(
}
fn process_wakeup(
state: &State<impl DBReader>,
state: &State,
db: &mut OverlayedBackend<'_, impl Backend>,
relay_block: Hash,
candidate_hash: CandidateHash,
expected_tick: Tick,
@@ -1917,8 +1846,8 @@ fn process_wakeup(
.with_candidate(candidate_hash)
.with_stage(jaeger::Stage::ApprovalChecking);
let block_entry = state.db.load_block_entry(&relay_block)?;
let candidate_entry = state.db.load_candidate_entry(&candidate_hash)?;
let block_entry = db.load_block_entry(&relay_block)?;
let candidate_entry = db.load_candidate_entry(&candidate_hash)?;
// If either is not present, we have nothing to wakeup. Might have lost a race with finality
let (block_entry, mut candidate_entry) = match (block_entry, candidate_entry) {
@@ -1981,7 +1910,10 @@ fn process_wakeup(
(should_trigger, approval_entry.backing_group())
};
let (mut actions, maybe_cert) = if should_trigger {
let mut actions = Vec::new();
let candidate_receipt = candidate_entry.candidate_receipt().clone();
let maybe_cert = if should_trigger {
let maybe_cert = {
let approval_entry = candidate_entry.approval_entry_mut(&relay_block)
.expect("should_trigger only true if this fetched earlier; qed");
@@ -1989,11 +1921,11 @@ fn process_wakeup(
approval_entry.trigger_our_assignment(state.clock.tick_now())
};
let actions = vec![Action::WriteCandidateEntry(candidate_hash, candidate_entry.clone())];
db.write_candidate_entry(candidate_entry.clone());
(actions, maybe_cert)
maybe_cert
} else {
(Vec::new(), None)
None
};
if let Some((cert, val_index, tranche)) = maybe_cert {
@@ -2010,7 +1942,7 @@ fn process_wakeup(
tracing::trace!(
target: LOG_TARGET,
?candidate_hash,
para_id = ?candidate_entry.candidate_receipt().descriptor.para_id,
para_id = ?candidate_receipt.descriptor.para_id,
block_hash = ?relay_block,
"Launching approval work.",
);
@@ -2023,7 +1955,7 @@ fn process_wakeup(
relay_block_hash: relay_block,
candidate_index: i as _,
session: block_entry.session(),
candidate: candidate_entry.candidate_receipt().clone(),
candidate: candidate_receipt,
backing_group,
});
}
@@ -2274,12 +2206,13 @@ async fn launch_approval(
// have been done.
fn issue_approval(
ctx: &mut impl SubsystemSender,
state: &State<impl DBReader>,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
candidate_hash: CandidateHash,
ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
) -> SubsystemResult<Vec<Action>> {
let block_entry = match state.db.load_block_entry(&block_hash)? {
let block_entry = match db.load_block_entry(&block_hash)? {
Some(b) => b,
None => {
// not a cause for alarm - just lost a race with pruning, most likely.
@@ -2337,7 +2270,7 @@ fn issue_approval(
}
};
let candidate_entry = match state.db.load_candidate_entry(&candidate_hash)? {
let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
Some(c) => c,
None => {
tracing::warn!(
@@ -2397,6 +2330,7 @@ fn issue_approval(
let actions = import_checked_approval(
state,
db,
metrics,
block_entry,
candidate_hash,
@@ -0,0 +1,334 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Middleware interface that leverages low-level database operations
//! to provide a clean API for processing block and candidate imports.
use polkadot_node_subsystem::SubsystemResult;
use polkadot_primitives::v1::{
CandidateHash, CandidateReceipt, BlockNumber, GroupIndex, Hash,
};
use bitvec::{order::Lsb0 as BitOrderLsb0};
use std::convert::Into;
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry;
use super::persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
use super::backend::{Backend, OverlayedBackend};
use super::approval_db::{
v1::{
OurAssignment, StoredBlockRange,
},
};
/// Information about a new candidate necessary to instantiate the requisite
/// candidate and approval entries.
#[derive(Clone)]
pub struct NewCandidateInfo {
candidate: CandidateReceipt,
backing_group: GroupIndex,
our_assignment: Option<OurAssignment>,
}
impl NewCandidateInfo {
/// Convenience constructor
pub fn new(
candidate: CandidateReceipt,
backing_group: GroupIndex,
our_assignment: Option<OurAssignment>,
) -> Self {
Self { candidate, backing_group, our_assignment }
}
}
fn visit_and_remove_block_entry(
block_hash: Hash,
overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
visited_candidates: &mut HashMap<CandidateHash, CandidateEntry>,
) -> SubsystemResult<Vec<Hash>> {
let block_entry = match overlayed_db.load_block_entry(&block_hash)? {
None => return Ok(Vec::new()),
Some(b) => b,
};
overlayed_db.delete_block_entry(&block_hash);
for &(_, ref candidate_hash) in block_entry.candidates() {
let candidate = match visited_candidates.entry(*candidate_hash) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => {
e.insert(match overlayed_db.load_candidate_entry(candidate_hash)? {
None => continue, // Should not happen except for corrupt DB
Some(c) => c,
})
}
};
candidate.block_assignments.remove(&block_hash);
}
Ok(block_entry.children)
}
/// Canonicalize some particular block, pruning everything before it and
/// pruning any competing branches at the same height.
pub fn canonicalize(
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
canon_number: BlockNumber,
canon_hash: Hash,
) -> SubsystemResult<()> {
let range = match overlay_db.load_stored_blocks()? {
None => return Ok(()),
Some(range) if range.0 >= canon_number => return Ok(()),
Some(range) => range,
};
// Storing all candidates in memory is potentially heavy, but should be fine
// as long as finality doesn't stall for a long while. We could optimize this
// by keeping only the metadata about which blocks reference each candidate.
let mut visited_candidates = HashMap::new();
// All the block heights we visited but didn't necessarily delete everything from.
let mut visited_heights = HashMap::new();
// First visit everything before the height.
for i in range.0..canon_number {
let at_height = overlay_db.load_blocks_at_height(&i)?;
overlay_db.delete_blocks_at_height(i);
for b in at_height {
let _ = visit_and_remove_block_entry(
b,
overlay_db,
&mut visited_candidates,
)?;
}
}
// Then visit everything at the height.
let pruned_branches = {
let at_height = overlay_db.load_blocks_at_height(&canon_number)?;
overlay_db.delete_blocks_at_height(canon_number);
// Note that while there may be branches descending from blocks at earlier heights,
// we have already covered them by removing everything at earlier heights.
let mut pruned_branches = Vec::new();
for b in at_height {
let children = visit_and_remove_block_entry(
b,
overlay_db,
&mut visited_candidates,
)?;
if b != canon_hash {
pruned_branches.extend(children);
}
}
pruned_branches
};
// Follow all children of non-canonicalized blocks.
{
let mut frontier: Vec<(BlockNumber, Hash)> = pruned_branches.into_iter().map(|h| (canon_number + 1, h)).collect();
while let Some((height, next_child)) = frontier.pop() {
let children = visit_and_remove_block_entry(
next_child,
overlay_db,
&mut visited_candidates,
)?;
// extend the frontier of branches to include the given height.
frontier.extend(children.into_iter().map(|h| (height + 1, h)));
// visit the at-height key for this deleted block's height.
let at_height = match visited_heights.entry(height) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => e.insert(overlay_db.load_blocks_at_height(&height)?),
};
if let Some(i) = at_height.iter().position(|x| x == &next_child) {
at_height.remove(i);
}
}
}
// Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`.
for (candidate_hash, candidate) in visited_candidates.into_iter() {
if candidate.block_assignments.is_empty() {
overlay_db.delete_candidate_entry(&candidate_hash);
} else {
overlay_db.write_candidate_entry(candidate);
}
}
// Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`.
for (h, at) in visited_heights.into_iter() {
if at.is_empty() {
overlay_db.delete_blocks_at_height(h);
} else {
overlay_db.write_blocks_at_height(h, at);
}
}
// due to the fork pruning, this range actually might go too far above where our actual highest block is,
// if a relatively short fork is canonicalized.
// TODO https://github.com/paritytech/polkadot/issues/3389
let new_range = StoredBlockRange(
canon_number + 1,
std::cmp::max(range.1, canon_number + 2),
);
overlay_db.write_stored_block_range(new_range);
Ok(())
}
/// Record a new block entry.
///
/// This will update the blocks-at-height mapping, the stored block range, if necessary,
/// and add block and candidate entries. It will also add approval entries to existing
/// candidate entries and add this as a child of any block entry corresponding to the
/// parent hash.
///
/// Has no effect if there is already an entry for the block or `candidate_info` returns
/// `None` for any of the candidates referenced by the block entry. In these cases,
/// no information about new candidates will be referred to by this function.
pub fn add_block_entry(
store: &mut OverlayedBackend<'_, impl Backend>,
entry: BlockEntry,
n_validators: usize,
candidate_info: impl Fn(&CandidateHash) -> Option<NewCandidateInfo>,
) -> SubsystemResult<Vec<(CandidateHash, CandidateEntry)>> {
let session = entry.session();
let parent_hash = entry.parent_hash();
let number = entry.block_number();
// Update the stored block range.
{
let new_range = match store.load_stored_blocks()? {
None => Some(StoredBlockRange(number, number + 1)),
Some(range) if range.1 <= number => Some(StoredBlockRange(range.0, number + 1)),
Some(_) => None,
};
new_range.map(|n| store.write_stored_block_range(n));
};
// Update the blocks at height meta key.
{
let mut blocks_at_height = store.load_blocks_at_height(&number)?;
if blocks_at_height.contains(&entry.block_hash()) {
// seems we already have a block entry for this block. nothing to do here.
return Ok(Vec::new())
}
blocks_at_height.push(entry.block_hash());
store.write_blocks_at_height(number, blocks_at_height)
};
let mut candidate_entries = Vec::with_capacity(entry.candidates().len());
// read and write all updated entries.
{
for &(_, ref candidate_hash) in entry.candidates() {
let NewCandidateInfo {
candidate,
backing_group,
our_assignment,
} = match candidate_info(candidate_hash) {
None => return Ok(Vec::new()),
Some(info) => info,
};
let mut candidate_entry = store.load_candidate_entry(&candidate_hash)?
.unwrap_or_else(move || CandidateEntry {
candidate,
session,
block_assignments: BTreeMap::new(),
approvals: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators],
});
candidate_entry.block_assignments.insert(
entry.block_hash(),
ApprovalEntry::new(
Vec::new(),
backing_group,
our_assignment.map(|v| v.into()),
None,
bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators],
false,
)
);
store.write_candidate_entry(candidate_entry.clone());
candidate_entries.push((*candidate_hash, candidate_entry));
}
};
// Update the child index for the parent.
store.load_block_entry(&parent_hash)?.map(|mut e| {
e.children.push(entry.block_hash());
store.write_block_entry(e);
});
// Put the new block entry in.
store.write_block_entry(entry);
Ok(candidate_entries)
}
/// Forcibly approve all candidates included at up to the given relay-chain height in the indicated
/// chain.
pub fn force_approve(
store: &mut OverlayedBackend<'_, impl Backend>,
chain_head: Hash,
up_to: BlockNumber,
) -> SubsystemResult<Vec<Hash>> {
enum State {
WalkTo,
Approving,
}
let mut approved_hashes = Vec::new();
let mut cur_hash = chain_head;
let mut state = State::WalkTo;
// iterate back to the `up_to` block, and then iterate backwards until all blocks
// are updated.
while let Some(mut entry) = store.load_block_entry(&cur_hash)? {
if entry.block_number() <= up_to {
state = State::Approving;
}
cur_hash = entry.parent_hash();
match state {
State::WalkTo => {},
State::Approving => {
entry.approved_bitfield.iter_mut().for_each(|mut b| *b = true);
approved_hashes.push(entry.block_hash());
store.write_block_entry(entry);
}
}
}
Ok(approved_hashes)
}
@@ -86,6 +86,19 @@ pub struct ApprovalEntry {
}
impl ApprovalEntry {
/// Convenience constructor
pub fn new(
tranches: Vec<TrancheEntry>,
backing_group: GroupIndex,
our_assignment: Option<OurAssignment>,
our_approval_sig: Option<ValidatorSignature>,
// `n_validators` bits.
assignments: BitVec<BitOrderLsb0, u8>,
approved: bool,
) -> Self {
Self { tranches, backing_group, our_assignment, our_approval_sig, assignments, approved }
}
// Access our assignment for this approval entry.
pub fn our_assignment(&self) -> Option<&OurAssignment> {
self.our_assignment.as_ref()
@@ -244,12 +257,12 @@ impl From<ApprovalEntry> for crate::approval_db::v1::ApprovalEntry {
/// Metadata regarding approval of a particular candidate.
#[derive(Debug, Clone, PartialEq)]
pub struct CandidateEntry {
candidate: CandidateReceipt,
session: SessionIndex,
pub candidate: CandidateReceipt,
pub session: SessionIndex,
// Assignments are based on blocks, so we need to track assignments separately
// based on the block we are looking at.
block_assignments: BTreeMap<Hash, ApprovalEntry>,
approvals: BitVec<BitOrderLsb0, u8>,
pub block_assignments: BTreeMap<Hash, ApprovalEntry>,
pub approvals: BitVec<BitOrderLsb0, u8>,
}
impl CandidateEntry {
@@ -328,8 +341,8 @@ pub struct BlockEntry {
// A bitfield where the i'th bit corresponds to the i'th candidate in `candidates`.
// The i'th bit is `true` iff the candidate has been approved in the context of this
// block. The block can be considered approved if the bitfield has all bits set to `true`.
approved_bitfield: BitVec<BitOrderLsb0, u8>,
children: Vec<Hash>,
pub approved_bitfield: BitVec<BitOrderLsb0, u8>,
pub children: Vec<Hash>,
}
impl BlockEntry {
File diff suppressed because it is too large Load Diff
+3
View File
@@ -223,6 +223,9 @@ pub enum SubsystemError {
/// The wrapped error. Marked as source for tracking the error chain.
#[source] source: Box<dyn 'static + std::error::Error + Send + Sync>
},
#[error(transparent)]
Io(#[from] std::io::Error),
}
impl SubsystemError {
+1 -1
View File
@@ -120,7 +120,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
spec_name: create_runtime_str!("kusama"),
impl_name: create_runtime_str!("parity-kusama"),
authoring_version: 2,
spec_version: 9080,
spec_version: 9090,
impl_version: 0,
#[cfg(not(feature = "disable-runtime-api"))]
apis: RUNTIME_API_VERSIONS,
+1 -1
View File
@@ -95,7 +95,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
spec_name: create_runtime_str!("polkadot"),
impl_name: create_runtime_str!("parity-polkadot"),
authoring_version: 0,
spec_version: 9080,
spec_version: 9090,
impl_version: 0,
#[cfg(not(feature = "disable-runtime-api"))]
apis: RUNTIME_API_VERSIONS,
+1 -1
View File
@@ -119,7 +119,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
spec_name: create_runtime_str!("westend"),
impl_name: create_runtime_str!("parity-westend"),
authoring_version: 2,
spec_version: 9080,
spec_version: 9090,
impl_version: 0,
#[cfg(not(feature = "disable-runtime-api"))]
apis: RUNTIME_API_VERSIONS,