diff --git a/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs b/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs index f6307a8b5a..428692b025 100644 --- a/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs +++ b/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs @@ -17,6 +17,7 @@ //! Version 1 of the DB schema. use kvdb::{DBTransaction, KeyValueDB}; +use polkadot_node_subsystem::{SubsystemResult, SubsystemError}; use polkadot_node_primitives::approval::{DelayTranche, AssignmentCert}; use polkadot_primitives::v1::{ ValidatorIndex, GroupIndex, CandidateReceipt, SessionIndex, CoreIndex, @@ -25,22 +26,141 @@ use polkadot_primitives::v1::{ use sp_consensus_slots::Slot; use parity_scale_codec::{Encode, Decode}; -use std::collections::{BTreeMap, HashMap}; -use std::collections::hash_map::Entry; +use std::collections::BTreeMap; +use std::sync::Arc; use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0}; +use crate::backend::{Backend, BackendWriteOp}; +use crate::persisted_entries; + +const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks"; + #[cfg(test)] pub mod tests; +/// DbBackend is a concrete implementation of the higher-level Backend trait +pub struct DbBackend { + inner: Arc, + config: Config, +} + +impl DbBackend { + /// Create a new [`DbBackend`] with the supplied key-value store and + /// config. 
+ pub fn new(db: Arc, config: Config) -> Self { + DbBackend { + inner: db, + config, + } + } +} + +impl Backend for DbBackend { + fn load_block_entry( + &self, + block_hash: &Hash, + ) -> SubsystemResult> { + load_block_entry(&*self.inner, &self.config, block_hash) + .map(|e| e.map(Into::into)) + } + + fn load_candidate_entry( + &self, + candidate_hash: &CandidateHash, + ) -> SubsystemResult> { + load_candidate_entry(&*self.inner, &self.config, candidate_hash) + .map(|e| e.map(Into::into)) + } + + fn load_blocks_at_height( + &self, + block_height: &BlockNumber + ) -> SubsystemResult> { + load_blocks_at_height(&*self.inner, &self.config, block_height) + } + + fn load_all_blocks(&self) -> SubsystemResult> { + load_all_blocks(&*self.inner, &self.config) + } + + fn load_stored_blocks(&self) -> SubsystemResult> { + load_stored_blocks(&*self.inner, &self.config) + } + + /// Atomically write the list of operations, with later operations taking precedence over prior. + fn write(&mut self, ops: I) -> SubsystemResult<()> + where I: IntoIterator + { + let mut tx = DBTransaction::new(); + for op in ops { + match op { + BackendWriteOp::WriteStoredBlockRange(stored_block_range) => { + tx.put_vec( + self.config.col_data, + &STORED_BLOCKS_KEY, + stored_block_range.encode(), + ); + } + BackendWriteOp::WriteBlocksAtHeight(h, blocks) => { + tx.put_vec( + self.config.col_data, + &blocks_at_height_key(h), + blocks.encode(), + ); + } + BackendWriteOp::DeleteBlocksAtHeight(h) => { + tx.delete( + self.config.col_data, + &blocks_at_height_key(h), + ); + } + BackendWriteOp::WriteBlockEntry(block_entry) => { + let block_entry: BlockEntry = block_entry.into(); + tx.put_vec( + self.config.col_data, + &block_entry_key(&block_entry.block_hash), + block_entry.encode(), + ); + } + BackendWriteOp::DeleteBlockEntry(hash) => { + tx.delete( + self.config.col_data, + &block_entry_key(&hash), + ); + } + BackendWriteOp::WriteCandidateEntry(candidate_entry) => { + let candidate_entry: CandidateEntry = 
candidate_entry.into(); + tx.put_vec( + self.config.col_data, + &candidate_entry_key(&candidate_entry.candidate.hash()), + candidate_entry.encode(), + ); + } + BackendWriteOp::DeleteCandidateEntry(candidate_hash) => { + tx.delete( + self.config.col_data, + &candidate_entry_key(&candidate_hash), + ); + } + } + } + + self.inner.write(tx).map_err(|e| e.into()) + } +} + +/// A range from earliest..last block number stored within the DB. +#[derive(Encode, Decode, Debug, Clone, PartialEq)] +pub struct StoredBlockRange(pub BlockNumber, pub BlockNumber); + // slot_duration * 2 + DelayTranche gives the number of delay tranches since the // unix epoch. #[derive(Encode, Decode, Clone, Copy, Debug, PartialEq)] pub struct Tick(u64); +/// Convenience type definition pub type Bitfield = BitVec; -const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks"; - /// The database config. #[derive(Debug, Clone, Copy)] pub struct Config { @@ -113,10 +233,6 @@ pub struct BlockEntry { pub children: Vec, } -/// A range from earliest..last block number stored within the DB. -#[derive(Encode, Decode, Debug, Clone, PartialEq)] -pub struct StoredBlockRange(BlockNumber, BlockNumber); - impl From for Tick { fn from(tick: crate::Tick) -> Tick { Tick(tick) @@ -141,163 +257,7 @@ impl std::error::Error for Error {} /// Result alias for DB errors. pub type Result = std::result::Result; -/// Canonicalize some particular block, pruning everything before it and -/// pruning any competing branches at the same height. -pub(crate) fn canonicalize( - store: &dyn KeyValueDB, - config: &Config, - canon_number: BlockNumber, - canon_hash: Hash, -) - -> Result<()> -{ - let range = match load_stored_blocks(store, config)? 
{ - None => return Ok(()), - Some(range) => if range.0 >= canon_number { - return Ok(()) - } else { - range - }, - }; - - let mut transaction = DBTransaction::new(); - - // Storing all candidates in memory is potentially heavy, but should be fine - // as long as finality doesn't stall for a long while. We could optimize this - // by keeping only the metadata about which blocks reference each candidate. - let mut visited_candidates = HashMap::new(); - - // All the block heights we visited but didn't necessarily delete everything from. - let mut visited_heights = HashMap::new(); - - let visit_and_remove_block_entry = | - block_hash: Hash, - transaction: &mut DBTransaction, - visited_candidates: &mut HashMap, - | -> Result> { - let block_entry = match load_block_entry(store, config, &block_hash)? { - None => return Ok(Vec::new()), - Some(b) => b, - }; - - transaction.delete(config.col_data, &block_entry_key(&block_hash)[..]); - for &(_, ref candidate_hash) in &block_entry.candidates { - let candidate = match visited_candidates.entry(*candidate_hash) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - e.insert(match load_candidate_entry(store, config, candidate_hash)? { - None => continue, // Should not happen except for corrupt DB - Some(c) => c, - }) - } - }; - - candidate.block_assignments.remove(&block_hash); - } - - Ok(block_entry.children) - }; - - // First visit everything before the height. - for i in range.0..canon_number { - let at_height = load_blocks_at_height(store, config, i)?; - transaction.delete(config.col_data, &blocks_at_height_key(i)[..]); - - for b in at_height { - let _ = visit_and_remove_block_entry( - b, - &mut transaction, - &mut visited_candidates, - )?; - } - } - - // Then visit everything at the height. 
- let pruned_branches = { - let at_height = load_blocks_at_height(store, config, canon_number)?; - transaction.delete(config.col_data, &blocks_at_height_key(canon_number)); - - // Note that while there may be branches descending from blocks at earlier heights, - // we have already covered them by removing everything at earlier heights. - let mut pruned_branches = Vec::new(); - - for b in at_height { - let children = visit_and_remove_block_entry( - b, - &mut transaction, - &mut visited_candidates, - )?; - - if b != canon_hash { - pruned_branches.extend(children); - } - } - - pruned_branches - }; - - // Follow all children of non-canonicalized blocks. - { - let mut frontier: Vec<_> = pruned_branches.into_iter().map(|h| (canon_number + 1, h)).collect(); - while let Some((height, next_child)) = frontier.pop() { - let children = visit_and_remove_block_entry( - next_child, - &mut transaction, - &mut visited_candidates, - )?; - - // extend the frontier of branches to include the given height. - frontier.extend(children.into_iter().map(|h| (height + 1, h))); - - // visit the at-height key for this deleted block's height. - let at_height = match visited_heights.entry(height) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => e.insert(load_blocks_at_height(store, config, height)?), - }; - - if let Some(i) = at_height.iter().position(|x| x == &next_child) { - at_height.remove(i); - } - } - } - - // Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`. - for (candidate_hash, candidate) in visited_candidates { - if candidate.block_assignments.is_empty() { - transaction.delete(config.col_data, &candidate_entry_key(&candidate_hash)[..]); - } else { - transaction.put_vec( - config.col_data, - &candidate_entry_key(&candidate_hash)[..], - candidate.encode(), - ); - } - } - - // Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`. 
- for (h, at) in visited_heights { - if at.is_empty() { - transaction.delete(config.col_data, &blocks_at_height_key(h)[..]); - } else { - transaction.put_vec(config.col_data, &blocks_at_height_key(h), at.encode()); - } - } - - // due to the fork pruning, this range actually might go too far above where our actual highest block is, - // if a relatively short fork is canonicalized. - let new_range = StoredBlockRange( - canon_number + 1, - std::cmp::max(range.1, canon_number + 2), - ).encode(); - - transaction.put_vec(config.col_data, &STORED_BLOCKS_KEY[..], new_range); - - // Update the values on-disk. - store.write(transaction).map_err(Into::into) -} - -fn load_decode(store: &dyn KeyValueDB, col_data: u32, key: &[u8]) - -> Result> +pub(crate) fn load_decode(store: &dyn KeyValueDB, col_data: u32, key: &[u8]) -> Result> { match store.get(col_data, key)? { None => Ok(None), @@ -307,282 +267,8 @@ fn load_decode(store: &dyn KeyValueDB, col_data: u32, key: &[u8]) } } -/// Information about a new candidate necessary to instantiate the requisite -/// candidate and approval entries. -#[derive(Clone)] -pub(crate) struct NewCandidateInfo { - pub candidate: CandidateReceipt, - pub backing_group: GroupIndex, - pub our_assignment: Option, -} - -/// Record a new block entry. -/// -/// This will update the blocks-at-height mapping, the stored block range, if necessary, -/// and add block and candidate entries. It will also add approval entries to existing -/// candidate entries and add this as a child of any block entry corresponding to the -/// parent hash. -/// -/// Has no effect if there is already an entry for the block or `candidate_info` returns -/// `None` for any of the candidates referenced by the block entry. In these cases, -/// no information about new candidates will be referred to by this function. 
-pub(crate) fn add_block_entry( - store: &dyn KeyValueDB, - config: &Config, - entry: BlockEntry, - n_validators: usize, - candidate_info: impl Fn(&CandidateHash) -> Option, -) -> Result> { - let mut transaction = DBTransaction::new(); - let session = entry.session; - let parent_hash = entry.parent_hash; - let number = entry.block_number; - - // Update the stored block range. - { - let new_range = match load_stored_blocks(store, config)? { - None => Some(StoredBlockRange(number, number + 1)), - Some(range) => if range.1 <= number { - Some(StoredBlockRange(range.0, number + 1)) - } else { - None - } - }; - - new_range.map(|n| transaction.put_vec(config.col_data, &STORED_BLOCKS_KEY[..], n.encode())) - }; - - // Update the blocks at height meta key. - { - let mut blocks_at_height = load_blocks_at_height(store, config, number)?; - if blocks_at_height.contains(&entry.block_hash) { - // seems we already have a block entry for this block. nothing to do here. - return Ok(Vec::new()) - } - - blocks_at_height.push(entry.block_hash); - transaction.put_vec(config.col_data, &blocks_at_height_key(number)[..], blocks_at_height.encode()) - }; - - let mut candidate_entries = Vec::with_capacity(entry.candidates.len()); - - // read and write all updated entries. - { - for &(_, ref candidate_hash) in &entry.candidates { - let NewCandidateInfo { - candidate, - backing_group, - our_assignment, - } = match candidate_info(candidate_hash) { - None => return Ok(Vec::new()), - Some(info) => info, - }; - - let mut candidate_entry = load_candidate_entry(store, config, &candidate_hash)? 
- .unwrap_or_else(move || CandidateEntry { - candidate, - session, - block_assignments: BTreeMap::new(), - approvals: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], - }); - - candidate_entry.block_assignments.insert( - entry.block_hash, - ApprovalEntry { - tranches: Vec::new(), - backing_group, - our_assignment, - our_approval_sig: None, - assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], - approved: false, - } - ); - - transaction.put_vec( - config.col_data, - &candidate_entry_key(&candidate_hash)[..], - candidate_entry.encode(), - ); - - candidate_entries.push((*candidate_hash, candidate_entry)); - } - }; - - // Update the child index for the parent. - load_block_entry(store, config, &parent_hash)?.map(|mut e| { - e.children.push(entry.block_hash); - transaction.put_vec(config.col_data, &block_entry_key(&parent_hash)[..], e.encode()) - }); - - // Put the new block entry in. - transaction.put_vec(config.col_data, &block_entry_key(&entry.block_hash)[..], entry.encode()); - - store.write(transaction)?; - Ok(candidate_entries) -} - -/// Forcibly approve all candidates included at up to the given relay-chain height in the indicated -/// chain. -/// -/// Returns a list of block hashes that were not approved and are now. -pub fn force_approve( - store: &dyn KeyValueDB, - db_config: Config, - chain_head: Hash, - up_to: BlockNumber, -) -> Result> { - enum State { - WalkTo, - Approving, - } - - let mut approved_hashes = Vec::new(); - - let mut cur_hash = chain_head; - let mut state = State::WalkTo; - - let mut tx = Transaction::new(db_config); - - // iterate back to the `up_to` block, and then iterate backwards until all blocks - // are updated. - while let Some(mut entry) = load_block_entry(store, &db_config, &cur_hash)? 
{ - - if entry.block_number <= up_to { - state = State::Approving; - } - - cur_hash = entry.parent_hash; - - match state { - State::WalkTo => {}, - State::Approving => { - let is_approved = entry.approved_bitfield.count_ones() - == entry.approved_bitfield.len(); - - if !is_approved { - entry.approved_bitfield.iter_mut().for_each(|mut b| *b = true); - approved_hashes.push(entry.block_hash); - tx.put_block_entry(entry); - } - } - } - } - - tx.write(store)?; - Ok(approved_hashes) -} - -/// Return all blocks which have entries in the DB, ascending, by height. -pub(crate) fn load_all_blocks(store: &dyn KeyValueDB, config: &Config) -> Result> { - let stored_blocks = load_stored_blocks(store, config)?; - - let mut hashes = Vec::new(); - for height in stored_blocks.into_iter().flat_map(|s| s.0..s.1) { - hashes.extend(load_blocks_at_height(store, config, height)?); - } - - Ok(hashes) -} - -// An atomic transaction of multiple candidate or block entries. -#[must_use = "Transactions do nothing unless written to a DB"] -pub struct Transaction { - block_entries: HashMap, - candidate_entries: HashMap, - config: Config, -} - -impl Transaction { - pub(crate) fn new(config: Config) -> Self { - Transaction { - block_entries: HashMap::default(), - candidate_entries: HashMap::default(), - config, - } - } - - /// Put a block entry in the transaction, overwriting any other with the - /// same hash. - pub(crate) fn put_block_entry(&mut self, entry: BlockEntry) { - let hash = entry.block_hash; - let _ = self.block_entries.insert(hash, entry); - } - - /// Put a candidate entry in the transaction, overwriting any other with the - /// same hash. 
- pub(crate) fn put_candidate_entry(&mut self, hash: CandidateHash, entry: CandidateEntry) { - let _ = self.candidate_entries.insert(hash, entry); - } - - /// Returns true if the transaction contains no actions - pub(crate) fn is_empty(&self) -> bool { - self.block_entries.is_empty() && self.candidate_entries.is_empty() - } - - /// Write the contents of the transaction, atomically, to the DB. - pub(crate) fn write(self, db: &dyn KeyValueDB) -> Result<()> { - if self.is_empty() { - return Ok(()) - } - - let mut db_transaction = DBTransaction::new(); - - for (hash, entry) in self.block_entries { - let k = block_entry_key(&hash); - let v = entry.encode(); - - db_transaction.put_vec(self.config.col_data, &k, v); - } - - for (hash, entry) in self.candidate_entries { - let k = candidate_entry_key(&hash); - let v = entry.encode(); - - db_transaction.put_vec(self.config.col_data, &k, v); - } - - db.write(db_transaction).map_err(Into::into) - } -} - -/// Load the stored-blocks key from the state. -fn load_stored_blocks(store: &dyn KeyValueDB, config: &Config) - -> Result> -{ - load_decode(store, config.col_data, STORED_BLOCKS_KEY) -} - -/// Load a blocks-at-height entry for a given block number. -pub(crate) fn load_blocks_at_height( - store: &dyn KeyValueDB, - config: &Config, - block_number: BlockNumber, -) - -> Result> { - load_decode(store, config.col_data, &blocks_at_height_key(block_number)) - .map(|x| x.unwrap_or_default()) -} - -/// Load a block entry from the aux store. -pub(crate) fn load_block_entry(store: &dyn KeyValueDB, config: &Config, block_hash: &Hash) - -> Result> -{ - load_decode(store, config.col_data, &block_entry_key(block_hash)) -} - -/// Load a candidate entry from the aux store. -pub(crate) fn load_candidate_entry( - store: &dyn KeyValueDB, - config: &Config, - candidate_hash: &CandidateHash, -) - -> Result> -{ - load_decode(store, config.col_data, &candidate_entry_key(candidate_hash)) -} - /// The key a given block entry is stored under. 
-fn block_entry_key(block_hash: &Hash) -> [u8; 46] { +pub(crate) fn block_entry_key(block_hash: &Hash) -> [u8; 46] { const BLOCK_ENTRY_PREFIX: [u8; 14] = *b"Approvals_blck"; let mut key = [0u8; 14 + 32]; @@ -593,7 +279,7 @@ fn block_entry_key(block_hash: &Hash) -> [u8; 46] { } /// The key a given candidate entry is stored under. -fn candidate_entry_key(candidate_hash: &CandidateHash) -> [u8; 46] { +pub(crate) fn candidate_entry_key(candidate_hash: &CandidateHash) -> [u8; 46] { const CANDIDATE_ENTRY_PREFIX: [u8; 14] = *b"Approvals_cand"; let mut key = [0u8; 14 + 32]; @@ -604,7 +290,7 @@ fn candidate_entry_key(candidate_hash: &CandidateHash) -> [u8; 46] { } /// The key a set of block hashes corresponding to a block number is stored under. -fn blocks_at_height_key(block_number: BlockNumber) -> [u8; 16] { +pub(crate) fn blocks_at_height_key(block_number: BlockNumber) -> [u8; 16] { const BLOCKS_AT_HEIGHT_PREFIX: [u8; 12] = *b"Approvals_at"; let mut key = [0u8; 12 + 4]; @@ -613,3 +299,59 @@ fn blocks_at_height_key(block_number: BlockNumber) -> [u8; 16] { key } + +/// Return all blocks which have entries in the DB, ascending, by height. +pub fn load_all_blocks(store: &dyn KeyValueDB, config: &Config) -> SubsystemResult> { + let mut hashes = Vec::new(); + if let Some(stored_blocks) = load_stored_blocks(store, config)? { + for height in stored_blocks.0..stored_blocks.1 { + let blocks = load_blocks_at_height(store, config, &height)?; + hashes.extend(blocks); + } + + } + + Ok(hashes) +} + +/// Load the stored-blocks key from the state. +pub fn load_stored_blocks( + store: &dyn KeyValueDB, + config: &Config, +) -> SubsystemResult> { + load_decode(store, config.col_data, STORED_BLOCKS_KEY) + .map_err(|e| SubsystemError::with_origin("approval-voting", e)) +} + +/// Load a blocks-at-height entry for a given block number. 
+pub fn load_blocks_at_height( + store: &dyn KeyValueDB, + config: &Config, + block_number: &BlockNumber, +) -> SubsystemResult> { + load_decode(store, config.col_data, &blocks_at_height_key(*block_number)) + .map(|x| x.unwrap_or_default()) + .map_err(|e| SubsystemError::with_origin("approval-voting", e)) +} + +/// Load a block entry from the aux store. +pub fn load_block_entry( + store: &dyn KeyValueDB, + config: &Config, + block_hash: &Hash, +) -> SubsystemResult> { + load_decode(store, config.col_data, &block_entry_key(block_hash)) + .map(|u: Option| u.map(|v| v.into())) + .map_err(|e| SubsystemError::with_origin("approval-voting", e)) +} + +/// Load a candidate entry from the aux store. +pub fn load_candidate_entry( + store: &dyn KeyValueDB, + config: &Config, + candidate_hash: &CandidateHash, +) -> SubsystemResult> { + load_decode(store, config.col_data, &candidate_entry_key(candidate_hash)) + .map(|u: Option| u.map(|v| v.into())) + .map_err(|e| SubsystemError::with_origin("approval-voting", e)) +} diff --git a/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs b/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs index d0056cd98d..b9b19e3f1c 100644 --- a/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs +++ b/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs @@ -17,7 +17,13 @@ //! Tests for the aux-schema of approval voting. 
use super::*; +use std::sync::Arc; +use std::collections::HashMap; use polkadot_primitives::v1::Id as ParaId; +use kvdb::KeyValueDB; +use super::{DbBackend, StoredBlockRange}; +use crate::backend::{Backend, OverlayedBackend}; +use crate::ops::{NewCandidateInfo, add_block_entry, force_approve, canonicalize}; const DATA_COL: u32 = 0; const NUM_COLUMNS: u32 = 1; @@ -26,36 +32,9 @@ const TEST_CONFIG: Config = Config { col_data: DATA_COL, }; -pub(crate) fn write_stored_blocks(tx: &mut DBTransaction, range: StoredBlockRange) { - tx.put_vec( - DATA_COL, - &STORED_BLOCKS_KEY[..], - range.encode(), - ); -} - -pub(crate) fn write_blocks_at_height(tx: &mut DBTransaction, height: BlockNumber, blocks: &[Hash]) { - tx.put_vec( - DATA_COL, - &blocks_at_height_key(height)[..], - blocks.encode(), - ); -} - -pub(crate) fn write_block_entry(tx: &mut DBTransaction, block_hash: &Hash, entry: &BlockEntry) { - tx.put_vec( - DATA_COL, - &block_entry_key(block_hash)[..], - entry.encode(), - ); -} - -pub(crate) fn write_candidate_entry(tx: &mut DBTransaction, candidate_hash: &CandidateHash, entry: &CandidateEntry) { - tx.put_vec( - DATA_COL, - &candidate_entry_key(candidate_hash)[..], - entry.encode(), - ); +fn make_db() -> (DbBackend, Arc) { + let db_writer: Arc = Arc::new(kvdb_memorydb::create(NUM_COLUMNS)); + (DbBackend::new(db_writer.clone(), TEST_CONFIG), db_writer) } fn make_bitvec(len: usize) -> BitVec { @@ -92,11 +71,11 @@ fn make_candidate(para_id: ParaId, relay_parent: Hash) -> CandidateReceipt { #[test] fn read_write() { - let store = kvdb_memorydb::create(NUM_COLUMNS); + let (mut db, store) = make_db(); let hash_a = Hash::repeat_byte(1); let hash_b = Hash::repeat_byte(2); - let candidate_hash = CandidateHash(Hash::repeat_byte(3)); + let candidate_hash = CandidateReceipt::::default().hash(); let range = StoredBlockRange(10, 20); let at_height = vec![hash_a, hash_b]; @@ -124,53 +103,48 @@ fn read_write() { approvals: Default::default(), }; - let mut tx = DBTransaction::new(); + 
let mut overlay_db = OverlayedBackend::new(&db); + overlay_db.write_stored_block_range(range.clone()); + overlay_db.write_blocks_at_height(1, at_height.clone()); + overlay_db.write_block_entry(block_entry.clone().into()); + overlay_db.write_candidate_entry(candidate_entry.clone().into()); - write_stored_blocks(&mut tx, range.clone()); - write_blocks_at_height(&mut tx, 1, &at_height); - write_block_entry(&mut tx, &hash_a, &block_entry); - write_candidate_entry(&mut tx, &candidate_hash, &candidate_entry); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); - store.write(tx).unwrap(); - - assert_eq!(load_stored_blocks(&store, &TEST_CONFIG).unwrap(), Some(range)); - assert_eq!(load_blocks_at_height(&store, &TEST_CONFIG, 1).unwrap(), at_height); - assert_eq!(load_block_entry(&store, &TEST_CONFIG, &hash_a).unwrap(), Some(block_entry)); + assert_eq!(load_stored_blocks(store.as_ref(), &TEST_CONFIG).unwrap(), Some(range)); + assert_eq!(load_blocks_at_height(store.as_ref(), &TEST_CONFIG, &1).unwrap(), at_height); + assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &hash_a).unwrap(), Some(block_entry.into())); assert_eq!( - load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash).unwrap(), - Some(candidate_entry), + load_candidate_entry(store.as_ref(), &TEST_CONFIG, &candidate_hash).unwrap(), + Some(candidate_entry.into()), ); - let delete_keys = vec![ - STORED_BLOCKS_KEY.to_vec(), - blocks_at_height_key(1).to_vec(), - block_entry_key(&hash_a).to_vec(), - candidate_entry_key(&candidate_hash).to_vec(), - ]; + let mut overlay_db = OverlayedBackend::new(&db); + overlay_db.delete_blocks_at_height(1); + overlay_db.delete_block_entry(&hash_a); + overlay_db.delete_candidate_entry(&candidate_hash); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); - let mut tx = DBTransaction::new(); - for key in delete_keys { - tx.delete(DATA_COL, &key[..]); - } - - store.write(tx).unwrap(); - - assert!(load_stored_blocks(&store, 
&TEST_CONFIG).unwrap().is_none()); - assert!(load_blocks_at_height(&store, &TEST_CONFIG, 1).unwrap().is_empty()); - assert!(load_block_entry(&store, &TEST_CONFIG, &hash_a).unwrap().is_none()); - assert!(load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash).unwrap().is_none()); + assert!(load_blocks_at_height(store.as_ref(), &TEST_CONFIG, &1).unwrap().is_empty()); + assert!(load_block_entry(store.as_ref(), &TEST_CONFIG, &hash_a).unwrap().is_none()); + assert!(load_candidate_entry(store.as_ref(), &TEST_CONFIG, &candidate_hash).unwrap().is_none()); } #[test] fn add_block_entry_works() { - let store = kvdb_memorydb::create(NUM_COLUMNS); + let (mut db, store) = make_db(); let parent_hash = Hash::repeat_byte(1); let block_hash_a = Hash::repeat_byte(2); let block_hash_b = Hash::repeat_byte(69); - let candidate_hash_a = CandidateHash(Hash::repeat_byte(3)); - let candidate_hash_b = CandidateHash(Hash::repeat_byte(4)); + let candidate_receipt_a = make_candidate(1.into(), parent_hash); + let candidate_receipt_b = make_candidate(2.into(), parent_hash); + + let candidate_hash_a = candidate_receipt_a.hash(); + let candidate_hash_b = candidate_receipt_b.hash(); let block_number = 10; @@ -191,49 +165,53 @@ fn add_block_entry_works() { let n_validators = 10; let mut new_candidate_info = HashMap::new(); - new_candidate_info.insert(candidate_hash_a, NewCandidateInfo { - candidate: make_candidate(1.into(), parent_hash), - backing_group: GroupIndex(0), - our_assignment: None, - }); + new_candidate_info.insert(candidate_hash_a, NewCandidateInfo::new( + candidate_receipt_a, + GroupIndex(0), + None, + )); + let mut overlay_db = OverlayedBackend::new(&db); add_block_entry( - &store, - &TEST_CONFIG, - block_entry_a.clone(), + &mut overlay_db, + block_entry_a.clone().into(), n_validators, |h| new_candidate_info.get(h).map(|x| x.clone()), ).unwrap(); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); - new_candidate_info.insert(candidate_hash_b, 
NewCandidateInfo { - candidate: make_candidate(2.into(), parent_hash), - backing_group: GroupIndex(1), - our_assignment: None, - }); + new_candidate_info.insert(candidate_hash_b, NewCandidateInfo::new( + candidate_receipt_b, + GroupIndex(1), + None, + )); + let mut overlay_db = OverlayedBackend::new(&db); add_block_entry( - &store, - &TEST_CONFIG, - block_entry_b.clone(), + &mut overlay_db, + block_entry_b.clone().into(), n_validators, |h| new_candidate_info.get(h).map(|x| x.clone()), ).unwrap(); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); - assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a)); - assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b)); + assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a.into())); + assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b.into())); - let candidate_entry_a = load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash_a) + let candidate_entry_a = load_candidate_entry(store.as_ref(), &TEST_CONFIG, &candidate_hash_a) .unwrap().unwrap(); assert_eq!(candidate_entry_a.block_assignments.keys().collect::>(), vec![&block_hash_a, &block_hash_b]); - let candidate_entry_b = load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash_b) + let candidate_entry_b = load_candidate_entry(store.as_ref(), &TEST_CONFIG, &candidate_hash_b) .unwrap().unwrap(); assert_eq!(candidate_entry_b.block_assignments.keys().collect::>(), vec![&block_hash_b]); } #[test] fn add_block_entry_adds_child() { - let store = kvdb_memorydb::create(NUM_COLUMNS); + let (mut db, store) = make_db(); let parent_hash = Hash::repeat_byte(1); let block_hash_a = Hash::repeat_byte(2); @@ -255,31 +233,33 @@ fn add_block_entry_adds_child() { let n_validators = 10; + let mut overlay_db = OverlayedBackend::new(&db); add_block_entry( - &store, - &TEST_CONFIG, - 
block_entry_a.clone(), + &mut overlay_db, + block_entry_a.clone().into(), n_validators, |_| None, ).unwrap(); add_block_entry( - &store, - &TEST_CONFIG, - block_entry_b.clone(), + &mut overlay_db, + block_entry_b.clone().into(), n_validators, |_| None, ).unwrap(); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); + block_entry_a.children.push(block_hash_b); - assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a)); - assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b)); + assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a.into())); + assert_eq!(load_block_entry(store.as_ref(), &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b.into())); } #[test] fn canonicalize_works() { - let store = kvdb_memorydb::create(NUM_COLUMNS); + let (mut db, store) = make_db(); // -> B1 -> C1 -> D1 // A -> B2 -> C2 -> D2 @@ -296,9 +276,10 @@ fn canonicalize_works() { let n_validators = 10; - let mut tx = DBTransaction::new(); - write_stored_blocks(&mut tx, StoredBlockRange(1, 5)); - store.write(tx).unwrap(); + let mut overlay_db = OverlayedBackend::new(&db); + overlay_db.write_stored_block_range(StoredBlockRange(1, 5)); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); let genesis = Hash::repeat_byte(0); @@ -310,11 +291,17 @@ fn canonicalize_works() { let block_hash_d1 = Hash::repeat_byte(6); let block_hash_d2 = Hash::repeat_byte(7); - let cand_hash_1 = CandidateHash(Hash::repeat_byte(10)); - let cand_hash_2 = CandidateHash(Hash::repeat_byte(11)); - let cand_hash_3 = CandidateHash(Hash::repeat_byte(12)); - let cand_hash_4 = CandidateHash(Hash::repeat_byte(13)); - let cand_hash_5 = CandidateHash(Hash::repeat_byte(15)); + let candidate_receipt_genesis = make_candidate(1.into(), genesis); + let candidate_receipt_a = make_candidate(2.into(), block_hash_a); + let candidate_receipt_b = 
make_candidate(3.into(), block_hash_a); + let candidate_receipt_b1 = make_candidate(4.into(), block_hash_b1); + let candidate_receipt_c1 = make_candidate(5.into(), block_hash_c1); + + let cand_hash_1 = candidate_receipt_genesis.hash(); + let cand_hash_2 = candidate_receipt_a.hash(); + let cand_hash_3 = candidate_receipt_b.hash(); + let cand_hash_4 = candidate_receipt_b1.hash(); + let cand_hash_5 = candidate_receipt_c1.hash(); let block_entry_a = make_block_entry(block_hash_a, genesis, 1, Vec::new()); let block_entry_b1 = make_block_entry(block_hash_b1, block_hash_a, 2, Vec::new()); @@ -344,38 +331,37 @@ fn canonicalize_works() { vec![(CoreIndex(0), cand_hash_5)], ); - let candidate_info = { let mut candidate_info = HashMap::new(); - candidate_info.insert(cand_hash_1, NewCandidateInfo { - candidate: make_candidate(1.into(), genesis), - backing_group: GroupIndex(1), - our_assignment: None, - }); + candidate_info.insert(cand_hash_1, NewCandidateInfo::new( + candidate_receipt_genesis, + GroupIndex(1), + None, + )); - candidate_info.insert(cand_hash_2, NewCandidateInfo { - candidate: make_candidate(2.into(), block_hash_a), - backing_group: GroupIndex(2), - our_assignment: None, - }); + candidate_info.insert(cand_hash_2, NewCandidateInfo::new( + candidate_receipt_a, + GroupIndex(2), + None, + )); - candidate_info.insert(cand_hash_3, NewCandidateInfo { - candidate: make_candidate(3.into(), block_hash_a), - backing_group: GroupIndex(3), - our_assignment: None, - }); + candidate_info.insert(cand_hash_3, NewCandidateInfo::new( + candidate_receipt_b, + GroupIndex(3), + None, + )); - candidate_info.insert(cand_hash_4, NewCandidateInfo { - candidate: make_candidate(4.into(), block_hash_b1), - backing_group: GroupIndex(4), - our_assignment: None, - }); + candidate_info.insert(cand_hash_4, NewCandidateInfo::new( + candidate_receipt_b1, + GroupIndex(4), + None, + )); - candidate_info.insert(cand_hash_5, NewCandidateInfo { - candidate: make_candidate(5.into(), block_hash_c1), - 
backing_group: GroupIndex(5), - our_assignment: None, - }); + candidate_info.insert(cand_hash_5, NewCandidateInfo::new( + candidate_receipt_c1, + GroupIndex(5), + None, + )); candidate_info }; @@ -391,25 +377,27 @@ fn canonicalize_works() { block_entry_d2.clone(), ]; + let mut overlay_db = OverlayedBackend::new(&db); for block_entry in blocks { add_block_entry( - &store, - &TEST_CONFIG, - block_entry, + &mut overlay_db, + block_entry.into(), n_validators, |h| candidate_info.get(h).map(|x| x.clone()), ).unwrap(); } + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); let check_candidates_in_store = |expected: Vec<(CandidateHash, Option>)>| { for (c_hash, in_blocks) in expected { let (entry, in_blocks) = match in_blocks { None => { - assert!(load_candidate_entry(&store, &TEST_CONFIG, &c_hash).unwrap().is_none()); + assert!(load_candidate_entry(store.as_ref(), &TEST_CONFIG, &c_hash).unwrap().is_none()); continue } Some(i) => ( - load_candidate_entry(&store, &TEST_CONFIG, &c_hash).unwrap().unwrap(), + load_candidate_entry(store.as_ref(), &TEST_CONFIG, &c_hash).unwrap().unwrap(), i, ), }; @@ -426,11 +414,11 @@ fn canonicalize_works() { for (hash, with_candidates) in expected { let (entry, with_candidates) = match with_candidates { None => { - assert!(load_block_entry(&store, &TEST_CONFIG, &hash).unwrap().is_none()); + assert!(load_block_entry(store.as_ref(), &TEST_CONFIG, &hash).unwrap().is_none()); continue } Some(i) => ( - load_block_entry(&store, &TEST_CONFIG, &hash).unwrap().unwrap(), + load_block_entry(store.as_ref(), &TEST_CONFIG, &hash).unwrap().unwrap(), i, ), }; @@ -461,9 +449,12 @@ fn canonicalize_works() { (block_hash_d2, Some(vec![cand_hash_5])), ]); - canonicalize(&store, &TEST_CONFIG, 3, block_hash_c1).unwrap(); + let mut overlay_db = OverlayedBackend::new(&db); + canonicalize(&mut overlay_db, 3, block_hash_c1).unwrap(); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); - 
assert_eq!(load_stored_blocks(&store, &TEST_CONFIG).unwrap().unwrap(), StoredBlockRange(4, 5)); + assert_eq!(load_stored_blocks(store.as_ref(), &TEST_CONFIG).unwrap().unwrap(), StoredBlockRange(4, 5)); check_candidates_in_store(vec![ (cand_hash_1, None), @@ -486,22 +477,23 @@ fn canonicalize_works() { #[test] fn force_approve_works() { - let store = kvdb_memorydb::create(NUM_COLUMNS); + let (mut db, store) = make_db(); let n_validators = 10; - let mut tx = DBTransaction::new(); - write_stored_blocks(&mut tx, StoredBlockRange(1, 4)); - store.write(tx).unwrap(); + let mut overlay_db = OverlayedBackend::new(&db); + overlay_db.write_stored_block_range(StoredBlockRange(1, 4)); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let single_candidate_vec = vec![(CoreIndex(0), candidate_hash)]; let candidate_info = { let mut candidate_info = HashMap::new(); - candidate_info.insert(candidate_hash, NewCandidateInfo { - candidate: make_candidate(1.into(), Default::default()), - backing_group: GroupIndex(1), - our_assignment: None, - }); + candidate_info.insert(candidate_hash, NewCandidateInfo::new( + make_candidate(1.into(), Default::default()), + GroupIndex(1), + None, + )); candidate_info }; @@ -524,35 +516,36 @@ fn force_approve_works() { block_entry_d.clone(), ]; + let mut overlay_db = OverlayedBackend::new(&db); for block_entry in blocks { add_block_entry( - &store, - &TEST_CONFIG, - block_entry, + &mut overlay_db, + block_entry.into(), n_validators, |h| candidate_info.get(h).map(|x| x.clone()), ).unwrap(); } - - let approved_hashes = force_approve(&store, TEST_CONFIG, block_hash_d, 2).unwrap(); + let approved_hashes = force_approve(&mut overlay_db, block_hash_d, 2).unwrap(); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); assert!(load_block_entry( - &store, + store.as_ref(), &TEST_CONFIG, &block_hash_a, ).unwrap().unwrap().approved_bitfield.all()); 
assert!(load_block_entry( - &store, + store.as_ref(), &TEST_CONFIG, &block_hash_b, ).unwrap().unwrap().approved_bitfield.all()); assert!(load_block_entry( - &store, + store.as_ref(), &TEST_CONFIG, &block_hash_c, ).unwrap().unwrap().approved_bitfield.not_any()); assert!(load_block_entry( - &store, + store.as_ref(), &TEST_CONFIG, &block_hash_d, ).unwrap().unwrap().approved_bitfield.not_any()); @@ -564,7 +557,7 @@ fn force_approve_works() { #[test] fn load_all_blocks_works() { - let store = kvdb_memorydb::create(NUM_COLUMNS); + let (mut db, store) = make_db(); let parent_hash = Hash::repeat_byte(1); let block_hash_a = Hash::repeat_byte(2); @@ -596,34 +589,35 @@ fn load_all_blocks_works() { let n_validators = 10; + let mut overlay_db = OverlayedBackend::new(&db); add_block_entry( - &store, - &TEST_CONFIG, - block_entry_a.clone(), + &mut overlay_db, + block_entry_a.clone().into(), n_validators, |_| None ).unwrap(); // add C before B to test sorting. add_block_entry( - &store, - &TEST_CONFIG, - block_entry_c.clone(), + &mut overlay_db, + block_entry_c.clone().into(), n_validators, |_| None ).unwrap(); add_block_entry( - &store, - &TEST_CONFIG, - block_entry_b.clone(), + &mut overlay_db, + block_entry_b.clone().into(), n_validators, |_| None ).unwrap(); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); + assert_eq!( load_all_blocks( - &store, + store.as_ref(), &TEST_CONFIG ).unwrap(), vec![block_hash_a, block_hash_b, block_hash_c], diff --git a/polkadot/node/core/approval-voting/src/backend.rs b/polkadot/node/core/approval-voting/src/backend.rs new file mode 100644 index 0000000000..14cd43dba1 --- /dev/null +++ b/polkadot/node/core/approval-voting/src/backend.rs @@ -0,0 +1,194 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. 
+ +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! An abstraction over storage used by the approval voting subsystem. +//! +//! This provides both a [`Backend`] trait and an [`OverlayedBackend`] +//! struct which allows in-memory changes to be applied on top of a +//! [`Backend`], maintaining consistency between queries and temporary writes, +//! before any commit to the underlying storage is made. + +use polkadot_node_subsystem::{SubsystemResult}; +use polkadot_primitives::v1::{BlockNumber, CandidateHash, Hash}; + +use std::collections::HashMap; + +use super::approval_db::v1::StoredBlockRange; +use super::persisted_entries::{BlockEntry, CandidateEntry}; + +#[derive(Debug)] +pub enum BackendWriteOp { + WriteStoredBlockRange(StoredBlockRange), + WriteBlocksAtHeight(BlockNumber, Vec), + WriteBlockEntry(BlockEntry), + WriteCandidateEntry(CandidateEntry), + DeleteBlocksAtHeight(BlockNumber), + DeleteBlockEntry(Hash), + DeleteCandidateEntry(CandidateHash), +} + +/// An abstraction over backend storage for the logic of this subsystem. +pub trait Backend { + /// Load a block entry from the DB. + fn load_block_entry(&self, hash: &Hash) -> SubsystemResult>; + /// Load a candidate entry from the DB. + fn load_candidate_entry(&self, candidate_hash: &CandidateHash) -> SubsystemResult>; + /// Load all blocks at a specific height. 
+ fn load_blocks_at_height(&self, height: &BlockNumber) -> SubsystemResult>; + /// Load all blocks from the DB. + fn load_all_blocks(&self) -> SubsystemResult>; + /// Load stored block range from the DB. + fn load_stored_blocks(&self) -> SubsystemResult>; + /// Atomically write the list of operations, with later operations taking precedence over prior. + fn write(&mut self, ops: I) -> SubsystemResult<()> + where I: IntoIterator; +} + +/// An in-memory overlay over the backend. +/// +/// This maintains read-only access to the underlying backend, but can be +/// converted into a set of write operations which will, when written to +/// the underlying backend, give the same view as the state of the overlay. +pub struct OverlayedBackend<'a, B: 'a> { + inner: &'a B, + + // `None` means unchanged + stored_block_range: Option, + // `None` means 'deleted', missing means query inner. + blocks_at_height: HashMap>>, + // `None` means 'deleted', missing means query inner. + block_entries: HashMap>, + // `None` means 'deleted', missing means query inner. + candidate_entries: HashMap>, +} + +impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> { + pub fn new(backend: &'a B) -> Self { + OverlayedBackend { + inner: backend, + stored_block_range: None, + blocks_at_height: HashMap::new(), + block_entries: HashMap::new(), + candidate_entries: HashMap::new(), + } + } + + pub fn is_empty(&self) -> bool { + self.block_entries.is_empty() && + self.candidate_entries.is_empty() && + self.blocks_at_height.is_empty() && + self.stored_block_range.is_none() + } + + pub fn load_all_blocks(&self) -> SubsystemResult> { + let mut hashes = Vec::new(); + if let Some(stored_blocks) = self.load_stored_blocks()? 
{ + for height in stored_blocks.0..stored_blocks.1 { + hashes.extend(self.load_blocks_at_height(&height)?); + } + } + + Ok(hashes) + } + + pub fn load_stored_blocks(&self) -> SubsystemResult> { + if let Some(val) = self.stored_block_range.clone() { + return Ok(Some(val)) + } + + self.inner.load_stored_blocks() + } + + pub fn load_blocks_at_height(&self, height: &BlockNumber) -> SubsystemResult> { + if let Some(val) = self.blocks_at_height.get(&height) { + return Ok(val.clone().unwrap_or_default()) + } + + self.inner.load_blocks_at_height(height) + } + + pub fn load_block_entry(&self, hash: &Hash) -> SubsystemResult> { + if let Some(val) = self.block_entries.get(&hash) { + return Ok(val.clone()) + } + + self.inner.load_block_entry(hash) + } + + pub fn load_candidate_entry(&self, candidate_hash: &CandidateHash) -> SubsystemResult> { + if let Some(val) = self.candidate_entries.get(&candidate_hash) { + return Ok(val.clone()) + } + + self.inner.load_candidate_entry(candidate_hash) + } + + // The assumption is that stored block range is only None on initialization. + // Therefore, there is no need to delete_stored_block_range. 
+ pub fn write_stored_block_range(&mut self, range: StoredBlockRange) { + self.stored_block_range = Some(range); + } + + pub fn write_blocks_at_height(&mut self, height: BlockNumber, blocks: Vec) { + self.blocks_at_height.insert(height, Some(blocks)); + } + + pub fn delete_blocks_at_height(&mut self, height: BlockNumber) { + self.blocks_at_height.insert(height, None); + } + + pub fn write_block_entry(&mut self, entry: BlockEntry) { + self.block_entries.insert(entry.block_hash(), Some(entry)); + } + + pub fn delete_block_entry(&mut self, hash: &Hash) { + self.block_entries.insert(*hash, None); + } + + pub fn write_candidate_entry(&mut self, entry: CandidateEntry) { + self.candidate_entries.insert(entry.candidate_receipt().hash(), Some(entry)); + } + + pub fn delete_candidate_entry(&mut self, hash: &CandidateHash) { + self.candidate_entries.insert(*hash, None); + } + + /// Transform this backend into a set of write-ops to be written to the + /// inner backend. + pub fn into_write_ops(self) -> impl Iterator { + let blocks_at_height_ops = self.blocks_at_height.into_iter().map(|(h, v)| match v { + Some(v) => BackendWriteOp::WriteBlocksAtHeight(h, v), + None => BackendWriteOp::DeleteBlocksAtHeight(h), + }); + + let block_entry_ops = self.block_entries.into_iter().map(|(h, v)| match v { + Some(v) => BackendWriteOp::WriteBlockEntry(v), + None => BackendWriteOp::DeleteBlockEntry(h), + }); + + let candidate_entry_ops = self.candidate_entries.into_iter().map(|(h, v)| match v { + Some(v) => BackendWriteOp::WriteCandidateEntry(v), + None => BackendWriteOp::DeleteCandidateEntry(h), + }); + + self.stored_block_range + .map(|v| BackendWriteOp::WriteStoredBlockRange(v)) + .into_iter() + .chain(blocks_at_height_ops) + .chain(block_entry_ops) + .chain(candidate_entry_ops) + } +} diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 6cdbfe8550..7f5558782f 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ 
b/polkadot/node/core/approval-voting/src/import.rs @@ -49,7 +49,6 @@ use polkadot_node_primitives::approval::{ use polkadot_node_jaeger as jaeger; use sc_keystore::LocalKeystore; use sp_consensus_slots::Slot; -use kvdb::KeyValueDB; use futures::prelude::*; use futures::channel::oneshot; @@ -58,12 +57,13 @@ use bitvec::order::Lsb0 as BitOrderLsb0; use std::collections::HashMap; use std::convert::TryFrom; -use crate::approval_db::{self, v1::Config as DatabaseConfig}; +use super::approval_db::v1; +use crate::backend::{Backend, OverlayedBackend}; use crate::persisted_entries::CandidateEntry; use crate::criteria::{AssignmentCriteria, OurAssignment}; use crate::time::{slot_number_to_tick, Tick}; -use super::{LOG_TARGET, State, DBReader}; +use super::{LOG_TARGET, State}; struct ImportedBlockInfo { included_candidates: Vec<(CandidateHash, CandidateReceipt, CoreIndex, GroupIndex)>, @@ -284,11 +284,10 @@ pub struct BlockImportedCandidates { /// * and return information about all candidates imported under each block. /// /// It is the responsibility of the caller to schedule wakeups for each block. 
-pub(crate) async fn handle_new_head( +pub(crate) async fn handle_new_head<'a>( ctx: &mut impl SubsystemContext, - state: &mut State, - db_writer: &dyn KeyValueDB, - db_config: DatabaseConfig, + state: &mut State, + db: &mut OverlayedBackend<'a, impl Backend>, head: Hash, finalized_number: &Option, ) -> SubsystemResult> { @@ -345,7 +344,7 @@ pub(crate) async fn handle_new_head( let new_blocks = determine_new_blocks( ctx.sender(), - |h| state.db.load_block_entry(h).map(|e| e.is_some()), + |h| db.load_block_entry(h).map(|e| e.is_some()), head, &header, lower_bound_number, @@ -473,7 +472,7 @@ pub(crate) async fn handle_new_head( ctx.send_message(ChainSelectionMessage::Approved(block_hash).into()).await; } - let block_entry = approval_db::v1::BlockEntry { + let block_entry = v1::BlockEntry { block_hash, parent_hash: block_header.parent_hash, block_number: block_header.number, @@ -494,12 +493,7 @@ pub(crate) async fn handle_new_head( "Enacting force-approve", ); - let approved_hashes = approval_db::v1::force_approve( - db_writer, - db_config, - block_hash, - up_to, - ) + let approved_hashes = crate::ops::force_approve(db, block_hash, up_to) .map_err(|e| SubsystemError::with_origin("approval-voting", e))?; // Notify chain-selection of all approved hashes. 
@@ -515,18 +509,17 @@ pub(crate) async fn handle_new_head( "Writing BlockEntry", ); - let candidate_entries = approval_db::v1::add_block_entry( - db_writer, - &db_config, - block_entry, + let candidate_entries = crate::ops::add_block_entry( + db, + block_entry.into(), n_validators, |candidate_hash| { included_candidates.iter().find(|(hash, _, _, _)| candidate_hash == hash) - .map(|(_, receipt, core, backing_group)| approval_db::v1::NewCandidateInfo { - candidate: receipt.clone(), - backing_group: *backing_group, - our_assignment: assignments.get(core).map(|a| a.clone().into()), - }) + .map(|(_, receipt, core, backing_group)| super::ops::NewCandidateInfo::new( + receipt.clone(), + *backing_group, + assignments.get(core).map(|a| a.clone().into()), + )) } ).map_err(|e| SubsystemError::with_origin("approval-voting", e))?; approval_meta.push(BlockApprovalMeta { @@ -566,6 +559,7 @@ pub(crate) async fn handle_new_head( #[cfg(test)] mod tests { use super::*; + use crate::approval_db::v1::DbBackend; use polkadot_node_subsystem_test_helpers::make_subsystem_context; use polkadot_node_primitives::approval::{VRFOutput, VRFProof}; use polkadot_primitives::v1::{SessionInfo, ValidatorIndex}; @@ -580,8 +574,12 @@ mod tests { use assert_matches::assert_matches; use merlin::Transcript; use std::{pin::Pin, sync::Arc}; + use kvdb::KeyValueDB; - use crate::{APPROVAL_SESSIONS, criteria, BlockEntry}; + use crate::{ + APPROVAL_SESSIONS, criteria, BlockEntry, + approval_db::v1::Config as DatabaseConfig, + }; const DATA_COL: u32 = 0; const NUM_COLUMNS: u32 = 1; @@ -589,37 +587,6 @@ mod tests { const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_data: DATA_COL, }; - - #[derive(Default)] - struct TestDB { - block_entries: HashMap, - candidate_entries: HashMap, - } - - impl DBReader for TestDB { - fn load_block_entry( - &self, - block_hash: &Hash, - ) -> SubsystemResult> { - Ok(self.block_entries.get(block_hash).map(|c| c.clone())) - } - - fn load_candidate_entry( - &self, - 
candidate_hash: &CandidateHash, - ) -> SubsystemResult> { - Ok(self.candidate_entries.get(candidate_hash).map(|c| c.clone())) - } - - fn load_all_blocks(&self) -> SubsystemResult> { - let mut hashes: Vec<_> = self.block_entries.keys().cloned().collect(); - - hashes.sort_by_key(|k| self.block_entries.get(k).unwrap().block_number()); - - Ok(hashes) - } - } - #[derive(Default)] struct MockClock; @@ -635,20 +602,17 @@ mod tests { } } - fn blank_state() -> State { + fn blank_state() -> State { State { session_window: RollingSessionWindow::new(APPROVAL_SESSIONS), keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, - db: TestDB::default(), clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria), } } - fn single_session_state(index: SessionIndex, info: SessionInfo) - -> State - { + fn single_session_state(index: SessionIndex, info: SessionInfo) -> State { State { session_window: RollingSessionWindow::with_session_info( APPROVAL_SESSIONS, @@ -1162,6 +1126,10 @@ mod tests { #[test] fn insta_approval_works() { + let db_writer: Arc = Arc::new(kvdb_memorydb::create(NUM_COLUMNS)); + let mut db = DbBackend::new(db_writer.clone(), TEST_CONFIG); + let mut overlay_db = OverlayedBackend::new(&db); + let pool = TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); @@ -1221,9 +1189,8 @@ mod tests { .collect::>(); let mut state = single_session_state(session, session_info); - state.db.block_entries.insert( - parent_hash.clone(), - crate::approval_db::v1::BlockEntry { + overlay_db.write_block_entry( + v1::BlockEntry { block_hash: parent_hash.clone(), parent_hash: Default::default(), block_number: 4, @@ -1233,22 +1200,26 @@ mod tests { candidates: Vec::new(), approved_bitfield: Default::default(), children: Vec::new(), - }.into(), + }.into() ); - let db_writer = kvdb_memorydb::create(NUM_COLUMNS); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); let 
test_fut = { Box::pin(async move { + let mut overlay_db = OverlayedBackend::new(&db); let result = handle_new_head( &mut ctx, &mut state, - &db_writer, - TEST_CONFIG, + &mut overlay_db, hash, &Some(1), ).await.unwrap(); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); + assert_eq!(result.len(), 1); let candidates = &result[0].imported_candidates; assert_eq!(candidates.len(), 2); @@ -1256,8 +1227,8 @@ mod tests { assert_eq!(candidates[1].1.approvals().len(), 6); // the first candidate should be insta-approved // the second should not - let entry: BlockEntry = crate::approval_db::v1::load_block_entry( - &db_writer, + let entry: BlockEntry = v1::load_block_entry( + db_writer.as_ref(), &TEST_CONFIG, &hash, ) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 28cc1ca6a7..d2fb201d0d 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -72,12 +72,15 @@ use time::{slot_number_to_tick, Tick, Clock, ClockExt, SystemClock}; mod approval_checking; mod approval_db; +mod backend; mod criteria; mod import; +mod ops; mod time; mod persisted_entries; -use crate::approval_db::v1::Config as DatabaseConfig; +use crate::approval_db::v1::{DbBackend, Config as DatabaseConfig}; +use crate::backend::{Backend, OverlayedBackend}; #[cfg(test)] mod tests; @@ -330,11 +333,13 @@ impl Subsystem for ApprovalVotingSubsystem where C: SubsystemContext { fn start(self, ctx: C) -> SpawnedSubsystem { - let future = run::( + let backend = DbBackend::new(self.db.clone(), self.db_config); + let future = run::( ctx, self, Box::new(SystemClock), Box::new(RealAssignmentCriteria), + backend, ) .map_err(|e| SubsystemError::with_origin("approval-voting", e)) .boxed(); @@ -461,72 +466,6 @@ impl Wakeups { } } -/// A read-only handle to a database. 
-trait DBReader { - fn load_block_entry( - &self, - block_hash: &Hash, - ) -> SubsystemResult>; - - fn load_candidate_entry( - &self, - candidate_hash: &CandidateHash, - ) -> SubsystemResult>; - - fn load_all_blocks(&self) -> SubsystemResult>; -} - -// This is a submodule to enforce opacity of the inner DB type. -mod approval_db_v1_reader { - use super::{ - DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry, - SubsystemResult, SubsystemError, DatabaseConfig, approval_db, - }; - - /// A DB reader that uses the approval-db V1 under the hood. - pub(super) struct ApprovalDBV1Reader { - inner: T, - config: DatabaseConfig, - } - - impl ApprovalDBV1Reader { - pub(super) fn new(inner: T, config: DatabaseConfig) -> Self { - ApprovalDBV1Reader { - inner, - config, - } - } - } - - impl<'a, T: 'a> DBReader for ApprovalDBV1Reader - where T: std::ops::Deref - { - fn load_block_entry( - &self, - block_hash: &Hash, - ) -> SubsystemResult> { - approval_db::v1::load_block_entry(&*self.inner, &self.config, block_hash) - .map(|e| e.map(Into::into)) - .map_err(|e| SubsystemError::with_origin("approval-voting", e)) - } - - fn load_candidate_entry( - &self, - candidate_hash: &CandidateHash, - ) -> SubsystemResult> { - approval_db::v1::load_candidate_entry(&*self.inner, &self.config, candidate_hash) - .map(|e| e.map(Into::into)) - .map_err(|e| SubsystemError::with_origin("approval-voting", e)) - } - - fn load_all_blocks(&self) -> SubsystemResult> { - approval_db::v1::load_all_blocks(&*self.inner, &self.config) - .map_err(|e| SubsystemError::with_origin("approval-voting", e)) - } - } -} -use approval_db_v1_reader::ApprovalDBV1Reader; - struct ApprovalStatus { required_tranches: RequiredTranches, tranche_now: DelayTranche, @@ -636,16 +575,15 @@ impl CurrentlyCheckingSet { } } -struct State { +struct State { session_window: RollingSessionWindow, keystore: Arc, slot_duration_millis: u64, - db: T, clock: Box, assignment_criteria: Box, } -impl State { +impl State { fn 
session_info(&self, i: SessionIndex) -> Option<&SessionInfo> { self.session_window.session_info(i) } @@ -705,8 +643,6 @@ enum Action { candidate_hash: CandidateHash, tick: Tick, }, - WriteBlockEntry(BlockEntry), - WriteCandidateEntry(CandidateHash, CandidateEntry), LaunchApproval { candidate_hash: CandidateHash, indirect_cert: IndirectAssignmentCert, @@ -723,19 +659,21 @@ enum Action { Conclude, } -async fn run( +async fn run( mut ctx: C, mut subsystem: ApprovalVotingSubsystem, clock: Box, assignment_criteria: Box, + mut backend: B, ) -> SubsystemResult<()> - where C: SubsystemContext + where + C: SubsystemContext, + B: Backend, { let mut state = State { session_window: RollingSessionWindow::new(APPROVAL_SESSIONS), keystore: subsystem.keystore, slot_duration_millis: subsystem.slot_duration_millis, - db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()), clock, assignment_criteria, }; @@ -746,14 +684,14 @@ async fn run( let mut last_finalized_height: Option = None; - let db_writer = &*subsystem.db; - loop { + let mut overlayed_db = OverlayedBackend::new(&backend); let actions = futures::select! { (tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { subsystem.metrics.on_wakeup(); process_wakeup( &mut state, + &mut overlayed_db, woken_block, woken_candidate, tick, @@ -763,9 +701,8 @@ async fn run( let mut actions = handle_from_overseer( &mut ctx, &mut state, + &mut overlayed_db, &subsystem.metrics, - db_writer, - subsystem.db_config, next_msg?, &mut last_finalized_height, &mut wakeups, @@ -814,17 +751,23 @@ async fn run( if handle_actions( &mut ctx, &mut state, + &mut overlayed_db, &subsystem.metrics, &mut wakeups, &mut currently_checking_set, &mut approvals_cache, - db_writer, - subsystem.db_config, &mut subsystem.mode, actions, ).await? 
{ break; } + + if !overlayed_db.is_empty() { + let _timer = subsystem.metrics.time_db_transaction(); + + let ops = overlayed_db.into_write_ops(); + backend.write(ops)?; + } } Ok(()) @@ -851,17 +794,15 @@ async fn run( // returns `true` if any of the actions was a `Conclude` command. async fn handle_actions( ctx: &mut impl SubsystemContext, - state: &mut State, + state: &mut State, + overlayed_db: &mut OverlayedBackend<'_, impl Backend>, metrics: &Metrics, wakeups: &mut Wakeups, currently_checking_set: &mut CurrentlyCheckingSet, approvals_cache: &mut lru::LruCache, - db: &dyn KeyValueDB, - db_config: DatabaseConfig, mode: &mut Mode, actions: Vec, ) -> SubsystemResult { - let mut transaction = approval_db::v1::Transaction::new(db_config); let mut conclude = false; let mut actions_iter = actions.into_iter(); @@ -872,15 +813,7 @@ async fn handle_actions( block_number, candidate_hash, tick, - } => { - wakeups.schedule(block_hash, block_number, candidate_hash, tick) - } - Action::WriteBlockEntry(block_entry) => { - transaction.put_block_entry(block_entry.into()); - } - Action::WriteCandidateEntry(candidate_hash, candidate_entry) => { - transaction.put_candidate_entry(candidate_hash, candidate_entry.into()); - } + } => wakeups.schedule(block_hash, block_number, candidate_hash, tick), Action::IssueApproval(candidate_hash, approval_request) => { let mut sender = ctx.sender().clone(); // Note that the IssueApproval action will create additional @@ -897,6 +830,7 @@ async fn handle_actions( let next_actions: Vec = issue_approval( &mut sender, state, + overlayed_db, metrics, candidate_hash, approval_request, @@ -969,9 +903,7 @@ async fn handle_actions( Action::BecomeActive => { *mode = Mode::Active; - let messages = distribution_messages_for_activation( - ApprovalDBV1Reader::new(db, db_config) - )?; + let messages = distribution_messages_for_activation(overlayed_db)?; ctx.send_messages(messages.into_iter().map(Into::into)).await; } @@ -979,20 +911,13 @@ async fn handle_actions( 
} } - if !transaction.is_empty() { - let _timer = metrics.time_db_transaction(); - - transaction.write(db) - .map_err(|e| SubsystemError::with_origin("approval-voting", e))?; - } - Ok(conclude) } -fn distribution_messages_for_activation<'a>( - db: impl DBReader + 'a, +fn distribution_messages_for_activation( + db: &OverlayedBackend<'_, impl Backend>, ) -> SubsystemResult> { - let all_blocks = db.load_all_blocks()?; + let all_blocks: Vec = db.load_all_blocks()?; let mut approval_meta = Vec::with_capacity(all_blocks.len()); let mut messages = Vec::new(); @@ -1089,10 +1014,9 @@ fn distribution_messages_for_activation<'a>( // Handle an incoming signal from the overseer. Returns true if execution should conclude. async fn handle_from_overseer( ctx: &mut impl SubsystemContext, - state: &mut State, + state: &mut State, + db: &mut OverlayedBackend<'_, impl Backend>, metrics: &Metrics, - db_writer: &dyn KeyValueDB, - db_config: DatabaseConfig, x: FromOverseer, last_finalized_height: &mut Option, wakeups: &mut Wakeups, @@ -1107,8 +1031,7 @@ async fn handle_from_overseer( match import::handle_new_head( ctx, state, - db_writer, - db_config, + db, head, &*last_finalized_height, ).await { @@ -1162,7 +1085,7 @@ async fn handle_from_overseer( FromOverseer::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => { *last_finalized_height = Some(block_number); - approval_db::v1::canonicalize(db_writer, &db_config, block_number, block_hash) + crate::ops::canonicalize(db, block_number, block_hash) .map_err(|e| SubsystemError::with_origin("db", e))?; wakeups.prune_finalized_wakeups(block_number); @@ -1175,15 +1098,16 @@ async fn handle_from_overseer( FromOverseer::Communication { msg } => match msg { ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => { let (check_outcome, actions) - = check_and_import_assignment(state, a, claimed_core)?; + = check_and_import_assignment(state, db, a, claimed_core)?; let _ = res.send(check_outcome); + actions } 
ApprovalVotingMessage::CheckAndImportApproval(a, res) => { - check_and_import_approval(state, metrics, a, |r| { let _ = res.send(r); })?.0 + check_and_import_approval(state, db, metrics, a, |r| { let _ = res.send(r); })?.0 } ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res ) => { - match handle_approved_ancestor(ctx, &state.db, target, lower_bound, wakeups).await { + match handle_approved_ancestor(ctx, db, target, lower_bound, wakeups).await { Ok(v) => { let _ = res.send(v); } @@ -1203,7 +1127,7 @@ async fn handle_from_overseer( async fn handle_approved_ancestor( ctx: &mut impl SubsystemContext, - db: &impl DBReader, + db: &OverlayedBackend<'_, impl Backend>, target: Hash, lower_bound: BlockNumber, wakeups: &Wakeups, @@ -1491,14 +1415,15 @@ fn schedule_wakeup_action( } fn check_and_import_assignment( - state: &State, + state: &State, + db: &mut OverlayedBackend<'_, impl Backend>, assignment: IndirectAssignmentCert, candidate_index: CandidateIndex, ) -> SubsystemResult<(AssignmentCheckResult, Vec)> { const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds. let tick_now = state.clock.tick_now(); - let block_entry = match state.db.load_block_entry(&assignment.block_hash)? { + let block_entry = match db.load_block_entry(&assignment.block_hash)? { Some(b) => b, None => return Ok((AssignmentCheckResult::Bad( AssignmentCheckError::UnknownBlock(assignment.block_hash), @@ -1523,7 +1448,7 @@ fn check_and_import_assignment( ), Vec::new())), // no candidate at core. }; - let mut candidate_entry = match state.db.load_candidate_entry(&assigned_candidate_hash)? { + let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? { Some(c) => c, None => { return Ok((AssignmentCheckResult::Bad( @@ -1605,13 +1530,14 @@ fn check_and_import_assignment( } // We also write the candidate entry as it now contains the new candidate. 
- actions.push(Action::WriteCandidateEntry(assigned_candidate_hash, candidate_entry)); + db.write_candidate_entry(candidate_entry.into()); Ok((res, actions)) } fn check_and_import_approval( - state: &State, + state: &State, + db: &mut OverlayedBackend<'_, impl Backend>, metrics: &Metrics, approval: IndirectSignedApprovalVote, with_response: impl FnOnce(ApprovalCheckResult) -> T, @@ -1623,7 +1549,7 @@ fn check_and_import_approval( } } } - let block_entry = match state.db.load_block_entry(&approval.block_hash)? { + let block_entry = match db.load_block_entry(&approval.block_hash)? { Some(b) => b, None => { respond_early!(ApprovalCheckResult::Bad( @@ -1666,7 +1592,7 @@ fn check_and_import_approval( )) } - let candidate_entry = match state.db.load_candidate_entry(&approved_candidate_hash)? { + let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? { Some(c) => c, None => { respond_early!(ApprovalCheckResult::Bad( @@ -1704,6 +1630,7 @@ fn check_and_import_approval( let actions = import_checked_approval( state, + db, &metrics, block_entry, approved_candidate_hash, @@ -1738,7 +1665,8 @@ impl ApprovalSource { // validator on the candidate and block. This updates the block entry and candidate entry as // necessary and schedules any further wakeups. fn import_checked_approval( - state: &State, + state: &State, + db: &mut OverlayedBackend<'_, impl Backend>, metrics: &Metrics, mut block_entry: BlockEntry, candidate_hash: CandidateHash, @@ -1812,7 +1740,7 @@ fn import_checked_approval( actions.push(Action::NoteApprovedInChainSelection(block_hash)); } - actions.push(Action::WriteBlockEntry(block_entry)); + db.write_block_entry(block_entry.into()); } (is_approved, status) @@ -1856,11 +1784,11 @@ fn import_checked_approval( // // 1. The source is remote, as we don't store anything new in the approval entry. // 2. The candidate is not newly approved, as we haven't altered the approval entry's - // approved flag with `mark_approved` above. 
+ // approved flag with `mark_approved` above. // 3. The source had already approved the candidate, as we haven't altered the bitfield. if !source.is_remote() || newly_approved || !already_approved_by { // In all other cases, we need to write the candidate entry. - actions.push(Action::WriteCandidateEntry(candidate_hash, candidate_entry)); + db.write_candidate_entry(candidate_entry); } } @@ -1904,7 +1832,8 @@ fn should_trigger_assignment( } fn process_wakeup( - state: &State, + state: &State, + db: &mut OverlayedBackend<'_, impl Backend>, relay_block: Hash, candidate_hash: CandidateHash, expected_tick: Tick, @@ -1917,8 +1846,8 @@ fn process_wakeup( .with_candidate(candidate_hash) .with_stage(jaeger::Stage::ApprovalChecking); - let block_entry = state.db.load_block_entry(&relay_block)?; - let candidate_entry = state.db.load_candidate_entry(&candidate_hash)?; + let block_entry = db.load_block_entry(&relay_block)?; + let candidate_entry = db.load_candidate_entry(&candidate_hash)?; // If either is not present, we have nothing to wakeup. 
Might have lost a race with finality let (block_entry, mut candidate_entry) = match (block_entry, candidate_entry) { @@ -1981,7 +1910,10 @@ fn process_wakeup( (should_trigger, approval_entry.backing_group()) }; - let (mut actions, maybe_cert) = if should_trigger { + let mut actions = Vec::new(); + let candidate_receipt = candidate_entry.candidate_receipt().clone(); + + let maybe_cert = if should_trigger { let maybe_cert = { let approval_entry = candidate_entry.approval_entry_mut(&relay_block) .expect("should_trigger only true if this fetched earlier; qed"); @@ -1989,11 +1921,11 @@ fn process_wakeup( approval_entry.trigger_our_assignment(state.clock.tick_now()) }; - let actions = vec![Action::WriteCandidateEntry(candidate_hash, candidate_entry.clone())]; + db.write_candidate_entry(candidate_entry.clone()); - (actions, maybe_cert) + maybe_cert } else { - (Vec::new(), None) + None }; if let Some((cert, val_index, tranche)) = maybe_cert { @@ -2010,7 +1942,7 @@ fn process_wakeup( tracing::trace!( target: LOG_TARGET, ?candidate_hash, - para_id = ?candidate_entry.candidate_receipt().descriptor.para_id, + para_id = ?candidate_receipt.descriptor.para_id, block_hash = ?relay_block, "Launching approval work.", ); @@ -2023,7 +1955,7 @@ fn process_wakeup( relay_block_hash: relay_block, candidate_index: i as _, session: block_entry.session(), - candidate: candidate_entry.candidate_receipt().clone(), + candidate: candidate_receipt, backing_group, }); } @@ -2274,12 +2206,13 @@ async fn launch_approval( // have been done. fn issue_approval( ctx: &mut impl SubsystemSender, - state: &State, + state: &mut State, + db: &mut OverlayedBackend<'_, impl Backend>, metrics: &Metrics, candidate_hash: CandidateHash, ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest, ) -> SubsystemResult> { - let block_entry = match state.db.load_block_entry(&block_hash)? { + let block_entry = match db.load_block_entry(&block_hash)? 
{ Some(b) => b, None => { // not a cause for alarm - just lost a race with pruning, most likely. @@ -2337,7 +2270,7 @@ fn issue_approval( } }; - let candidate_entry = match state.db.load_candidate_entry(&candidate_hash)? { + let candidate_entry = match db.load_candidate_entry(&candidate_hash)? { Some(c) => c, None => { tracing::warn!( @@ -2397,6 +2330,7 @@ fn issue_approval( let actions = import_checked_approval( state, + db, metrics, block_entry, candidate_hash, diff --git a/polkadot/node/core/approval-voting/src/ops.rs b/polkadot/node/core/approval-voting/src/ops.rs new file mode 100644 index 0000000000..cc02aa811d --- /dev/null +++ b/polkadot/node/core/approval-voting/src/ops.rs @@ -0,0 +1,334 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Middleware interface that leverages low-level database operations +//! to provide a clean API for processing block and candidate imports. 
+ +use polkadot_node_subsystem::SubsystemResult; + +use polkadot_primitives::v1::{ + CandidateHash, CandidateReceipt, BlockNumber, GroupIndex, Hash, +}; +use bitvec::{order::Lsb0 as BitOrderLsb0}; + +use std::convert::Into; +use std::collections::{BTreeMap, HashMap}; +use std::collections::hash_map::Entry; + +use super::persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry}; +use super::backend::{Backend, OverlayedBackend}; +use super::approval_db::{ + v1::{ + OurAssignment, StoredBlockRange, + }, +}; + +/// Information about a new candidate necessary to instantiate the requisite +/// candidate and approval entries. +#[derive(Clone)] +pub struct NewCandidateInfo { + candidate: CandidateReceipt, + backing_group: GroupIndex, + our_assignment: Option, +} + +impl NewCandidateInfo { + /// Convenience constructor + pub fn new( + candidate: CandidateReceipt, + backing_group: GroupIndex, + our_assignment: Option, + ) -> Self { + Self { candidate, backing_group, our_assignment } + } +} + +fn visit_and_remove_block_entry( + block_hash: Hash, + overlayed_db: &mut OverlayedBackend<'_, impl Backend>, + visited_candidates: &mut HashMap, +) -> SubsystemResult> { + let block_entry = match overlayed_db.load_block_entry(&block_hash)? { + None => return Ok(Vec::new()), + Some(b) => b, + }; + + overlayed_db.delete_block_entry(&block_hash); + for &(_, ref candidate_hash) in block_entry.candidates() { + let candidate = match visited_candidates.entry(*candidate_hash) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(e) => { + e.insert(match overlayed_db.load_candidate_entry(candidate_hash)? { + None => continue, // Should not happen except for corrupt DB + Some(c) => c, + }) + } + }; + + candidate.block_assignments.remove(&block_hash); + } + + Ok(block_entry.children) +} + +/// Canonicalize some particular block, pruning everything before it and +/// pruning any competing branches at the same height. 
+pub fn canonicalize( + overlay_db: &mut OverlayedBackend<'_, impl Backend>, + canon_number: BlockNumber, + canon_hash: Hash, +) -> SubsystemResult<()> { + let range = match overlay_db.load_stored_blocks()? { + None => return Ok(()), + Some(range) if range.0 >= canon_number => return Ok(()), + Some(range) => range, + }; + + // Storing all candidates in memory is potentially heavy, but should be fine + // as long as finality doesn't stall for a long while. We could optimize this + // by keeping only the metadata about which blocks reference each candidate. + let mut visited_candidates = HashMap::new(); + + // All the block heights we visited but didn't necessarily delete everything from. + let mut visited_heights = HashMap::new(); + + // First visit everything before the height. + for i in range.0..canon_number { + let at_height = overlay_db.load_blocks_at_height(&i)?; + overlay_db.delete_blocks_at_height(i); + + for b in at_height { + let _ = visit_and_remove_block_entry( + b, + overlay_db, + &mut visited_candidates, + )?; + } + } + + // Then visit everything at the height. + let pruned_branches = { + let at_height = overlay_db.load_blocks_at_height(&canon_number)?; + overlay_db.delete_blocks_at_height(canon_number); + + // Note that while there may be branches descending from blocks at earlier heights, + // we have already covered them by removing everything at earlier heights. + let mut pruned_branches = Vec::new(); + + for b in at_height { + let children = visit_and_remove_block_entry( + b, + overlay_db, + &mut visited_candidates, + )?; + + if b != canon_hash { + pruned_branches.extend(children); + } + } + + pruned_branches + }; + + // Follow all children of non-canonicalized blocks. 
+ { + let mut frontier: Vec<(BlockNumber, Hash)> = pruned_branches.into_iter().map(|h| (canon_number + 1, h)).collect(); + while let Some((height, next_child)) = frontier.pop() { + let children = visit_and_remove_block_entry( + next_child, + overlay_db, + &mut visited_candidates, + )?; + + // extend the frontier of branches to include the given height. + frontier.extend(children.into_iter().map(|h| (height + 1, h))); + + // visit the at-height key for this deleted block's height. + let at_height = match visited_heights.entry(height) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(e) => e.insert(overlay_db.load_blocks_at_height(&height)?), + }; + if let Some(i) = at_height.iter().position(|x| x == &next_child) { + at_height.remove(i); + } + } + } + + // Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`. + for (candidate_hash, candidate) in visited_candidates.into_iter() { + if candidate.block_assignments.is_empty() { + overlay_db.delete_candidate_entry(&candidate_hash); + } else { + overlay_db.write_candidate_entry(candidate); + } + } + + // Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`. + for (h, at) in visited_heights.into_iter() { + if at.is_empty() { + overlay_db.delete_blocks_at_height(h); + } else { + overlay_db.write_blocks_at_height(h, at); + } + } + + // due to the fork pruning, this range actually might go too far above where our actual highest block is, + // if a relatively short fork is canonicalized. + // TODO https://github.com/paritytech/polkadot/issues/3389 + let new_range = StoredBlockRange( + canon_number + 1, + std::cmp::max(range.1, canon_number + 2), + ); + + overlay_db.write_stored_block_range(new_range); + + Ok(()) +} + +/// Record a new block entry. +/// +/// This will update the blocks-at-height mapping, the stored block range, if necessary, +/// and add block and candidate entries. 
It will also add approval entries to existing +/// candidate entries and add this as a child of any block entry corresponding to the +/// parent hash. +/// +/// Has no effect if there is already an entry for the block or `candidate_info` returns +/// `None` for any of the candidates referenced by the block entry. In these cases, +/// no information about new candidates will be referred to by this function. +pub fn add_block_entry( + store: &mut OverlayedBackend<'_, impl Backend>, + entry: BlockEntry, + n_validators: usize, + candidate_info: impl Fn(&CandidateHash) -> Option, +) -> SubsystemResult> { + let session = entry.session(); + let parent_hash = entry.parent_hash(); + let number = entry.block_number(); + + // Update the stored block range. + { + let new_range = match store.load_stored_blocks()? { + None => Some(StoredBlockRange(number, number + 1)), + Some(range) if range.1 <= number => Some(StoredBlockRange(range.0, number + 1)), + Some(_) => None, + }; + + new_range.map(|n| store.write_stored_block_range(n)); + }; + + // Update the blocks at height meta key. + { + let mut blocks_at_height = store.load_blocks_at_height(&number)?; + if blocks_at_height.contains(&entry.block_hash()) { + // seems we already have a block entry for this block. nothing to do here. + return Ok(Vec::new()) + } + + blocks_at_height.push(entry.block_hash()); + store.write_blocks_at_height(number, blocks_at_height) + }; + + let mut candidate_entries = Vec::with_capacity(entry.candidates().len()); + + // read and write all updated entries. + { + for &(_, ref candidate_hash) in entry.candidates() { + let NewCandidateInfo { + candidate, + backing_group, + our_assignment, + } = match candidate_info(candidate_hash) { + None => return Ok(Vec::new()), + Some(info) => info, + }; + + let mut candidate_entry = store.load_candidate_entry(&candidate_hash)? 
+ .unwrap_or_else(move || CandidateEntry { + candidate, + session, + block_assignments: BTreeMap::new(), + approvals: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], + }); + + candidate_entry.block_assignments.insert( + entry.block_hash(), + ApprovalEntry::new( + Vec::new(), + backing_group, + our_assignment.map(|v| v.into()), + None, + bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], + false, + ) + ); + + store.write_candidate_entry(candidate_entry.clone()); + + candidate_entries.push((*candidate_hash, candidate_entry)); + } + }; + + // Update the child index for the parent. + store.load_block_entry(&parent_hash)?.map(|mut e| { + e.children.push(entry.block_hash()); + store.write_block_entry(e); + }); + + // Put the new block entry in. + store.write_block_entry(entry); + + Ok(candidate_entries) +} + +/// Forcibly approve all candidates included at up to the given relay-chain height in the indicated +/// chain. +pub fn force_approve( + store: &mut OverlayedBackend<'_, impl Backend>, + chain_head: Hash, + up_to: BlockNumber, +) -> SubsystemResult> { + enum State { + WalkTo, + Approving, + } + + let mut approved_hashes = Vec::new(); + + let mut cur_hash = chain_head; + let mut state = State::WalkTo; + + // iterate back to the `up_to` block, and then iterate backwards until all blocks + // are updated. + while let Some(mut entry) = store.load_block_entry(&cur_hash)? 
{ + + if entry.block_number() <= up_to { + state = State::Approving; + } + + cur_hash = entry.parent_hash(); + + match state { + State::WalkTo => {}, + State::Approving => { + entry.approved_bitfield.iter_mut().for_each(|mut b| *b = true); + approved_hashes.push(entry.block_hash()); + store.write_block_entry(entry); + } + } + } + + Ok(approved_hashes) +} diff --git a/polkadot/node/core/approval-voting/src/persisted_entries.rs b/polkadot/node/core/approval-voting/src/persisted_entries.rs index ce0f5689c1..9032f49995 100644 --- a/polkadot/node/core/approval-voting/src/persisted_entries.rs +++ b/polkadot/node/core/approval-voting/src/persisted_entries.rs @@ -86,6 +86,19 @@ pub struct ApprovalEntry { } impl ApprovalEntry { + /// Convenience constructor + pub fn new( + tranches: Vec, + backing_group: GroupIndex, + our_assignment: Option, + our_approval_sig: Option, + // `n_validators` bits. + assignments: BitVec, + approved: bool, + ) -> Self { + Self { tranches, backing_group, our_assignment, our_approval_sig, assignments, approved } + } + // Access our assignment for this approval entry. pub fn our_assignment(&self) -> Option<&OurAssignment> { self.our_assignment.as_ref() @@ -244,12 +257,12 @@ impl From for crate::approval_db::v1::ApprovalEntry { /// Metadata regarding approval of a particular candidate. #[derive(Debug, Clone, PartialEq)] pub struct CandidateEntry { - candidate: CandidateReceipt, - session: SessionIndex, + pub candidate: CandidateReceipt, + pub session: SessionIndex, // Assignments are based on blocks, so we need to track assignments separately // based on the block we are looking at. - block_assignments: BTreeMap, - approvals: BitVec, + pub block_assignments: BTreeMap, + pub approvals: BitVec, } impl CandidateEntry { @@ -328,8 +341,8 @@ pub struct BlockEntry { // A bitfield where the i'th bit corresponds to the i'th candidate in `candidates`. // The i'th bit is `true` iff the candidate has been approved in the context of this // block. 
The block can be considered approved if the bitfield has all bits set to `true`. - approved_bitfield: BitVec, - children: Vec, + pub approved_bitfield: BitVec, + pub children: Vec, } impl BlockEntry { diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 84006478bd..c539f42566 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -15,7 +15,9 @@ // along with Polkadot. If not, see . use super::*; -use polkadot_primitives::v1::{CoreIndex, GroupIndex, ValidatorSignature}; +use super::approval_db::v1::Config; +use super::backend::{Backend, BackendWriteOp}; +use polkadot_primitives::v1::{CandidateDescriptor, CoreIndex, GroupIndex, ValidatorSignature}; use polkadot_node_primitives::approval::{ AssignmentCert, AssignmentCertKind, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT, DelayTranche, @@ -33,6 +35,29 @@ use assert_matches::assert_matches; const SLOT_DURATION_MILLIS: u64 = 5000; +const DATA_COL: u32 = 0; +const NUM_COLUMNS: u32 = 1; + +const TEST_CONFIG: Config = Config { + col_data: DATA_COL, +}; + +fn make_db() -> DbBackend { + let db_writer: Arc = Arc::new(kvdb_memorydb::create(NUM_COLUMNS)); + DbBackend::new(db_writer.clone(), TEST_CONFIG) +} + +fn overlay_txn(db: &mut T, mut f: F) + where + T: Backend, + F: FnMut(&mut OverlayedBackend<'_, T>) +{ + let mut overlay_db = OverlayedBackend::new(db); + f(&mut overlay_db); + let write_ops = overlay_db.into_write_ops(); + db.write(write_ops).unwrap(); +} + fn slot_to_tick(t: impl Into) -> crate::time::Tick { crate::time::slot_number_to_tick(SLOT_DURATION_MILLIS, t.into()) } @@ -159,50 +184,17 @@ impl MockAssignmentCriteria< } } -#[derive(Default)] -struct TestStore { - block_entries: HashMap, - candidate_entries: HashMap, -} - -impl DBReader for TestStore { - fn load_block_entry( - &self, - block_hash: &Hash, - ) -> SubsystemResult> { - Ok(self.block_entries.get(block_hash).cloned()) - } - - fn 
load_candidate_entry( - &self, - candidate_hash: &CandidateHash, - ) -> SubsystemResult> { - Ok(self.candidate_entries.get(candidate_hash).cloned()) - } - - fn load_all_blocks(&self) -> SubsystemResult> { - let mut hashes: Vec<_> = self.block_entries.keys().cloned().collect(); - - hashes.sort_by_key(|k| self.block_entries.get(k).unwrap().block_number()); - - Ok(hashes) - } -} - -fn blank_state() -> State { +fn blank_state() -> State { State { session_window: RollingSessionWindow::new(APPROVAL_SESSIONS), keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: SLOT_DURATION_MILLIS, - db: TestStore::default(), clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), } } -fn single_session_state(index: SessionIndex, info: SessionInfo) - -> State -{ +fn single_session_state(index: SessionIndex, info: SessionInfo) -> State { State { session_window: RollingSessionWindow::with_session_info( APPROVAL_SESSIONS, @@ -244,6 +236,7 @@ struct StateConfig { validator_groups: Vec>, needed_approvals: u32, no_show_slots: u32, + candidate_hash: Option, } impl Default for StateConfig { @@ -256,12 +249,13 @@ impl Default for StateConfig { validator_groups: vec![vec![ValidatorIndex(0)], vec![ValidatorIndex(1)]], needed_approvals: 1, no_show_slots: 2, + candidate_hash: None, } } } // one block with one candidate. Alice and Bob are in the assignment keys. 
-fn some_state(config: StateConfig) -> State { +fn some_state(config: StateConfig, db: &mut DbBackend) -> State { let StateConfig { session_index, slot, @@ -270,11 +264,12 @@ fn some_state(config: StateConfig) -> State { validator_groups, needed_approvals, no_show_slots, + candidate_hash, } = config; let n_validators = validators.len(); - let mut state = State { + let state = State { clock: Box::new(MockClock::new(tick)), ..single_session_state(session_index, SessionInfo { validators: validators.iter().map(|v| v.public().into()).collect(), @@ -293,65 +288,63 @@ fn some_state(config: StateConfig) -> State { let core_index = 0.into(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = candidate_hash.unwrap_or_else(|| CandidateHash(Hash::repeat_byte(0xCC))); add_block( - &mut state.db, + db, block_hash, session_index, slot, ); add_candidate_to_block( - &mut state.db, + db, block_hash, candidate_hash, n_validators, core_index, GroupIndex(0), + None, ); state } fn add_block( - db: &mut TestStore, + db: &mut DbBackend, block_hash: Hash, session: SessionIndex, slot: Slot, ) { - db.block_entries.insert( + overlay_txn(db, |overlay_db| overlay_db.write_block_entry(approval_db::v1::BlockEntry { block_hash, - approval_db::v1::BlockEntry { - block_hash, - parent_hash: Default::default(), - block_number: 0, - session, - slot, - candidates: Vec::new(), - relay_vrf_story: Default::default(), - approved_bitfield: Default::default(), - children: Default::default(), - }.into(), - ); + parent_hash: Default::default(), + block_number: 0, + session, + slot, + candidates: Vec::new(), + relay_vrf_story: Default::default(), + approved_bitfield: Default::default(), + children: Default::default(), + }.into())); } fn add_candidate_to_block( - db: &mut TestStore, + db: &mut DbBackend, block_hash: Hash, candidate_hash: CandidateHash, n_validators: usize, core: CoreIndex, backing_group: GroupIndex, + candidate_receipt: 
Option, ) { - let mut block_entry = db.block_entries.get(&block_hash).unwrap().clone(); + let mut block_entry = db.load_block_entry(&block_hash).unwrap().unwrap(); - let candidate_entry = db.candidate_entries - .entry(candidate_hash) - .or_insert_with(|| approval_db::v1::CandidateEntry { + let mut candidate_entry = db.load_candidate_entry(&candidate_hash).unwrap() + .unwrap_or_else(|| approval_db::v1::CandidateEntry { session: block_entry.session(), block_assignments: Default::default(), - candidate: CandidateReceipt::default(), + candidate: candidate_receipt.unwrap_or_default(), approvals: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], }.into()); @@ -369,12 +362,61 @@ fn add_candidate_to_block( }.into(), ); - db.block_entries.insert(block_hash, block_entry); + overlay_txn(db, |overlay_db| { + overlay_db.write_block_entry(block_entry.clone()); + overlay_db.write_candidate_entry(candidate_entry.clone()); + }) +} + +fn import_assignment( + db: &mut DbBackend, + candidate_hash: &CandidateHash, + block_hash: &Hash, + validator_index: ValidatorIndex, + mut f: F, +) + where F: FnMut(&mut CandidateEntry) +{ + let mut candidate_entry = db.load_candidate_entry(candidate_hash).unwrap().unwrap(); + candidate_entry.approval_entry_mut(block_hash).unwrap() + .import_assignment(0, validator_index, 0); + + f(&mut candidate_entry); + + overlay_txn(db, |overlay_db| overlay_db.write_candidate_entry(candidate_entry.clone())); +} + +fn set_our_assignment( + db: &mut DbBackend, + candidate_hash: &CandidateHash, + block_hash: &Hash, + tranche: DelayTranche, + mut f: F, +) + where F: FnMut(&mut CandidateEntry) +{ + let mut candidate_entry = db.load_candidate_entry(&candidate_hash).unwrap().unwrap(); + let approval_entry = candidate_entry.approval_entry_mut(&block_hash).unwrap(); + + approval_entry.set_our_assignment(approval_db::v1::OurAssignment { + cert: garbage_assignment_cert( + AssignmentCertKind::RelayVRFModulo { sample: 0 } + ), + tranche, + validator_index: ValidatorIndex(0), 
+ triggered: false, + }.into()); + + f(&mut candidate_entry); + + overlay_txn(db, |overlay_db| overlay_db.write_candidate_entry(candidate_entry.clone())); } #[test] fn rejects_bad_assignment() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); + let candidate_hash = CandidateReceipt::::default().hash(); let assignment_good = IndirectAssignmentCert { block_hash, validator: ValidatorIndex(0), @@ -384,17 +426,27 @@ fn rejects_bad_assignment() { }, ), }; - let mut state = some_state(Default::default()); + let mut state = some_state(StateConfig { + candidate_hash: Some(candidate_hash), + ..Default::default() + }, &mut db); let candidate_index = 0; + let mut overlay_db = OverlayedBackend::new(&db); let res = check_and_import_assignment( &mut state, + &mut overlay_db, assignment_good.clone(), candidate_index, ).unwrap(); assert_eq!(res.0, AssignmentCheckResult::Accepted); // Check that the assignment's been imported. - assert!(res.1.iter().any(|action| matches!(action, Action::WriteCandidateEntry(..)))); + assert_eq!(res.1.len(), 1); + + let write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 1); + assert_matches!(write_ops.get(0).unwrap(), BackendWriteOp::WriteCandidateEntry(..)); + db.write(write_ops).unwrap(); // unknown hash let unknown_hash = Hash::repeat_byte(0x02); @@ -408,33 +460,44 @@ fn rejects_bad_assignment() { ), }; + let mut overlay_db = OverlayedBackend::new(&db); let res = check_and_import_assignment( &mut state, + &mut overlay_db, assignment, candidate_index, ).unwrap(); assert_eq!(res.0, AssignmentCheckResult::Bad(AssignmentCheckError::UnknownBlock(unknown_hash))); + assert_eq!(overlay_db.into_write_ops().count(), 0); let mut state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Err(criteria::InvalidAssignment) })), - ..some_state(Default::default()) + ..some_state(StateConfig { + candidate_hash: Some(candidate_hash), + ..Default::default() + }, &mut db) }; // same 
assignment, but this time rejected + let mut overlay_db = OverlayedBackend::new(&db); let res = check_and_import_assignment( &mut state, + &mut overlay_db, assignment_good, candidate_index, ).unwrap(); assert_eq!(res.0, AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCert(ValidatorIndex(0)))); + assert_eq!(overlay_db.into_write_ops().count(), 0); } #[test] fn rejects_assignment_in_future() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); let candidate_index = 0; + let candidate_hash = CandidateReceipt::::default().hash(); let assignment = IndirectAssignmentCert { block_hash, validator: ValidatorIndex(0), @@ -450,33 +513,59 @@ fn rejects_assignment_in_future() { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(move || { Ok((tick + 20) as _) })), - ..some_state(StateConfig { tick, ..Default::default() }) + ..some_state(StateConfig { + tick, + candidate_hash: Some(candidate_hash), + ..Default::default() + }, &mut db) }; + let mut overlay_db = OverlayedBackend::new(&db); let res = check_and_import_assignment( &mut state, + &mut overlay_db, assignment.clone(), candidate_index, ).unwrap(); assert_eq!(res.0, AssignmentCheckResult::TooFarInFuture); + let write_ops = overlay_db.into_write_ops().collect::>(); + + assert_eq!(write_ops.len(), 0); + db.write(write_ops).unwrap(); + let mut state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(move || { Ok((tick + 20 - 1) as _) })), - ..some_state(StateConfig { tick, ..Default::default() }) + ..some_state(StateConfig { + tick, + candidate_hash: Some(candidate_hash), + ..Default::default() + }, &mut db) }; + let mut overlay_db = OverlayedBackend::new(&db); let res = check_and_import_assignment( &mut state, + &mut overlay_db, assignment.clone(), candidate_index, ).unwrap(); assert_eq!(res.0, AssignmentCheckResult::Accepted); + + let write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 1); + + assert_matches!( + 
write_ops.get(0).unwrap(), + BackendWriteOp::WriteCandidateEntry(..) + ); } #[test] fn rejects_assignment_with_unknown_candidate() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); let candidate_index = 1; let assignment = IndirectAssignmentCert { @@ -489,20 +578,24 @@ fn rejects_assignment_with_unknown_candidate() { ), }; - let mut state = some_state(Default::default()); + let mut state = some_state(Default::default(), &mut db); + let mut overlay_db = OverlayedBackend::new(&db); let res = check_and_import_assignment( &mut state, + &mut overlay_db, assignment.clone(), candidate_index, ).unwrap(); assert_eq!(res.0, AssignmentCheckResult::Bad(AssignmentCheckError::InvalidCandidateIndex(candidate_index))); + assert_eq!(overlay_db.into_write_ops().count(), 0); } #[test] fn assignment_import_updates_candidate_entry_and_schedules_wakeup() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let candidate_index = 0; let assignment = IndirectAssignmentCert { @@ -519,17 +612,22 @@ fn assignment_import_updates_candidate_entry_and_schedules_wakeup() { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), - ..some_state(Default::default()) + ..some_state(StateConfig { + candidate_hash: Some(candidate_hash), + ..Default::default() + }, &mut db) }; + let mut overlay_db = OverlayedBackend::new(&db); let (res, actions) = check_and_import_assignment( &mut state, + &mut overlay_db, assignment.clone(), candidate_index, ).unwrap(); assert_eq!(res, AssignmentCheckResult::Accepted); - assert_eq!(actions.len(), 2); + assert_eq!(actions.len(), 1); assert_matches!( actions.get(0).unwrap(), @@ -545,25 +643,31 @@ fn assignment_import_updates_candidate_entry_and_schedules_wakeup() { } ); + let write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(1, write_ops.len()); assert_matches!( - 
actions.get(1).unwrap(), - Action::WriteCandidateEntry(c, e) => { - assert_eq!(c, &candidate_hash); - assert!(e.approval_entry(&block_hash).unwrap().is_assigned(ValidatorIndex(0))); + write_ops.get(0).unwrap(), + BackendWriteOp::WriteCandidateEntry(ref c_entry) => { + assert_eq!(&c_entry.candidate.hash(), &candidate_hash); + assert!(c_entry.approval_entry(&block_hash).unwrap().is_assigned(ValidatorIndex(0))); } ); } #[test] fn rejects_approval_before_assignment() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), - ..some_state(Default::default()) + ..some_state(StateConfig { + candidate_hash: Some(candidate_hash), + ..Default::default() + }, &mut db) }; let vote = IndirectSignedApprovalVote { @@ -573,8 +677,10 @@ fn rejects_approval_before_assignment() { signature: sign_approval(Sr25519Keyring::Alice, candidate_hash, 1), }; + let mut overlay_db = OverlayedBackend::new(&db); let (actions, res) = check_and_import_approval( &state, + &mut overlay_db, &Metrics(None), vote, |r| r @@ -582,18 +688,23 @@ fn rejects_approval_before_assignment() { assert_eq!(res, ApprovalCheckResult::Bad(ApprovalCheckError::NoAssignment(ValidatorIndex(0)))); assert!(actions.is_empty()); + assert_eq!(overlay_db.into_write_ops().count(), 0); } #[test] fn rejects_approval_if_no_candidate_entry() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); - let mut state = State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), - ..some_state(Default::default()) + ..some_state(StateConfig { + candidate_hash: Some(candidate_hash), + ..Default::default() + }, &mut db) }; 
let vote = IndirectSignedApprovalVote { @@ -603,10 +714,13 @@ fn rejects_approval_if_no_candidate_entry() { signature: sign_approval(Sr25519Keyring::Alice, candidate_hash, 1), }; - state.db.candidate_entries.remove(&candidate_hash); + overlay_txn(&mut db, |overlay_db| overlay_db.delete_candidate_entry(&candidate_hash)); + + let mut overlay_db = OverlayedBackend::new(&db); let (actions, res) = check_and_import_approval( &state, + &mut overlay_db, &Metrics(None), vote, |r| r @@ -614,19 +728,24 @@ fn rejects_approval_if_no_candidate_entry() { assert_eq!(res, ApprovalCheckResult::Bad(ApprovalCheckError::InvalidCandidate(0, candidate_hash))); assert!(actions.is_empty()); + assert_eq!(overlay_db.into_write_ops().count(), 0); } #[test] fn rejects_approval_if_no_block_entry() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let validator_index = ValidatorIndex(0); - let mut state = State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), - ..some_state(Default::default()) + ..some_state(StateConfig { + candidate_hash: Some(candidate_hash), + ..Default::default() + }, &mut db) }; let vote = IndirectSignedApprovalVote { @@ -636,15 +755,14 @@ fn rejects_approval_if_no_block_entry() { signature: sign_approval(Sr25519Keyring::Alice, candidate_hash, 1), }; - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index, 0); + import_assignment(&mut db, &candidate_hash, &block_hash, validator_index, |_| {}); - state.db.block_entries.remove(&block_hash); + overlay_txn(&mut db, |overlay_db| overlay_db.delete_block_entry(&block_hash)); + let mut overlay_db = OverlayedBackend::new(&db); let (actions, res) = check_and_import_approval( &state, + &mut overlay_db, &Metrics(None), vote, |r| r @@ -652,16 
+770,18 @@ fn rejects_approval_if_no_block_entry() { assert_eq!(res, ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(block_hash))); assert!(actions.is_empty()); + assert_eq!(overlay_db.into_write_ops().count(), 0); } #[test] fn accepts_and_imports_approval_after_assignment() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let validator_index = ValidatorIndex(0); let candidate_index = 0; - let mut state = State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), @@ -669,8 +789,9 @@ fn accepts_and_imports_approval_after_assignment() { validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie], validator_groups: vec![vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2)]], needed_approvals: 2, + candidate_hash: Some(candidate_hash), ..Default::default() - }) + }, &mut db) }; let vote = IndirectSignedApprovalVote { @@ -680,13 +801,12 @@ fn accepts_and_imports_approval_after_assignment() { signature: sign_approval(Sr25519Keyring::Alice, candidate_hash, 1), }; - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index, 0); + import_assignment(&mut db, &candidate_hash, &block_hash, validator_index, |_| {}); + let mut overlay_db = OverlayedBackend::new(&db); let (actions, res) = check_and_import_approval( &state, + &mut overlay_db, &Metrics(None), vote, |r| r @@ -694,12 +814,14 @@ fn accepts_and_imports_approval_after_assignment() { assert_eq!(res, ApprovalCheckResult::Accepted); - assert_eq!(actions.len(), 1); + assert_eq!(actions.len(), 0); + + let write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 1); assert_matches!( - actions.get(0).unwrap(), - Action::WriteCandidateEntry(c_hash, c_entry) => { - 
assert_eq!(c_hash, &candidate_hash); - assert!(c_entry.approvals().get(validator_index.0 as usize).unwrap()); + write_ops.get(0).unwrap(), + BackendWriteOp::WriteCandidateEntry(ref c_entry) => { + assert_eq!(&c_entry.candidate.hash(), &candidate_hash); assert!(!c_entry.approval_entry(&block_hash).unwrap().is_approved()); } ); @@ -707,13 +829,14 @@ fn accepts_and_imports_approval_after_assignment() { #[test] fn second_approval_import_only_schedules_wakeups() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let validator_index = ValidatorIndex(0); let validator_index_b = ValidatorIndex(1); let candidate_index = 0; - let mut state = State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), @@ -721,8 +844,9 @@ fn second_approval_import_only_schedules_wakeups() { validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie], validator_groups: vec![vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2)]], needed_approvals: 2, + candidate_hash: Some(candidate_hash), ..Default::default() - }) + }, &mut db) }; let vote = IndirectSignedApprovalVote { @@ -732,18 +856,16 @@ fn second_approval_import_only_schedules_wakeups() { signature: sign_approval(Sr25519Keyring::Alice, candidate_hash, 1), }; - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index, 0); - - assert!(!state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .mark_approval(validator_index)); + import_assignment(&mut db, &candidate_hash, &block_hash, validator_index, |candidate_entry| { + assert!(!candidate_entry.mark_approval(validator_index)); + }); // There is only one assignment, so nothing to schedule if we double-import. 
+ let mut overlay_db = OverlayedBackend::new(&db); let (actions, res) = check_and_import_approval( &state, + &mut overlay_db, &Metrics(None), vote.clone(), |r| r @@ -751,16 +873,16 @@ fn second_approval_import_only_schedules_wakeups() { assert_eq!(res, ApprovalCheckResult::Accepted); assert!(actions.is_empty()); + assert_eq!(overlay_db.into_write_ops().count(), 0); // After adding a second assignment, there should be a schedule wakeup action. - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_b, 0); + import_assignment(&mut db, &candidate_hash, &block_hash, validator_index_b, |_| {}); + let mut overlay_db = OverlayedBackend::new(&db); let (actions, res) = check_and_import_approval( &state, + &mut overlay_db, &Metrics(None), vote, |r| r @@ -768,6 +890,7 @@ fn second_approval_import_only_schedules_wakeups() { assert_eq!(res, ApprovalCheckResult::Accepted); assert_eq!(actions.len(), 1); + assert_eq!(overlay_db.into_write_ops().count(), 0); assert_matches!( actions.get(0).unwrap(), @@ -777,12 +900,13 @@ fn second_approval_import_only_schedules_wakeups() { #[test] fn import_checked_approval_updates_entries_and_schedules() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let validator_index_a = ValidatorIndex(0); let validator_index_b = ValidatorIndex(1); - let mut state = State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), @@ -790,31 +914,27 @@ fn import_checked_approval_updates_entries_and_schedules() { validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie], validator_groups: vec![vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2)]], needed_approvals: 2, + candidate_hash: Some(candidate_hash), ..Default::default() - }) + }, 
&mut db) }; - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_a, 0); - - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_b, 0); + import_assignment(&mut db, &candidate_hash, &block_hash, validator_index_a, |_| {}); + import_assignment(&mut db, &candidate_hash, &block_hash, validator_index_b, |_| {}); { - let mut actions = import_checked_approval( + let mut overlay_db = OverlayedBackend::new(&db); + let actions = import_checked_approval( &state, + &mut overlay_db, &Metrics(None), - state.db.block_entries.get(&block_hash).unwrap().clone(), + db.load_block_entry(&block_hash).unwrap().unwrap(), candidate_hash, - state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + db.load_candidate_entry(&candidate_hash).unwrap().unwrap(), ApprovalSource::Remote(validator_index_a), ); - assert_eq!(actions.len(), 2); + assert_eq!(actions.len(), 1); assert_matches!( actions.get(0).unwrap(), Action::ScheduleWakeup { @@ -826,27 +946,32 @@ fn import_checked_approval_updates_entries_and_schedules() { assert_eq!(c_hash, &candidate_hash); } ); + + let mut write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 1); assert_matches!( - actions.get_mut(1).unwrap(), - Action::WriteCandidateEntry(c_hash, ref mut c_entry) => { - assert_eq!(c_hash, &candidate_hash); + write_ops.get_mut(0).unwrap(), + BackendWriteOp::WriteCandidateEntry(ref mut c_entry) => { assert!(!c_entry.approval_entry(&block_hash).unwrap().is_approved()); assert!(c_entry.mark_approval(validator_index_a)); - - state.db.candidate_entries.insert(candidate_hash, c_entry.clone()); } ); + + db.write(write_ops).unwrap(); } { - let mut actions = import_checked_approval( + let mut overlay_db = OverlayedBackend::new(&db); + let actions = import_checked_approval( &state, + &mut overlay_db, 
&Metrics(None), - state.db.block_entries.get(&block_hash).unwrap().clone(), + db.load_block_entry(&block_hash).unwrap().unwrap(), candidate_hash, - state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + db.load_candidate_entry(&candidate_hash).unwrap().unwrap(), ApprovalSource::Remote(validator_index_b), ); + assert_eq!(actions.len(), 1); assert_matches!( actions.get(0).unwrap(), @@ -855,19 +980,22 @@ fn import_checked_approval_updates_entries_and_schedules() { } ); + let mut write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 2); assert_matches!( - actions.get(1).unwrap(), - Action::WriteBlockEntry(b_entry) => { + write_ops.get(0).unwrap(), + BackendWriteOp::WriteBlockEntry(b_entry) => { assert_eq!(b_entry.block_hash(), block_hash); assert!(b_entry.is_fully_approved()); assert!(b_entry.is_candidate_approved(&candidate_hash)); } ); + assert_matches!( - actions.get_mut(2).unwrap(), - Action::WriteCandidateEntry(c_hash, ref mut c_entry) => { - assert_eq!(c_hash, &candidate_hash); + write_ops.get_mut(1).unwrap(), + BackendWriteOp::WriteCandidateEntry(ref mut c_entry) => { + assert_eq!(&c_entry.candidate.hash(), &candidate_hash); assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved()); assert!(c_entry.mark_approval(validator_index_b)); } @@ -1312,14 +1440,19 @@ fn wakeup_earlier_supersedes_later() { #[test] fn import_checked_approval_sets_one_block_bit_at_a_time() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); - let candidate_hash_2 = CandidateHash(Hash::repeat_byte(0xDD)); + let candidate_hash = CandidateReceipt::::default().hash(); + let candidate_receipt_2 = CandidateReceipt:: { + descriptor: CandidateDescriptor::default(), + commitments_hash: Hash::repeat_byte(0x02), + }; + let candidate_hash_2 = candidate_receipt_2.hash(); let validator_index_a = ValidatorIndex(0); let validator_index_b = ValidatorIndex(1); - let mut state 
= State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), @@ -1327,79 +1460,81 @@ fn import_checked_approval_sets_one_block_bit_at_a_time() { validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie], validator_groups: vec![vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2)]], needed_approvals: 2, + candidate_hash: Some(candidate_hash), ..Default::default() - }) + }, &mut db) }; add_candidate_to_block( - &mut state.db, + &mut db, block_hash, candidate_hash_2, 3, CoreIndex(1), GroupIndex(1), + Some(candidate_receipt_2), ); - let setup_candidate = |db: &mut TestStore, c_hash| { - db.candidate_entries.get_mut(&c_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_a, 0); - - db.candidate_entries.get_mut(&c_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_b, 0); - - assert!(!db.candidate_entries.get_mut(&c_hash).unwrap() - .mark_approval(validator_index_a)); + let setup_candidate = |db: &mut DbBackend, c_hash| { + import_assignment(db, &c_hash, &block_hash, validator_index_a, |candidate_entry| { + let approval_entry = candidate_entry.approval_entry_mut(&block_hash).unwrap(); + approval_entry.import_assignment(0, validator_index_b, 0); + assert!(!candidate_entry.mark_approval(validator_index_a)); + }) }; - setup_candidate(&mut state.db, candidate_hash); - setup_candidate(&mut state.db, candidate_hash_2); + setup_candidate(&mut db, candidate_hash); + setup_candidate(&mut db, candidate_hash_2); + let mut overlay_db = OverlayedBackend::new(&db); let actions = import_checked_approval( &state, + &mut overlay_db, &Metrics(None), - state.db.block_entries.get(&block_hash).unwrap().clone(), + db.load_block_entry(&block_hash).unwrap().unwrap(), candidate_hash, - state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + 
db.load_candidate_entry(&candidate_hash).unwrap().unwrap(), ApprovalSource::Remote(validator_index_b), ); - assert_eq!(actions.len(), 2); + assert_eq!(actions.len(), 0); + + let write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 2); + assert_matches!( - actions.get(0).unwrap(), - Action::WriteBlockEntry(b_entry) => { + write_ops.get(0).unwrap(), + BackendWriteOp::WriteBlockEntry(b_entry) => { assert_eq!(b_entry.block_hash(), block_hash); assert!(!b_entry.is_fully_approved()); assert!(b_entry.is_candidate_approved(&candidate_hash)); assert!(!b_entry.is_candidate_approved(&candidate_hash_2)); - - state.db.block_entries.insert(block_hash, b_entry.clone()); } ); assert_matches!( - actions.get(1).unwrap(), - Action::WriteCandidateEntry(c_h, c_entry) => { - assert_eq!(c_h, &candidate_hash); + write_ops.get(1).unwrap(), + BackendWriteOp::WriteCandidateEntry(c_entry) => { + assert_eq!(&c_entry.candidate.hash(), &candidate_hash); assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved()); - - state.db.candidate_entries.insert(*c_h, c_entry.clone()); } ); + db.write(write_ops).unwrap(); + + let mut overlay_db = OverlayedBackend::new(&db); let actions = import_checked_approval( &state, + &mut overlay_db, &Metrics(None), - state.db.block_entries.get(&block_hash).unwrap().clone(), + db.load_block_entry(&block_hash).unwrap().unwrap(), candidate_hash_2, - state.db.candidate_entries.get(&candidate_hash_2).unwrap().clone(), + db.load_candidate_entry(&candidate_hash_2).unwrap().unwrap(), ApprovalSource::Remote(validator_index_b), ); - assert_eq!(actions.len(), 3); + assert_eq!(actions.len(), 1); + assert_matches!( actions.get(0).unwrap(), Action::NoteApprovedInChainSelection(h) => { @@ -1407,9 +1542,12 @@ fn import_checked_approval_sets_one_block_bit_at_a_time() { } ); + let write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 2); + assert_matches!( - actions.get(1).unwrap(), - Action::WriteBlockEntry(b_entry) 
=> { + write_ops.get(0).unwrap(), + BackendWriteOp::WriteBlockEntry(b_entry) => { assert_eq!(b_entry.block_hash(), block_hash); assert!(b_entry.is_fully_approved()); assert!(b_entry.is_candidate_approved(&candidate_hash)); @@ -1418,9 +1556,9 @@ fn import_checked_approval_sets_one_block_bit_at_a_time() { ); assert_matches!( - actions.get(2).unwrap(), - Action::WriteCandidateEntry(c_h, c_entry) => { - assert_eq!(c_h, &candidate_hash_2); + write_ops.get(1).unwrap(), + BackendWriteOp::WriteCandidateEntry(c_entry) => { + assert_eq!(&c_entry.candidate.hash(), &candidate_hash_2); assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved()); } ); @@ -1428,6 +1566,8 @@ fn import_checked_approval_sets_one_block_bit_at_a_time() { #[test] fn approved_ancestor_all_approved() { + let mut db = make_db(); + let block_hash_1 = Hash::repeat_byte(0x01); let block_hash_2 = Hash::repeat_byte(0x02); let block_hash_3 = Hash::repeat_byte(0x03); @@ -1438,21 +1578,7 @@ fn approved_ancestor_all_approved() { let slot = Slot::from(1); let session_index = 1; - let mut state = State { - assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { - Ok(0) - })), - ..some_state(StateConfig { - validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob], - validator_groups: vec![vec![ValidatorIndex(0)], vec![ValidatorIndex(1)]], - needed_approvals: 2, - session_index, - slot, - ..Default::default() - }) - }; - - let add_block = |db: &mut TestStore, block_hash, approved| { + let add_block = |db: &mut DbBackend, block_hash, approved| { add_block( db, block_hash, @@ -1460,24 +1586,27 @@ fn approved_ancestor_all_approved() { slot, ); - let b = db.block_entries.get_mut(&block_hash).unwrap(); - b.add_candidate(CoreIndex(0), candidate_hash); + let mut block_entry = db.load_block_entry(&block_hash).unwrap().unwrap(); + block_entry.add_candidate(CoreIndex(0), candidate_hash); if approved { - b.mark_approved_by_hash(&candidate_hash); + block_entry.mark_approved_by_hash(&candidate_hash); } + 
+ overlay_txn(db, |overlay_db| overlay_db.write_block_entry(block_entry.clone())); }; - add_block(&mut state.db, block_hash_1, true); - add_block(&mut state.db, block_hash_2, true); - add_block(&mut state.db, block_hash_3, true); - add_block(&mut state.db, block_hash_4, true); + add_block(&mut db, block_hash_1, true); + add_block(&mut db, block_hash_2, true); + add_block(&mut db, block_hash_3, true); + add_block(&mut db, block_hash_4, true); let pool = TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); let test_fut = Box::pin(async move { + let overlay_db = OverlayedBackend::new(&db); assert_eq!( - handle_approved_ancestor(&mut ctx, &state.db, block_hash_4, 0, &Default::default()) + handle_approved_ancestor(&mut ctx, &overlay_db, block_hash_4, 0, &Default::default()) .await.unwrap(), Some((block_hash_4, 4)), ) @@ -1511,6 +1640,8 @@ fn approved_ancestor_all_approved() { #[test] fn approved_ancestor_missing_approval() { + let mut db = make_db(); + let block_hash_1 = Hash::repeat_byte(0x01); let block_hash_2 = Hash::repeat_byte(0x02); let block_hash_3 = Hash::repeat_byte(0x03); @@ -1521,21 +1652,7 @@ fn approved_ancestor_missing_approval() { let slot = Slot::from(1); let session_index = 1; - let mut state = State { - assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { - Ok(0) - })), - ..some_state(StateConfig { - validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob], - validator_groups: vec![vec![ValidatorIndex(0)], vec![ValidatorIndex(1)]], - needed_approvals: 2, - session_index, - slot, - ..Default::default() - }) - }; - - let add_block = |db: &mut TestStore, block_hash, approved| { + let add_block = |db: &mut DbBackend, block_hash, approved| { add_block( db, block_hash, @@ -1543,24 +1660,27 @@ fn approved_ancestor_missing_approval() { slot, ); - let b = db.block_entries.get_mut(&block_hash).unwrap(); - b.add_candidate(CoreIndex(0), candidate_hash); + let mut block_entry = 
db.load_block_entry(&block_hash).unwrap().unwrap(); + block_entry.add_candidate(CoreIndex(0), candidate_hash); if approved { - b.mark_approved_by_hash(&candidate_hash); + block_entry.mark_approved_by_hash(&candidate_hash); } + + overlay_txn(db, |overlay_db| overlay_db.write_block_entry(block_entry.clone())); }; - add_block(&mut state.db, block_hash_1, true); - add_block(&mut state.db, block_hash_2, true); - add_block(&mut state.db, block_hash_3, false); - add_block(&mut state.db, block_hash_4, true); + add_block(&mut db, block_hash_1, true); + add_block(&mut db, block_hash_2, true); + add_block(&mut db, block_hash_3, false); + add_block(&mut db, block_hash_4, true); let pool = TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); let test_fut = Box::pin(async move { + let overlay_db = OverlayedBackend::new(&db); assert_eq!( - handle_approved_ancestor(&mut ctx, &state.db, block_hash_4, 0, &Default::default()) + handle_approved_ancestor(&mut ctx, &overlay_db, block_hash_4, 0, &Default::default()) .await.unwrap(), Some((block_hash_2, 2)), ) @@ -1594,12 +1714,14 @@ fn approved_ancestor_missing_approval() { #[test] fn process_wakeup_trigger_assignment_launch_approval() { + let mut db = make_db(); + let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let slot = Slot::from(1); let session_index = 1; - let mut state = State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), @@ -1609,57 +1731,38 @@ fn process_wakeup_trigger_assignment_launch_approval() { needed_approvals: 2, session_index, slot, + candidate_hash: Some(candidate_hash), ..Default::default() - }) + }, &mut db) }; + let mut overlay_db = OverlayedBackend::new(&db); let actions = process_wakeup( &state, + &mut overlay_db, block_hash, candidate_hash, 1, ).unwrap(); assert!(actions.is_empty()); + 
assert_eq!(overlay_db.into_write_ops().count(), 0); - state.db.candidate_entries - .get_mut(&candidate_hash) - .unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .set_our_assignment(approval_db::v1::OurAssignment { - cert: garbage_assignment_cert( - AssignmentCertKind::RelayVRFModulo { sample: 0 } - ), - tranche: 0, - validator_index: ValidatorIndex(0), - triggered: false, - }.into()); + set_our_assignment(&mut db, &candidate_hash, &block_hash, 0, |_| {}); + let mut overlay_db = OverlayedBackend::new(&db); let actions = process_wakeup( &state, + &mut overlay_db, block_hash, candidate_hash, 1, ).unwrap(); - assert_eq!(actions.len(), 3); - assert_matches!( - actions.get(0).unwrap(), - Action::WriteCandidateEntry(c_hash, c_entry) => { - assert_eq!(c_hash, &candidate_hash); - assert!(c_entry - .approval_entry(&block_hash) - .unwrap() - .our_assignment() - .unwrap() - .triggered() - ); - } - ); + assert_eq!(actions.len(), 2); assert_matches!( - actions.get(1).unwrap(), + actions.get(0).unwrap(), Action::LaunchApproval { candidate_index, .. @@ -1669,24 +1772,43 @@ fn process_wakeup_trigger_assignment_launch_approval() { ); assert_matches!( - actions.get(2).unwrap(), + actions.get(1).unwrap(), Action::ScheduleWakeup { tick, .. 
} => { assert_eq!(tick, &slot_to_tick(0 + 2)); } - ) + ); + + let write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 1); + + assert_matches!( + write_ops.get(0).unwrap(), + BackendWriteOp::WriteCandidateEntry(c_entry) => { + assert_eq!(&c_entry.candidate.hash(), &candidate_hash); + assert!(c_entry + .approval_entry(&block_hash) + .unwrap() + .our_assignment() + .unwrap() + .triggered() + ); + } + ); } #[test] fn process_wakeup_schedules_wakeup() { + let mut db = make_db(); + let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let slot = Slot::from(1); let session_index = 1; - let mut state = State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(10) })), @@ -1696,26 +1818,17 @@ fn process_wakeup_schedules_wakeup() { needed_approvals: 2, session_index, slot, + candidate_hash: Some(candidate_hash), ..Default::default() - }) + }, &mut db) }; - state.db.candidate_entries - .get_mut(&candidate_hash) - .unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .set_our_assignment(approval_db::v1::OurAssignment { - cert: garbage_assignment_cert( - AssignmentCertKind::RelayVRFModulo { sample: 0 } - ), - tranche: 10, - validator_index: ValidatorIndex(0), - triggered: false, - }.into()); + set_our_assignment(&mut db, &candidate_hash, &block_hash, 10, |_| {}); + let mut overlay_db = OverlayedBackend::new(&db); let actions = process_wakeup( &state, + &mut overlay_db, block_hash, candidate_hash, 1, @@ -1730,6 +1843,7 @@ fn process_wakeup_schedules_wakeup() { assert_eq!(tick, &(slot_to_tick(slot) + 10)); } ); + assert_eq!(overlay_db.into_write_ops().count(), 0); } #[test] @@ -1744,39 +1858,42 @@ fn finalization_event_prunes() { #[test] fn local_approval_import_always_updates_approval_entry() { + let mut db = make_db(); let block_hash = Hash::repeat_byte(0x01); let block_hash_2 = 
Hash::repeat_byte(0x02); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let candidate_hash = CandidateReceipt::::default().hash(); let validator_index = ValidatorIndex(0); let state_config = StateConfig { validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie], validator_groups: vec![vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2)]], needed_approvals: 2, + candidate_hash: Some(candidate_hash), ..Default::default() }; - let mut state = State { + let state = State { assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { Ok(0) })), - ..some_state(state_config.clone()) + ..some_state(state_config.clone(), &mut db) }; add_block( - &mut state.db, + &mut db, block_hash_2, state_config.session_index, state_config.slot, ); add_candidate_to_block( - &mut state.db, + &mut db, block_hash_2, candidate_hash, state_config.validators.len(), 1.into(), GroupIndex(1), + None, ); let sig_a = sign_approval(Sr25519Keyring::Alice, candidate_hash, 1); @@ -1784,21 +1901,11 @@ fn local_approval_import_always_updates_approval_entry() { { let mut import_local_assignment = |block_hash: Hash| { - let approval_entry = state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap(); - - approval_entry.set_our_assignment(approval_db::v1::OurAssignment { - cert: garbage_assignment_cert( - AssignmentCertKind::RelayVRFModulo { sample: 0 } - ), - tranche: 0, - validator_index, - triggered: false, - }.into()); - - assert!(approval_entry.trigger_our_assignment(0).is_some()); - assert!(approval_entry.local_statements().0.is_some()); + set_our_assignment(&mut db, &candidate_hash, &block_hash, 0, |candidate_entry| { + let approval_entry = candidate_entry.approval_entry_mut(&block_hash).unwrap(); + assert!(approval_entry.trigger_our_assignment(0).is_some()); + assert!(approval_entry.local_statements().0.is_some()); + }); }; import_local_assignment(block_hash); @@ -1806,55 +1913,63 @@ 
fn local_approval_import_always_updates_approval_entry() { } { - let mut actions = import_checked_approval( + let mut overlay_db = OverlayedBackend::new(&db); + let actions = import_checked_approval( &state, + &mut overlay_db, &Metrics(None), - state.db.block_entries.get(&block_hash).unwrap().clone(), + db.load_block_entry(&block_hash).unwrap().unwrap(), candidate_hash, - state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + db.load_candidate_entry(&candidate_hash).unwrap().unwrap(), ApprovalSource::Local(validator_index, sig_a.clone()), ); - assert_eq!(actions.len(), 1); + assert_eq!(actions.len(), 0); + + let mut write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 1); assert_matches!( - actions.get_mut(0).unwrap(), - Action::WriteCandidateEntry(c_hash, ref mut c_entry) => { - assert_eq!(c_hash, &candidate_hash); + write_ops.get_mut(0).unwrap(), + BackendWriteOp::WriteCandidateEntry(ref mut c_entry) => { + assert_eq!(&c_entry.candidate.hash(), &candidate_hash); assert_eq!( c_entry.approval_entry(&block_hash).unwrap().local_statements().1, Some(sig_a), ); assert!(c_entry.mark_approval(validator_index)); - - state.db.candidate_entries.insert(candidate_hash, c_entry.clone()); } ); + + db.write(write_ops).unwrap(); } { - let mut actions = import_checked_approval( + let mut overlay_db = OverlayedBackend::new(&db); + let actions = import_checked_approval( &state, + &mut overlay_db, &Metrics(None), - state.db.block_entries.get(&block_hash_2).unwrap().clone(), + db.load_block_entry(&block_hash_2).unwrap().unwrap(), candidate_hash, - state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + db.load_candidate_entry(&candidate_hash).unwrap().unwrap(), ApprovalSource::Local(validator_index, sig_b.clone()), ); - assert_eq!(actions.len(), 1); + assert_eq!(actions.len(), 0); + + let mut write_ops = overlay_db.into_write_ops().collect::>(); + assert_eq!(write_ops.len(), 1); assert_matches!( - actions.get_mut(0).unwrap(), - 
Action::WriteCandidateEntry(c_hash, ref mut c_entry) => { - assert_eq!(c_hash, &candidate_hash); + write_ops.get_mut(0).unwrap(), + BackendWriteOp::WriteCandidateEntry(ref mut c_entry) => { + assert_eq!(&c_entry.candidate.hash(), &candidate_hash); assert_eq!( c_entry.approval_entry(&block_hash_2).unwrap().local_statements().1, Some(sig_b), ); assert!(c_entry.mark_approval(validator_index)); - - state.db.candidate_entries.insert(candidate_hash, c_entry.clone()); } ); } diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index d722508ca4..f3abd0a51c 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -223,6 +223,9 @@ pub enum SubsystemError { /// The wrapped error. Marked as source for tracking the error chain. #[source] source: Box }, + + #[error(transparent)] + Io(#[from] std::io::Error), } impl SubsystemError { diff --git a/polkadot/runtime/kusama/src/lib.rs b/polkadot/runtime/kusama/src/lib.rs index 32e8691faa..2951856191 100644 --- a/polkadot/runtime/kusama/src/lib.rs +++ b/polkadot/runtime/kusama/src/lib.rs @@ -120,7 +120,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { spec_name: create_runtime_str!("kusama"), impl_name: create_runtime_str!("parity-kusama"), authoring_version: 2, - spec_version: 9080, + spec_version: 9090, impl_version: 0, #[cfg(not(feature = "disable-runtime-api"))] apis: RUNTIME_API_VERSIONS, diff --git a/polkadot/runtime/polkadot/src/lib.rs b/polkadot/runtime/polkadot/src/lib.rs index c912429605..758df88005 100644 --- a/polkadot/runtime/polkadot/src/lib.rs +++ b/polkadot/runtime/polkadot/src/lib.rs @@ -95,7 +95,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { spec_name: create_runtime_str!("polkadot"), impl_name: create_runtime_str!("parity-polkadot"), authoring_version: 0, - spec_version: 9080, + spec_version: 9090, impl_version: 0, #[cfg(not(feature = "disable-runtime-api"))] apis: RUNTIME_API_VERSIONS, diff --git 
a/polkadot/runtime/westend/src/lib.rs b/polkadot/runtime/westend/src/lib.rs index 9c1f203680..bad767fd95 100644 --- a/polkadot/runtime/westend/src/lib.rs +++ b/polkadot/runtime/westend/src/lib.rs @@ -119,7 +119,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { spec_name: create_runtime_str!("westend"), impl_name: create_runtime_str!("parity-westend"), authoring_version: 2, - spec_version: 9080, + spec_version: 9090, impl_version: 0, #[cfg(not(feature = "disable-runtime-api"))] apis: RUNTIME_API_VERSIONS,