mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 09:21:04 +00:00
Replace AuxStore with custom RocksDB (#2471)
* Use KeyValueDB in approval-voting * use KVDB instead of AuxStore * add rocksdb to cargo toml * add a Config struct * create new DB in service * fix dep for regular node * make optional * post merge fix Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
committed by
GitHub
parent
85489ceb36
commit
006602eff2
Generated
+4
@@ -5198,8 +5198,12 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"assert_matches",
|
"assert_matches",
|
||||||
"bitvec",
|
"bitvec",
|
||||||
|
"derive_more",
|
||||||
"futures 0.3.12",
|
"futures 0.3.12",
|
||||||
"futures-timer 3.0.2",
|
"futures-timer 3.0.2",
|
||||||
|
"kvdb",
|
||||||
|
"kvdb-memorydb",
|
||||||
|
"kvdb-rocksdb",
|
||||||
"maplit",
|
"maplit",
|
||||||
"merlin",
|
"merlin",
|
||||||
"parity-scale-codec",
|
"parity-scale-codec",
|
||||||
|
|||||||
@@ -13,6 +13,9 @@ tracing-futures = "0.2.4"
|
|||||||
bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] }
|
bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] }
|
||||||
merlin = "2.0"
|
merlin = "2.0"
|
||||||
schnorrkel = "0.9.1"
|
schnorrkel = "0.9.1"
|
||||||
|
kvdb = "0.9.0"
|
||||||
|
kvdb-rocksdb = "0.11.0"
|
||||||
|
derive_more = "0.99.1"
|
||||||
|
|
||||||
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
|
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
|
||||||
polkadot-overseer = { path = "../../overseer" }
|
polkadot-overseer = { path = "../../overseer" }
|
||||||
@@ -36,3 +39,4 @@ sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch =
|
|||||||
maplit = "1.0.2"
|
maplit = "1.0.2"
|
||||||
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
|
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
|
||||||
assert_matches = "1.4.0"
|
assert_matches = "1.4.0"
|
||||||
|
kvdb-memorydb = "0.9.0"
|
||||||
|
|||||||
@@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
//! Version 1 of the DB schema.
|
//! Version 1 of the DB schema.
|
||||||
|
|
||||||
use sc_client_api::backend::AuxStore;
|
use kvdb::{DBTransaction, KeyValueDB};
|
||||||
use polkadot_node_primitives::approval::{DelayTranche, AssignmentCert};
|
use polkadot_node_primitives::approval::{DelayTranche, AssignmentCert};
|
||||||
use polkadot_primitives::v1::{
|
use polkadot_primitives::v1::{
|
||||||
ValidatorIndex, GroupIndex, CandidateReceipt, SessionIndex, CoreIndex,
|
ValidatorIndex, GroupIndex, CandidateReceipt, SessionIndex, CoreIndex,
|
||||||
@@ -27,6 +27,7 @@ use parity_scale_codec::{Encode, Decode};
|
|||||||
|
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
|
use std::sync::Arc;
|
||||||
use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0};
|
use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0};
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -39,6 +40,9 @@ pub struct Tick(u64);
|
|||||||
|
|
||||||
pub type Bitfield = BitVec<BitOrderLsb0, u8>;
|
pub type Bitfield = BitVec<BitOrderLsb0, u8>;
|
||||||
|
|
||||||
|
const NUM_COLUMNS: u32 = 1;
|
||||||
|
const DATA_COL: u32 = 0;
|
||||||
|
|
||||||
const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks";
|
const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks";
|
||||||
|
|
||||||
/// Details pertaining to our assignment on a block.
|
/// Details pertaining to our assignment on a block.
|
||||||
@@ -103,6 +107,33 @@ pub struct BlockEntry {
|
|||||||
pub children: Vec<Hash>,
|
pub children: Vec<Hash>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Clear the given directory and create a RocksDB instance there.
|
||||||
|
pub fn clear_and_recreate(path: &std::path::Path, cache_size: usize)
|
||||||
|
-> std::io::Result<Arc<dyn KeyValueDB>>
|
||||||
|
{
|
||||||
|
use kvdb_rocksdb::{DatabaseConfig, Database as RocksDB};
|
||||||
|
|
||||||
|
tracing::info!("Recreating approval-checking DB at {:?}", path);
|
||||||
|
|
||||||
|
if let Err(e) = std::fs::remove_dir_all(path) {
|
||||||
|
if e.kind() != std::io::ErrorKind::NotFound {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::fs::create_dir_all(path)?;
|
||||||
|
|
||||||
|
let mut db_config = DatabaseConfig::with_columns(NUM_COLUMNS);
|
||||||
|
|
||||||
|
db_config.memory_budget.insert(DATA_COL, cache_size);
|
||||||
|
|
||||||
|
let path = path.to_str().ok_or_else(|| std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
format!("Non-UTF-8 database path {:?}", path),
|
||||||
|
))?;
|
||||||
|
|
||||||
|
Ok(Arc::new(RocksDB::open(&db_config, path)?))
|
||||||
|
}
|
||||||
|
|
||||||
/// A range from earliest..last block number stored within the DB.
|
/// A range from earliest..last block number stored within the DB.
|
||||||
#[derive(Encode, Decode, Debug, Clone, PartialEq)]
|
#[derive(Encode, Decode, Debug, Clone, PartialEq)]
|
||||||
pub struct StoredBlockRange(BlockNumber, BlockNumber);
|
pub struct StoredBlockRange(BlockNumber, BlockNumber);
|
||||||
@@ -119,14 +150,26 @@ impl From<Tick> for crate::Tick {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Errors while accessing things from the DB.
|
||||||
|
#[derive(Debug, derive_more::From, derive_more::Display)]
|
||||||
|
pub enum Error {
|
||||||
|
Io(std::io::Error),
|
||||||
|
InvalidDecoding(parity_scale_codec::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for Error {}
|
||||||
|
|
||||||
|
/// Result alias for DB errors.
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
/// Canonicalize some particular block, pruning everything before it and
|
/// Canonicalize some particular block, pruning everything before it and
|
||||||
/// pruning any competing branches at the same height.
|
/// pruning any competing branches at the same height.
|
||||||
pub(crate) fn canonicalize(
|
pub(crate) fn canonicalize(
|
||||||
store: &impl AuxStore,
|
store: &dyn KeyValueDB,
|
||||||
canon_number: BlockNumber,
|
canon_number: BlockNumber,
|
||||||
canon_hash: Hash,
|
canon_hash: Hash,
|
||||||
)
|
)
|
||||||
-> sp_blockchain::Result<()>
|
-> Result<()>
|
||||||
{
|
{
|
||||||
let range = match load_stored_blocks(store)? {
|
let range = match load_stored_blocks(store)? {
|
||||||
None => return Ok(()),
|
None => return Ok(()),
|
||||||
@@ -137,8 +180,7 @@ pub(crate) fn canonicalize(
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut deleted_height_keys = Vec::new();
|
let mut transaction = DBTransaction::new();
|
||||||
let mut deleted_block_keys = Vec::new();
|
|
||||||
|
|
||||||
// Storing all candidates in memory is potentially heavy, but should be fine
|
// Storing all candidates in memory is potentially heavy, but should be fine
|
||||||
// as long as finality doesn't stall for a long while. We could optimize this
|
// as long as finality doesn't stall for a long while. We could optimize this
|
||||||
@@ -150,15 +192,15 @@ pub(crate) fn canonicalize(
|
|||||||
|
|
||||||
let visit_and_remove_block_entry = |
|
let visit_and_remove_block_entry = |
|
||||||
block_hash: Hash,
|
block_hash: Hash,
|
||||||
deleted_block_keys: &mut Vec<_>,
|
transaction: &mut DBTransaction,
|
||||||
visited_candidates: &mut HashMap<CandidateHash, CandidateEntry>,
|
visited_candidates: &mut HashMap<CandidateHash, CandidateEntry>,
|
||||||
| -> sp_blockchain::Result<Vec<Hash>> {
|
| -> Result<Vec<Hash>> {
|
||||||
let block_entry = match load_block_entry(store, &block_hash)? {
|
let block_entry = match load_block_entry(store, &block_hash)? {
|
||||||
None => return Ok(Vec::new()),
|
None => return Ok(Vec::new()),
|
||||||
Some(b) => b,
|
Some(b) => b,
|
||||||
};
|
};
|
||||||
|
|
||||||
deleted_block_keys.push(block_entry_key(&block_hash));
|
transaction.delete(DATA_COL, &block_entry_key(&block_hash)[..]);
|
||||||
for &(_, ref candidate_hash) in &block_entry.candidates {
|
for &(_, ref candidate_hash) in &block_entry.candidates {
|
||||||
let candidate = match visited_candidates.entry(*candidate_hash) {
|
let candidate = match visited_candidates.entry(*candidate_hash) {
|
||||||
Entry::Occupied(e) => e.into_mut(),
|
Entry::Occupied(e) => e.into_mut(),
|
||||||
@@ -179,12 +221,12 @@ pub(crate) fn canonicalize(
|
|||||||
// First visit everything before the height.
|
// First visit everything before the height.
|
||||||
for i in range.0..canon_number {
|
for i in range.0..canon_number {
|
||||||
let at_height = load_blocks_at_height(store, i)?;
|
let at_height = load_blocks_at_height(store, i)?;
|
||||||
deleted_height_keys.push(blocks_at_height_key(i));
|
transaction.delete(DATA_COL, &blocks_at_height_key(i)[..]);
|
||||||
|
|
||||||
for b in at_height {
|
for b in at_height {
|
||||||
let _ = visit_and_remove_block_entry(
|
let _ = visit_and_remove_block_entry(
|
||||||
b,
|
b,
|
||||||
&mut deleted_block_keys,
|
&mut transaction,
|
||||||
&mut visited_candidates,
|
&mut visited_candidates,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
@@ -193,7 +235,7 @@ pub(crate) fn canonicalize(
|
|||||||
// Then visit everything at the height.
|
// Then visit everything at the height.
|
||||||
let pruned_branches = {
|
let pruned_branches = {
|
||||||
let at_height = load_blocks_at_height(store, canon_number)?;
|
let at_height = load_blocks_at_height(store, canon_number)?;
|
||||||
deleted_height_keys.push(blocks_at_height_key(canon_number));
|
transaction.delete(DATA_COL, &blocks_at_height_key(canon_number));
|
||||||
|
|
||||||
// Note that while there may be branches descending from blocks at earlier heights,
|
// Note that while there may be branches descending from blocks at earlier heights,
|
||||||
// we have already covered them by removing everything at earlier heights.
|
// we have already covered them by removing everything at earlier heights.
|
||||||
@@ -202,7 +244,7 @@ pub(crate) fn canonicalize(
|
|||||||
for b in at_height {
|
for b in at_height {
|
||||||
let children = visit_and_remove_block_entry(
|
let children = visit_and_remove_block_entry(
|
||||||
b,
|
b,
|
||||||
&mut deleted_block_keys,
|
&mut transaction,
|
||||||
&mut visited_candidates,
|
&mut visited_candidates,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
@@ -220,7 +262,7 @@ pub(crate) fn canonicalize(
|
|||||||
while let Some((height, next_child)) = frontier.pop() {
|
while let Some((height, next_child)) = frontier.pop() {
|
||||||
let children = visit_and_remove_block_entry(
|
let children = visit_and_remove_block_entry(
|
||||||
next_child,
|
next_child,
|
||||||
&mut deleted_block_keys,
|
&mut transaction,
|
||||||
&mut visited_candidates,
|
&mut visited_candidates,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
@@ -240,32 +282,26 @@ pub(crate) fn canonicalize(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`.
|
// Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`.
|
||||||
let (written_candidates, deleted_candidates) = {
|
|
||||||
let mut written = Vec::new();
|
|
||||||
let mut deleted = Vec::new();
|
|
||||||
|
|
||||||
for (candidate_hash, candidate) in visited_candidates {
|
for (candidate_hash, candidate) in visited_candidates {
|
||||||
if candidate.block_assignments.is_empty() {
|
if candidate.block_assignments.is_empty() {
|
||||||
deleted.push(candidate_entry_key(&candidate_hash));
|
transaction.delete(DATA_COL, &candidate_entry_key(&candidate_hash)[..]);
|
||||||
} else {
|
} else {
|
||||||
written.push((candidate_entry_key(&candidate_hash), candidate.encode()));
|
transaction.put_vec(
|
||||||
|
DATA_COL,
|
||||||
|
&candidate_entry_key(&candidate_hash)[..],
|
||||||
|
candidate.encode(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(written, deleted)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`.
|
// Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`.
|
||||||
let written_at_height = {
|
for (h, at) in visited_heights {
|
||||||
visited_heights.into_iter().filter_map(|(h, at)| {
|
|
||||||
if at.is_empty() {
|
if at.is_empty() {
|
||||||
deleted_height_keys.push(blocks_at_height_key(h));
|
transaction.delete(DATA_COL, &blocks_at_height_key(h)[..]);
|
||||||
None
|
|
||||||
} else {
|
} else {
|
||||||
Some((blocks_at_height_key(h), at.encode()))
|
transaction.put_vec(DATA_COL, &blocks_at_height_key(h), at.encode());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}).collect::<Vec<_>>()
|
|
||||||
};
|
|
||||||
|
|
||||||
// due to the fork pruning, this range actually might go too far above where our actual highest block is,
|
// due to the fork pruning, this range actually might go too far above where our actual highest block is,
|
||||||
// if a relatively short fork is canonicalized.
|
// if a relatively short fork is canonicalized.
|
||||||
@@ -274,81 +310,20 @@ pub(crate) fn canonicalize(
|
|||||||
std::cmp::max(range.1, canon_number + 2),
|
std::cmp::max(range.1, canon_number + 2),
|
||||||
).encode();
|
).encode();
|
||||||
|
|
||||||
// Because aux-store requires &&[u8], we have to collect.
|
transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], new_range);
|
||||||
|
|
||||||
let inserted_keys: Vec<_> = std::iter::once((&STORED_BLOCKS_KEY[..], &new_range[..]))
|
|
||||||
.chain(written_candidates.iter().map(|&(ref k, ref v)| (&k[..], &v[..])))
|
|
||||||
.chain(written_at_height.iter().map(|&(ref k, ref v)| (&k[..], &v[..])))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let deleted_keys: Vec<_> = deleted_block_keys.iter().map(|k| &k[..])
|
|
||||||
.chain(deleted_height_keys.iter().map(|k| &k[..]))
|
|
||||||
.chain(deleted_candidates.iter().map(|k| &k[..]))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// Update the values on-disk.
|
// Update the values on-disk.
|
||||||
store.insert_aux(
|
store.write(transaction).map_err(Into::into)
|
||||||
inserted_keys.iter(),
|
|
||||||
deleted_keys.iter(),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clear the aux store of everything.
|
fn load_decode<D: Decode>(store: &dyn KeyValueDB, key: &[u8])
|
||||||
pub(crate) fn clear(store: &impl AuxStore)
|
-> Result<Option<D>>
|
||||||
-> sp_blockchain::Result<()>
|
|
||||||
{
|
{
|
||||||
let range = match load_stored_blocks(store)? {
|
match store.get(DATA_COL, key)? {
|
||||||
None => return Ok(()),
|
|
||||||
Some(range) => range,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut visited_height_keys = Vec::new();
|
|
||||||
let mut visited_block_keys = Vec::new();
|
|
||||||
let mut visited_candidate_keys = Vec::new();
|
|
||||||
|
|
||||||
for i in range.0..range.1 {
|
|
||||||
let at_height = load_blocks_at_height(store, i)?;
|
|
||||||
|
|
||||||
visited_height_keys.push(blocks_at_height_key(i));
|
|
||||||
|
|
||||||
for block_hash in at_height {
|
|
||||||
let block_entry = match load_block_entry(store, &block_hash)? {
|
|
||||||
None => continue,
|
|
||||||
Some(e) => e,
|
|
||||||
};
|
|
||||||
|
|
||||||
visited_block_keys.push(block_entry_key(&block_hash));
|
|
||||||
|
|
||||||
for &(_, candidate_hash) in &block_entry.candidates {
|
|
||||||
visited_candidate_keys.push(candidate_entry_key(&candidate_hash));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// unfortunately demands a `collect` because aux store wants `&&[u8]` for some reason.
|
|
||||||
let visited_keys_borrowed = visited_height_keys.iter().map(|x| &x[..])
|
|
||||||
.chain(visited_block_keys.iter().map(|x| &x[..]))
|
|
||||||
.chain(visited_candidate_keys.iter().map(|x| &x[..]))
|
|
||||||
.chain(std::iter::once(&STORED_BLOCKS_KEY[..]))
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
store.insert_aux(&[], &visited_keys_borrowed)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load_decode<D: Decode>(store: &impl AuxStore, key: &[u8])
|
|
||||||
-> sp_blockchain::Result<Option<D>>
|
|
||||||
{
|
|
||||||
match store.get_aux(key)? {
|
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
Some(raw) => D::decode(&mut &raw[..])
|
Some(raw) => D::decode(&mut &raw[..])
|
||||||
.map(Some)
|
.map(Some)
|
||||||
.map_err(|e| sp_blockchain::Error::Storage(
|
.map_err(Into::into),
|
||||||
format!("Failed to decode item in approvals DB: {:?}", e)
|
|
||||||
)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -372,16 +347,18 @@ pub(crate) struct NewCandidateInfo {
|
|||||||
/// `None` for any of the candidates referenced by the block entry. In these cases,
|
/// `None` for any of the candidates referenced by the block entry. In these cases,
|
||||||
/// no information about new candidates will be referred to by this function.
|
/// no information about new candidates will be referred to by this function.
|
||||||
pub(crate) fn add_block_entry(
|
pub(crate) fn add_block_entry(
|
||||||
store: &impl AuxStore,
|
store: &dyn KeyValueDB,
|
||||||
parent_hash: Hash,
|
parent_hash: Hash,
|
||||||
number: BlockNumber,
|
number: BlockNumber,
|
||||||
entry: BlockEntry,
|
entry: BlockEntry,
|
||||||
n_validators: usize,
|
n_validators: usize,
|
||||||
candidate_info: impl Fn(&CandidateHash) -> Option<NewCandidateInfo>,
|
candidate_info: impl Fn(&CandidateHash) -> Option<NewCandidateInfo>,
|
||||||
) -> sp_blockchain::Result<Vec<(CandidateHash, CandidateEntry)>> {
|
) -> Result<Vec<(CandidateHash, CandidateEntry)>> {
|
||||||
|
let mut transaction = DBTransaction::new();
|
||||||
let session = entry.session;
|
let session = entry.session;
|
||||||
|
|
||||||
let new_block_range = {
|
// Update the stored block range.
|
||||||
|
{
|
||||||
let new_range = match load_stored_blocks(store)? {
|
let new_range = match load_stored_blocks(store)? {
|
||||||
None => Some(StoredBlockRange(number, number + 1)),
|
None => Some(StoredBlockRange(number, number + 1)),
|
||||||
Some(range) => if range.1 <= number {
|
Some(range) => if range.1 <= number {
|
||||||
@@ -391,10 +368,11 @@ pub(crate) fn add_block_entry(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
new_range.map(|n| (STORED_BLOCKS_KEY, n.encode()))
|
new_range.map(|n| transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], n.encode()))
|
||||||
};
|
};
|
||||||
|
|
||||||
let updated_blocks_at = {
|
// Update the blocks at height meta key.
|
||||||
|
{
|
||||||
let mut blocks_at_height = load_blocks_at_height(store, number)?;
|
let mut blocks_at_height = load_blocks_at_height(store, number)?;
|
||||||
if blocks_at_height.contains(&entry.block_hash) {
|
if blocks_at_height.contains(&entry.block_hash) {
|
||||||
// seems we already have a block entry for this block. nothing to do here.
|
// seems we already have a block entry for this block. nothing to do here.
|
||||||
@@ -402,13 +380,13 @@ pub(crate) fn add_block_entry(
|
|||||||
}
|
}
|
||||||
|
|
||||||
blocks_at_height.push(entry.block_hash);
|
blocks_at_height.push(entry.block_hash);
|
||||||
(blocks_at_height_key(number), blocks_at_height.encode())
|
transaction.put_vec(DATA_COL, &blocks_at_height_key(number)[..], blocks_at_height.encode())
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut candidate_entries = Vec::with_capacity(entry.candidates.len());
|
let mut candidate_entries = Vec::with_capacity(entry.candidates.len());
|
||||||
|
|
||||||
let candidate_entry_updates = {
|
// read and write all updated entries.
|
||||||
let mut updated_entries = Vec::with_capacity(entry.candidates.len());
|
{
|
||||||
for &(_, ref candidate_hash) in &entry.candidates {
|
for &(_, ref candidate_hash) in &entry.candidates {
|
||||||
let NewCandidateInfo {
|
let NewCandidateInfo {
|
||||||
candidate,
|
candidate,
|
||||||
@@ -438,43 +416,26 @@ pub(crate) fn add_block_entry(
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
updated_entries.push(
|
transaction.put_vec(
|
||||||
(candidate_entry_key(&candidate_hash), candidate_entry.encode())
|
DATA_COL,
|
||||||
|
&candidate_entry_key(&candidate_hash)[..],
|
||||||
|
candidate_entry.encode(),
|
||||||
);
|
);
|
||||||
|
|
||||||
candidate_entries.push((*candidate_hash, candidate_entry));
|
candidate_entries.push((*candidate_hash, candidate_entry));
|
||||||
}
|
}
|
||||||
|
|
||||||
updated_entries
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let updated_parent = {
|
// Update the child index for the parent.
|
||||||
load_block_entry(store, &parent_hash)?.map(|mut e| {
|
load_block_entry(store, &parent_hash)?.map(|mut e| {
|
||||||
e.children.push(entry.block_hash);
|
e.children.push(entry.block_hash);
|
||||||
(block_entry_key(&parent_hash), e.encode())
|
transaction.put_vec(DATA_COL, &block_entry_key(&parent_hash)[..], e.encode())
|
||||||
})
|
});
|
||||||
};
|
|
||||||
|
|
||||||
let write_block_entry = (block_entry_key(&entry.block_hash), entry.encode());
|
// Put the new block entry in.
|
||||||
|
transaction.put_vec(DATA_COL, &block_entry_key(&entry.block_hash)[..], entry.encode());
|
||||||
// write:
|
|
||||||
// - new block range
|
|
||||||
// - updated blocks-at item
|
|
||||||
// - fresh and updated candidate entries
|
|
||||||
// - the parent block entry.
|
|
||||||
// - the block entry itself
|
|
||||||
|
|
||||||
// Unfortunately have to collect because aux-store demands &(&[u8], &[u8]).
|
|
||||||
let all_keys_and_values: Vec<_> = new_block_range.as_ref().into_iter()
|
|
||||||
.map(|&(ref k, ref v)| (&k[..], &v[..]))
|
|
||||||
.chain(std::iter::once((&updated_blocks_at.0[..], &updated_blocks_at.1[..])))
|
|
||||||
.chain(candidate_entry_updates.iter().map(|&(ref k, ref v)| (&k[..], &v[..])))
|
|
||||||
.chain(std::iter::once((&write_block_entry.0[..], &write_block_entry.1[..])))
|
|
||||||
.chain(updated_parent.as_ref().into_iter().map(|&(ref k, ref v)| (&k[..], &v[..])))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
store.insert_aux(&all_keys_and_values, &[])?;
|
|
||||||
|
|
||||||
|
store.write(transaction)?;
|
||||||
Ok(candidate_entries)
|
Ok(candidate_entries)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -501,57 +462,55 @@ impl Transaction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Write the contents of the transaction, atomically, to the DB.
|
/// Write the contents of the transaction, atomically, to the DB.
|
||||||
pub(crate) fn write(self, db: &impl AuxStore) -> sp_blockchain::Result<()> {
|
pub(crate) fn write(self, db: &dyn KeyValueDB) -> Result<()> {
|
||||||
if self.block_entries.is_empty() && self.candidate_entries.is_empty() {
|
if self.block_entries.is_empty() && self.candidate_entries.is_empty() {
|
||||||
return Ok(())
|
return Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
let blocks: Vec<_> = self.block_entries.into_iter().map(|(hash, entry)| {
|
let mut db_transaction = DBTransaction::new();
|
||||||
|
|
||||||
|
for (hash, entry) in self.block_entries {
|
||||||
let k = block_entry_key(&hash);
|
let k = block_entry_key(&hash);
|
||||||
let v = entry.encode();
|
let v = entry.encode();
|
||||||
|
|
||||||
(k, v)
|
db_transaction.put_vec(DATA_COL, &k, v);
|
||||||
}).collect();
|
}
|
||||||
|
|
||||||
let candidates: Vec<_> = self.candidate_entries.into_iter().map(|(hash, entry)| {
|
for (hash, entry) in self.candidate_entries {
|
||||||
let k = candidate_entry_key(&hash);
|
let k = candidate_entry_key(&hash);
|
||||||
let v = entry.encode();
|
let v = entry.encode();
|
||||||
|
|
||||||
(k, v)
|
db_transaction.put_vec(DATA_COL, &k, v);
|
||||||
}).collect();
|
}
|
||||||
|
|
||||||
let kv = blocks.iter().map(|(k, v)| (&k[..], &v[..]))
|
db.write(db_transaction).map_err(Into::into)
|
||||||
.chain(candidates.iter().map(|(k, v)| (&k[..], &v[..])))
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
db.insert_aux(&kv, &[])
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load the stored-blocks key from the state.
|
/// Load the stored-blocks key from the state.
|
||||||
fn load_stored_blocks(store: &impl AuxStore)
|
fn load_stored_blocks(store: &dyn KeyValueDB)
|
||||||
-> sp_blockchain::Result<Option<StoredBlockRange>>
|
-> Result<Option<StoredBlockRange>>
|
||||||
{
|
{
|
||||||
load_decode(store, STORED_BLOCKS_KEY)
|
load_decode(store, STORED_BLOCKS_KEY)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load a blocks-at-height entry for a given block number.
|
/// Load a blocks-at-height entry for a given block number.
|
||||||
pub(crate) fn load_blocks_at_height(store: &impl AuxStore, block_number: BlockNumber)
|
pub(crate) fn load_blocks_at_height(store: &dyn KeyValueDB, block_number: BlockNumber)
|
||||||
-> sp_blockchain::Result<Vec<Hash>> {
|
-> Result<Vec<Hash>> {
|
||||||
load_decode(store, &blocks_at_height_key(block_number))
|
load_decode(store, &blocks_at_height_key(block_number))
|
||||||
.map(|x| x.unwrap_or_default())
|
.map(|x| x.unwrap_or_default())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load a block entry from the aux store.
|
/// Load a block entry from the aux store.
|
||||||
pub(crate) fn load_block_entry(store: &impl AuxStore, block_hash: &Hash)
|
pub(crate) fn load_block_entry(store: &dyn KeyValueDB, block_hash: &Hash)
|
||||||
-> sp_blockchain::Result<Option<BlockEntry>>
|
-> Result<Option<BlockEntry>>
|
||||||
{
|
{
|
||||||
load_decode(store, &block_entry_key(block_hash))
|
load_decode(store, &block_entry_key(block_hash))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load a candidate entry from the aux store.
|
/// Load a candidate entry from the aux store.
|
||||||
pub(crate) fn load_candidate_entry(store: &impl AuxStore, candidate_hash: &CandidateHash)
|
pub(crate) fn load_candidate_entry(store: &dyn KeyValueDB, candidate_hash: &CandidateHash)
|
||||||
-> sp_blockchain::Result<Option<CandidateEntry>>
|
-> Result<Option<CandidateEntry>>
|
||||||
{
|
{
|
||||||
load_decode(store, &candidate_entry_key(candidate_hash))
|
load_decode(store, &candidate_entry_key(candidate_hash))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,65 +18,38 @@
|
|||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use polkadot_primitives::v1::Id as ParaId;
|
use polkadot_primitives::v1::Id as ParaId;
|
||||||
use std::cell::RefCell;
|
|
||||||
|
|
||||||
#[derive(Default)]
|
pub(crate) fn write_stored_blocks(tx: &mut DBTransaction, range: StoredBlockRange) {
|
||||||
pub struct TestStore {
|
tx.put_vec(
|
||||||
inner: RefCell<HashMap<Vec<u8>, Vec<u8>>>,
|
DATA_COL,
|
||||||
}
|
&STORED_BLOCKS_KEY[..],
|
||||||
|
|
||||||
impl AuxStore for TestStore {
|
|
||||||
fn insert_aux<'a, 'b: 'a, 'c: 'a, I, D>(&self, insertions: I, deletions: D) -> sp_blockchain::Result<()>
|
|
||||||
where I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>, D: IntoIterator<Item = &'a &'b [u8]>
|
|
||||||
{
|
|
||||||
let mut store = self.inner.borrow_mut();
|
|
||||||
|
|
||||||
// insertions before deletions.
|
|
||||||
for (k, v) in insertions {
|
|
||||||
store.insert(k.to_vec(), v.to_vec());
|
|
||||||
}
|
|
||||||
|
|
||||||
for k in deletions {
|
|
||||||
store.remove(&k[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
|
|
||||||
Ok(self.inner.borrow().get(key).map(|v| v.clone()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TestStore {
|
|
||||||
pub(crate) fn write_stored_blocks(&self, range: StoredBlockRange) {
|
|
||||||
self.inner.borrow_mut().insert(
|
|
||||||
STORED_BLOCKS_KEY.to_vec(),
|
|
||||||
range.encode(),
|
range.encode(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn write_blocks_at_height(&self, height: BlockNumber, blocks: &[Hash]) {
|
pub(crate) fn write_blocks_at_height(tx: &mut DBTransaction, height: BlockNumber, blocks: &[Hash]) {
|
||||||
self.inner.borrow_mut().insert(
|
tx.put_vec(
|
||||||
blocks_at_height_key(height).to_vec(),
|
DATA_COL,
|
||||||
|
&blocks_at_height_key(height)[..],
|
||||||
blocks.encode(),
|
blocks.encode(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn write_block_entry(&self, block_hash: &Hash, entry: &BlockEntry) {
|
pub(crate) fn write_block_entry(tx: &mut DBTransaction, block_hash: &Hash, entry: &BlockEntry) {
|
||||||
self.inner.borrow_mut().insert(
|
tx.put_vec(
|
||||||
block_entry_key(block_hash).to_vec(),
|
DATA_COL,
|
||||||
|
&block_entry_key(block_hash)[..],
|
||||||
entry.encode(),
|
entry.encode(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn write_candidate_entry(&self, candidate_hash: &CandidateHash, entry: &CandidateEntry) {
|
pub(crate) fn write_candidate_entry(tx: &mut DBTransaction, candidate_hash: &CandidateHash, entry: &CandidateEntry) {
|
||||||
self.inner.borrow_mut().insert(
|
tx.put_vec(
|
||||||
candidate_entry_key(candidate_hash).to_vec(),
|
DATA_COL,
|
||||||
|
&candidate_entry_key(candidate_hash)[..],
|
||||||
entry.encode(),
|
entry.encode(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
fn make_bitvec(len: usize) -> BitVec<BitOrderLsb0, u8> {
|
fn make_bitvec(len: usize) -> BitVec<BitOrderLsb0, u8> {
|
||||||
bitvec::bitvec![BitOrderLsb0, u8; 0; len]
|
bitvec::bitvec![BitOrderLsb0, u8; 0; len]
|
||||||
@@ -108,7 +81,7 @@ fn make_candidate(para_id: ParaId, relay_parent: Hash) -> CandidateReceipt {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn read_write() {
|
fn read_write() {
|
||||||
let store = TestStore::default();
|
let store = kvdb_memorydb::create(1);
|
||||||
|
|
||||||
let hash_a = Hash::repeat_byte(1);
|
let hash_a = Hash::repeat_byte(1);
|
||||||
let hash_b = Hash::repeat_byte(2);
|
let hash_b = Hash::repeat_byte(2);
|
||||||
@@ -137,10 +110,14 @@ fn read_write() {
|
|||||||
approvals: Default::default(),
|
approvals: Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
store.write_stored_blocks(range.clone());
|
let mut tx = DBTransaction::new();
|
||||||
store.write_blocks_at_height(1, &at_height);
|
|
||||||
store.write_block_entry(&hash_a, &block_entry);
|
write_stored_blocks(&mut tx, range.clone());
|
||||||
store.write_candidate_entry(&candidate_hash, &candidate_entry);
|
write_blocks_at_height(&mut tx, 1, &at_height);
|
||||||
|
write_block_entry(&mut tx, &hash_a, &block_entry);
|
||||||
|
write_candidate_entry(&mut tx, &candidate_hash, &candidate_entry);
|
||||||
|
|
||||||
|
store.write(tx).unwrap();
|
||||||
|
|
||||||
assert_eq!(load_stored_blocks(&store).unwrap(), Some(range));
|
assert_eq!(load_stored_blocks(&store).unwrap(), Some(range));
|
||||||
assert_eq!(load_blocks_at_height(&store, 1).unwrap(), at_height);
|
assert_eq!(load_blocks_at_height(&store, 1).unwrap(), at_height);
|
||||||
@@ -154,8 +131,12 @@ fn read_write() {
|
|||||||
candidate_entry_key(&candidate_hash).to_vec(),
|
candidate_entry_key(&candidate_hash).to_vec(),
|
||||||
];
|
];
|
||||||
|
|
||||||
let delete_keys: Vec<_> = delete_keys.iter().map(|k| &k[..]).collect();
|
let mut tx = DBTransaction::new();
|
||||||
store.insert_aux(&[], &delete_keys).unwrap();
|
for key in delete_keys {
|
||||||
|
tx.delete(DATA_COL, &key[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
store.write(tx).unwrap();
|
||||||
|
|
||||||
assert!(load_stored_blocks(&store).unwrap().is_none());
|
assert!(load_stored_blocks(&store).unwrap().is_none());
|
||||||
assert!(load_blocks_at_height(&store, 1).unwrap().is_empty());
|
assert!(load_blocks_at_height(&store, 1).unwrap().is_empty());
|
||||||
@@ -165,7 +146,7 @@ fn read_write() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn add_block_entry_works() {
|
fn add_block_entry_works() {
|
||||||
let store = TestStore::default();
|
let store = kvdb_memorydb::create(1);
|
||||||
|
|
||||||
let parent_hash = Hash::repeat_byte(1);
|
let parent_hash = Hash::repeat_byte(1);
|
||||||
let block_hash_a = Hash::repeat_byte(2);
|
let block_hash_a = Hash::repeat_byte(2);
|
||||||
@@ -230,7 +211,7 @@ fn add_block_entry_works() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn add_block_entry_adds_child() {
|
fn add_block_entry_adds_child() {
|
||||||
let store = TestStore::default();
|
let store = kvdb_memorydb::create(1);
|
||||||
|
|
||||||
let parent_hash = Hash::repeat_byte(1);
|
let parent_hash = Hash::repeat_byte(1);
|
||||||
let block_hash_a = Hash::repeat_byte(2);
|
let block_hash_a = Hash::repeat_byte(2);
|
||||||
@@ -272,59 +253,9 @@ fn add_block_entry_adds_child() {
|
|||||||
assert_eq!(load_block_entry(&store, &block_hash_b).unwrap(), Some(block_entry_b));
|
assert_eq!(load_block_entry(&store, &block_hash_b).unwrap(), Some(block_entry_b));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn clear_works() {
|
|
||||||
let store = TestStore::default();
|
|
||||||
|
|
||||||
let hash_a = Hash::repeat_byte(1);
|
|
||||||
let hash_b = Hash::repeat_byte(2);
|
|
||||||
let candidate_hash = CandidateHash(Hash::repeat_byte(3));
|
|
||||||
|
|
||||||
let range = StoredBlockRange(0, 5);
|
|
||||||
let at_height = vec![hash_a, hash_b];
|
|
||||||
|
|
||||||
let block_entry = make_block_entry(
|
|
||||||
hash_a,
|
|
||||||
vec![(CoreIndex(0), candidate_hash)],
|
|
||||||
);
|
|
||||||
|
|
||||||
let candidate_entry = CandidateEntry {
|
|
||||||
candidate: Default::default(),
|
|
||||||
session: 5,
|
|
||||||
block_assignments: vec![
|
|
||||||
(hash_a, ApprovalEntry {
|
|
||||||
tranches: Vec::new(),
|
|
||||||
backing_group: GroupIndex(1),
|
|
||||||
our_assignment: None,
|
|
||||||
assignments: Default::default(),
|
|
||||||
approved: false,
|
|
||||||
})
|
|
||||||
].into_iter().collect(),
|
|
||||||
approvals: Default::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
store.write_stored_blocks(range.clone());
|
|
||||||
store.write_blocks_at_height(1, &at_height);
|
|
||||||
store.write_block_entry(&hash_a, &block_entry);
|
|
||||||
store.write_candidate_entry(&candidate_hash, &candidate_entry);
|
|
||||||
|
|
||||||
assert_eq!(load_stored_blocks(&store).unwrap(), Some(range));
|
|
||||||
assert_eq!(load_blocks_at_height(&store, 1).unwrap(), at_height);
|
|
||||||
assert_eq!(load_block_entry(&store, &hash_a).unwrap(), Some(block_entry));
|
|
||||||
assert_eq!(load_candidate_entry(&store, &candidate_hash).unwrap(), Some(candidate_entry));
|
|
||||||
|
|
||||||
clear(&store).unwrap();
|
|
||||||
|
|
||||||
assert!(load_stored_blocks(&store).unwrap().is_none());
|
|
||||||
assert!(load_blocks_at_height(&store, 1).unwrap().is_empty());
|
|
||||||
assert!(load_block_entry(&store, &hash_a).unwrap().is_none());
|
|
||||||
assert!(load_candidate_entry(&store, &candidate_hash).unwrap().is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn canonicalize_works() {
|
fn canonicalize_works() {
|
||||||
let store = TestStore::default();
|
let store = kvdb_memorydb::create(1);
|
||||||
|
|
||||||
// -> B1 -> C1 -> D1
|
// -> B1 -> C1 -> D1
|
||||||
// A -> B2 -> C2 -> D2
|
// A -> B2 -> C2 -> D2
|
||||||
@@ -341,7 +272,9 @@ fn canonicalize_works() {
|
|||||||
|
|
||||||
let n_validators = 10;
|
let n_validators = 10;
|
||||||
|
|
||||||
store.write_stored_blocks(StoredBlockRange(1, 5));
|
let mut tx = DBTransaction::new();
|
||||||
|
write_stored_blocks(&mut tx, StoredBlockRange(1, 5));
|
||||||
|
store.write(tx).unwrap();
|
||||||
|
|
||||||
let genesis = Hash::repeat_byte(0);
|
let genesis = Hash::repeat_byte(0);
|
||||||
|
|
||||||
|
|||||||
@@ -42,8 +42,8 @@ use polkadot_node_primitives::approval::{
|
|||||||
self as approval_types, BlockApprovalMeta, RelayVRFStory,
|
self as approval_types, BlockApprovalMeta, RelayVRFStory,
|
||||||
};
|
};
|
||||||
use sc_keystore::LocalKeystore;
|
use sc_keystore::LocalKeystore;
|
||||||
use sc_client_api::backend::AuxStore;
|
|
||||||
use sp_consensus_slots::Slot;
|
use sp_consensus_slots::Slot;
|
||||||
|
use kvdb::KeyValueDB;
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::channel::oneshot;
|
use futures::channel::oneshot;
|
||||||
@@ -516,7 +516,7 @@ pub struct BlockImportedCandidates {
|
|||||||
pub(crate) async fn handle_new_head(
|
pub(crate) async fn handle_new_head(
|
||||||
ctx: &mut impl SubsystemContext,
|
ctx: &mut impl SubsystemContext,
|
||||||
state: &mut State<impl DBReader>,
|
state: &mut State<impl DBReader>,
|
||||||
db_writer: &impl AuxStore,
|
db_writer: &dyn KeyValueDB,
|
||||||
head: Hash,
|
head: Hash,
|
||||||
finalized_number: &Option<BlockNumber>,
|
finalized_number: &Option<BlockNumber>,
|
||||||
) -> SubsystemResult<Vec<BlockImportedCandidates>> {
|
) -> SubsystemResult<Vec<BlockImportedCandidates>> {
|
||||||
@@ -1610,7 +1610,7 @@ mod tests {
|
|||||||
}.into(),
|
}.into(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let db_writer = crate::approval_db::v1::tests::TestStore::default();
|
let db_writer = kvdb_memorydb::create(1);
|
||||||
|
|
||||||
let test_fut = {
|
let test_fut = {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|||||||
@@ -44,9 +44,9 @@ use polkadot_node_primitives::approval::{
|
|||||||
use parity_scale_codec::Encode;
|
use parity_scale_codec::Encode;
|
||||||
use sc_keystore::LocalKeystore;
|
use sc_keystore::LocalKeystore;
|
||||||
use sp_consensus_slots::Slot;
|
use sp_consensus_slots::Slot;
|
||||||
use sc_client_api::backend::AuxStore;
|
|
||||||
use sp_runtime::traits::AppVerify;
|
use sp_runtime::traits::AppVerify;
|
||||||
use sp_application_crypto::Pair;
|
use sp_application_crypto::Pair;
|
||||||
|
use kvdb::KeyValueDB;
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::channel::{mpsc, oneshot};
|
use futures::channel::{mpsc, oneshot};
|
||||||
@@ -74,29 +74,53 @@ mod tests;
|
|||||||
const APPROVAL_SESSIONS: SessionIndex = 6;
|
const APPROVAL_SESSIONS: SessionIndex = 6;
|
||||||
const LOG_TARGET: &str = "approval_voting";
|
const LOG_TARGET: &str = "approval_voting";
|
||||||
|
|
||||||
|
/// Configuration for the approval voting subsystem
|
||||||
|
pub struct Config {
|
||||||
|
/// The path where the approval-voting DB should be kept. This directory is completely removed when starting
|
||||||
|
/// the service.
|
||||||
|
pub path: std::path::PathBuf,
|
||||||
|
/// The cache size, in bytes, to spend on approval checking metadata.
|
||||||
|
pub cache_size: Option<usize>,
|
||||||
|
/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
|
||||||
|
/// divisible by 500.
|
||||||
|
pub slot_duration_millis: u64,
|
||||||
|
}
|
||||||
|
|
||||||
/// The approval voting subsystem.
|
/// The approval voting subsystem.
|
||||||
pub struct ApprovalVotingSubsystem<T> {
|
pub struct ApprovalVotingSubsystem {
|
||||||
keystore: Arc<LocalKeystore>,
|
keystore: Arc<LocalKeystore>,
|
||||||
slot_duration_millis: u64,
|
slot_duration_millis: u64,
|
||||||
db: Arc<T>,
|
db: Arc<dyn KeyValueDB>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ApprovalVotingSubsystem<T> {
|
impl ApprovalVotingSubsystem {
|
||||||
/// Create a new approval voting subsystem with the given keystore, slot duration,
|
/// Create a new approval voting subsystem with the given keystore, slot duration,
|
||||||
/// and underlying DB.
|
/// which creates a DB at the given path. This function will delete the directory
|
||||||
pub fn new(keystore: Arc<LocalKeystore>, slot_duration_millis: u64, db: Arc<T>) -> Self {
|
/// at the given path if it already exists.
|
||||||
ApprovalVotingSubsystem {
|
pub fn with_config(
|
||||||
|
config: Config,
|
||||||
|
keystore: Arc<LocalKeystore>,
|
||||||
|
) -> std::io::Result<Self> {
|
||||||
|
const DEFAULT_CACHE_SIZE: usize = 100 * 1024 * 1024; // 100MiB default should be fine unless finality stalls.
|
||||||
|
|
||||||
|
let db = approval_db::v1::clear_and_recreate(
|
||||||
|
&config.path,
|
||||||
|
config.cache_size.unwrap_or(DEFAULT_CACHE_SIZE),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(ApprovalVotingSubsystem {
|
||||||
keystore,
|
keystore,
|
||||||
slot_duration_millis,
|
slot_duration_millis: config.slot_duration_millis,
|
||||||
db,
|
db,
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, C> Subsystem<C> for ApprovalVotingSubsystem<T>
|
impl<C> Subsystem<C> for ApprovalVotingSubsystem
|
||||||
where T: AuxStore + Send + Sync + 'static, C: SubsystemContext<Message = ApprovalVotingMessage> {
|
where C: SubsystemContext<Message = ApprovalVotingMessage>
|
||||||
|
{
|
||||||
fn start(self, ctx: C) -> SpawnedSubsystem {
|
fn start(self, ctx: C) -> SpawnedSubsystem {
|
||||||
let future = run::<T, C>(
|
let future = run::<C>(
|
||||||
ctx,
|
ctx,
|
||||||
self,
|
self,
|
||||||
Box::new(SystemClock),
|
Box::new(SystemClock),
|
||||||
@@ -204,20 +228,20 @@ trait DBReader {
|
|||||||
// This is a submodule to enforce opacity of the inner DB type.
|
// This is a submodule to enforce opacity of the inner DB type.
|
||||||
mod approval_db_v1_reader {
|
mod approval_db_v1_reader {
|
||||||
use super::{
|
use super::{
|
||||||
DBReader, AuxStore, Hash, CandidateHash, BlockEntry, CandidateEntry,
|
DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry,
|
||||||
Arc, SubsystemResult, SubsystemError, approval_db,
|
Arc, SubsystemResult, SubsystemError, approval_db,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A DB reader that uses the approval-db V1 under the hood.
|
/// A DB reader that uses the approval-db V1 under the hood.
|
||||||
pub(super) struct ApprovalDBV1Reader<T>(Arc<T>);
|
pub(super) struct ApprovalDBV1Reader<T: ?Sized>(Arc<T>);
|
||||||
|
|
||||||
impl<T> From<Arc<T>> for ApprovalDBV1Reader<T> {
|
impl<T: ?Sized> From<Arc<T>> for ApprovalDBV1Reader<T> {
|
||||||
fn from(a: Arc<T>) -> Self {
|
fn from(a: Arc<T>) -> Self {
|
||||||
ApprovalDBV1Reader(a)
|
ApprovalDBV1Reader(a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: AuxStore> DBReader for ApprovalDBV1Reader<T> {
|
impl DBReader for ApprovalDBV1Reader<dyn KeyValueDB> {
|
||||||
fn load_block_entry(
|
fn load_block_entry(
|
||||||
&self,
|
&self,
|
||||||
block_hash: &Hash,
|
block_hash: &Hash,
|
||||||
@@ -273,13 +297,13 @@ enum Action {
|
|||||||
Conclude,
|
Conclude,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run<T, C>(
|
async fn run<C>(
|
||||||
mut ctx: C,
|
mut ctx: C,
|
||||||
subsystem: ApprovalVotingSubsystem<T>,
|
subsystem: ApprovalVotingSubsystem,
|
||||||
clock: Box<dyn Clock + Send + Sync>,
|
clock: Box<dyn Clock + Send + Sync>,
|
||||||
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
|
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
|
||||||
) -> SubsystemResult<()>
|
) -> SubsystemResult<()>
|
||||||
where T: AuxStore + Send + Sync + 'static, C: SubsystemContext<Message = ApprovalVotingMessage>
|
where C: SubsystemContext<Message = ApprovalVotingMessage>
|
||||||
{
|
{
|
||||||
let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64);
|
let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64);
|
||||||
let mut state = State {
|
let mut state = State {
|
||||||
@@ -298,11 +322,6 @@ async fn run<T, C>(
|
|||||||
|
|
||||||
let db_writer = &*subsystem.db;
|
let db_writer = &*subsystem.db;
|
||||||
|
|
||||||
if let Err(e) = approval_db::v1::clear(db_writer) {
|
|
||||||
tracing::warn!(target: LOG_TARGET, "Failed to clear DB: {:?}", e);
|
|
||||||
return Err(SubsystemError::with_origin("db", e));
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let wait_til_next_tick = match wakeups.first() {
|
let wait_til_next_tick = match wakeups.first() {
|
||||||
None => future::Either::Left(future::pending()),
|
None => future::Either::Left(future::pending()),
|
||||||
@@ -367,7 +386,7 @@ async fn run<T, C>(
|
|||||||
async fn handle_actions(
|
async fn handle_actions(
|
||||||
ctx: &mut impl SubsystemContext,
|
ctx: &mut impl SubsystemContext,
|
||||||
wakeups: &mut Wakeups,
|
wakeups: &mut Wakeups,
|
||||||
db: &impl AuxStore,
|
db: &dyn KeyValueDB,
|
||||||
background_tx: &mpsc::Sender<BackgroundRequest>,
|
background_tx: &mpsc::Sender<BackgroundRequest>,
|
||||||
actions: impl IntoIterator<Item = Action>,
|
actions: impl IntoIterator<Item = Action>,
|
||||||
) -> SubsystemResult<bool> {
|
) -> SubsystemResult<bool> {
|
||||||
@@ -427,7 +446,7 @@ async fn handle_actions(
|
|||||||
async fn handle_from_overseer(
|
async fn handle_from_overseer(
|
||||||
ctx: &mut impl SubsystemContext,
|
ctx: &mut impl SubsystemContext,
|
||||||
state: &mut State<impl DBReader>,
|
state: &mut State<impl DBReader>,
|
||||||
db_writer: &impl AuxStore,
|
db_writer: &dyn KeyValueDB,
|
||||||
x: FromOverseer<ApprovalVotingMessage>,
|
x: FromOverseer<ApprovalVotingMessage>,
|
||||||
last_finalized_height: &mut Option<BlockNumber>,
|
last_finalized_height: &mut Option<BlockNumber>,
|
||||||
) -> SubsystemResult<Vec<Action>> {
|
) -> SubsystemResult<Vec<Action>> {
|
||||||
|
|||||||
@@ -107,6 +107,7 @@ default = ["db", "full-node"]
|
|||||||
db = ["service/db"]
|
db = ["service/db"]
|
||||||
full-node = [
|
full-node = [
|
||||||
"polkadot-node-core-av-store",
|
"polkadot-node-core-av-store",
|
||||||
|
"polkadot-node-core-approval-voting",
|
||||||
"sc-finality-grandpa-warp-sync"
|
"sc-finality-grandpa-warp-sync"
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -134,7 +135,6 @@ real-overseer = [
|
|||||||
"polkadot-pov-distribution",
|
"polkadot-pov-distribution",
|
||||||
"polkadot-statement-distribution",
|
"polkadot-statement-distribution",
|
||||||
"polkadot-approval-distribution",
|
"polkadot-approval-distribution",
|
||||||
"polkadot-node-core-approval-voting",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
approval-checking = [
|
approval-checking = [
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ use {
|
|||||||
tracing::info,
|
tracing::info,
|
||||||
polkadot_node_core_av_store::Config as AvailabilityConfig,
|
polkadot_node_core_av_store::Config as AvailabilityConfig,
|
||||||
polkadot_node_core_av_store::Error as AvailabilityError,
|
polkadot_node_core_av_store::Error as AvailabilityError,
|
||||||
|
polkadot_node_core_approval_voting::Config as ApprovalVotingConfig,
|
||||||
polkadot_node_core_proposer::ProposerFactory,
|
polkadot_node_core_proposer::ProposerFactory,
|
||||||
polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler},
|
polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler},
|
||||||
polkadot_primitives::v1::ParachainHost,
|
polkadot_primitives::v1::ParachainHost,
|
||||||
@@ -137,6 +138,10 @@ pub enum Error {
|
|||||||
|
|
||||||
#[error("Authorities require the real overseer implementation")]
|
#[error("Authorities require the real overseer implementation")]
|
||||||
AuthoritiesRequireRealOverseer,
|
AuthoritiesRequireRealOverseer,
|
||||||
|
|
||||||
|
#[cfg(feature = "full-node")]
|
||||||
|
#[error("Creating a custom database is required for validators")]
|
||||||
|
DatabasePathRequired,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network.
|
/// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network.
|
||||||
@@ -358,7 +363,7 @@ fn real_overseer<Spawner, RuntimeClient>(
|
|||||||
spawner: Spawner,
|
spawner: Spawner,
|
||||||
_: IsCollator,
|
_: IsCollator,
|
||||||
_: IsolationStrategy,
|
_: IsolationStrategy,
|
||||||
_: u64,
|
_: ApprovalVotingConfig,
|
||||||
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
|
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
|
||||||
where
|
where
|
||||||
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
|
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
|
||||||
@@ -386,7 +391,7 @@ fn real_overseer<Spawner, RuntimeClient>(
|
|||||||
spawner: Spawner,
|
spawner: Spawner,
|
||||||
is_collator: IsCollator,
|
is_collator: IsCollator,
|
||||||
isolation_strategy: IsolationStrategy,
|
isolation_strategy: IsolationStrategy,
|
||||||
slot_duration: u64,
|
approval_voting_config: ApprovalVotingConfig,
|
||||||
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
|
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
|
||||||
where
|
where
|
||||||
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
|
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
|
||||||
@@ -417,7 +422,7 @@ where
|
|||||||
use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
|
use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
|
||||||
|
|
||||||
#[cfg(not(feature = "approval-checking"))]
|
#[cfg(not(feature = "approval-checking"))]
|
||||||
let _ = slot_duration; // silence.
|
let _ = approval_voting_config; // silence.
|
||||||
|
|
||||||
let all_subsystems = AllSubsystems {
|
let all_subsystems = AllSubsystems {
|
||||||
availability_distribution: AvailabilityDistributionSubsystem::new(
|
availability_distribution: AvailabilityDistributionSubsystem::new(
|
||||||
@@ -494,11 +499,10 @@ where
|
|||||||
Metrics::register(registry)?,
|
Metrics::register(registry)?,
|
||||||
),
|
),
|
||||||
#[cfg(feature = "approval-checking")]
|
#[cfg(feature = "approval-checking")]
|
||||||
approval_voting: ApprovalVotingSubsystem::new(
|
approval_voting: ApprovalVotingSubsystem::with_config(
|
||||||
|
approval_voting_config,
|
||||||
keystore.clone(),
|
keystore.clone(),
|
||||||
slot_duration,
|
)?,
|
||||||
runtime_client.clone(),
|
|
||||||
),
|
|
||||||
#[cfg(not(feature = "approval-checking"))]
|
#[cfg(not(feature = "approval-checking"))]
|
||||||
approval_voting: polkadot_subsystem::DummySubsystem,
|
approval_voting: polkadot_subsystem::DummySubsystem,
|
||||||
};
|
};
|
||||||
@@ -656,6 +660,14 @@ pub fn new_full<RuntimeApi, Executor>(
|
|||||||
|
|
||||||
let availability_config = config.database.clone().try_into().map_err(Error::Availability)?;
|
let availability_config = config.database.clone().try_into().map_err(Error::Availability)?;
|
||||||
|
|
||||||
|
let approval_voting_config = ApprovalVotingConfig {
|
||||||
|
path: config.database.path()
|
||||||
|
.ok_or(Error::DatabasePathRequired)?
|
||||||
|
.join("parachains").join("approval-voting"),
|
||||||
|
slot_duration_millis: slot_duration,
|
||||||
|
cache_size: None, // default is fine.
|
||||||
|
};
|
||||||
|
|
||||||
let telemetry_span = TelemetrySpan::new();
|
let telemetry_span = TelemetrySpan::new();
|
||||||
let _telemetry_span_entered = telemetry_span.enter();
|
let _telemetry_span_entered = telemetry_span.enter();
|
||||||
|
|
||||||
@@ -749,7 +761,7 @@ pub fn new_full<RuntimeApi, Executor>(
|
|||||||
spawner,
|
spawner,
|
||||||
is_collator,
|
is_collator,
|
||||||
isolation_strategy,
|
isolation_strategy,
|
||||||
slot_duration,
|
approval_voting_config,
|
||||||
)?;
|
)?;
|
||||||
let overseer_handler_clone = overseer_handler.clone();
|
let overseer_handler_clone = overseer_handler.clone();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user