diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 8fd2243071..49ee4697f8 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5650,6 +5650,7 @@ dependencies = [ "assert_matches", "async-trait", "futures 0.3.13", + "futures-timer 3.0.2", "parity-scale-codec", "parking_lot 0.11.1", "polkadot-node-network-protocol", @@ -5659,6 +5660,7 @@ dependencies = [ "polkadot-primitives", "sc-authority-discovery", "sc-network", + "sp-consensus", "sp-core", "sp-keyring", "strum", @@ -5692,7 +5694,6 @@ dependencies = [ "futures-timer 3.0.2", "kvdb", "kvdb-memorydb", - "kvdb-rocksdb", "maplit", "merlin", "parity-scale-codec", @@ -5710,6 +5711,7 @@ dependencies = [ "schnorrkel", "sp-application-crypto", "sp-blockchain", + "sp-consensus", "sp-consensus-babe", "sp-consensus-slots", "sp-core", @@ -5730,7 +5732,6 @@ dependencies = [ "futures-timer 3.0.2", "kvdb", "kvdb-memorydb", - "kvdb-rocksdb", "log", "parity-scale-codec", "parking_lot 0.11.1", @@ -5741,7 +5742,6 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-overseer", "polkadot-primitives", - "sc-service", "sp-core", "sp-keyring", "thiserror", @@ -6345,6 +6345,8 @@ dependencies = [ "futures 0.3.13", "hex-literal", "kusama-runtime", + "kvdb", + "kvdb-rocksdb", "pallet-babe", "pallet-im-online", "pallet-mmr-primitives", diff --git a/polkadot/node/core/approval-voting/Cargo.toml b/polkadot/node/core/approval-voting/Cargo.toml index f3fe0fcb03..3b34ab3620 100644 --- a/polkadot/node/core/approval-voting/Cargo.toml +++ b/polkadot/node/core/approval-voting/Cargo.toml @@ -13,7 +13,6 @@ bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] } merlin = "2.0" schnorrkel = "0.9.1" kvdb = "0.9.0" -kvdb-rocksdb = "0.11.0" derive_more = "0.99.1" polkadot-node-subsystem = { path = "../../subsystem" } @@ -25,6 +24,7 @@ polkadot-node-jaeger = { path = "../../jaeger" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } +sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-consensus-slots = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false, features = ["full_crypto"] } diff --git a/polkadot/node/core/approval-voting/src/approval_checking.rs b/polkadot/node/core/approval-voting/src/approval_checking.rs index b5c2af36e7..79ef03c7bf 100644 --- a/polkadot/node/core/approval-voting/src/approval_checking.rs +++ b/polkadot/node/core/approval-voting/src/approval_checking.rs @@ -406,6 +406,7 @@ mod tests { tranches: Vec::new(), assignments: Default::default(), our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); @@ -452,6 +453,7 @@ mod tests { ], assignments: bitvec![BitOrderLsb0, u8; 1; 10], our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); @@ -515,6 +517,7 @@ mod tests { ], assignments: bitvec![BitOrderLsb0, u8; 1; 10], our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); @@ -582,6 +585,7 @@ mod tests { tranches: Vec::new(), assignments: bitvec![BitOrderLsb0, u8; 0; 5], our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); @@ -619,6 +623,7 @@ mod tests { tranches: Vec::new(), assignments: bitvec![BitOrderLsb0, u8; 0; 10], our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); @@ -657,6 +662,7 @@ mod tests { tranches: Vec::new(), assignments: bitvec![BitOrderLsb0, u8; 0; 10], our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); @@ -700,6 +706,7 @@ mod tests { tranches: Vec::new(), assignments: bitvec![BitOrderLsb0, u8; 0; n_validators], our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); @@ -765,6 +772,7 @@ mod tests { tranches: Vec::new(), assignments: bitvec![BitOrderLsb0, u8; 0; n_validators], our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); @@ -852,6 +860,7 @@ mod tests { tranches: Vec::new(), assignments: bitvec![BitOrderLsb0, u8; 0; n_validators], our_assignment: None, + our_approval_sig: None, backing_group: GroupIndex(0), approved: false, }.into(); diff --git a/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs b/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs index 8c76953fea..27960eb292 100644 --- a/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs +++ b/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs @@ -20,14 +20,13 @@ use kvdb::{DBTransaction, KeyValueDB}; use polkadot_node_primitives::approval::{DelayTranche, AssignmentCert}; use polkadot_primitives::v1::{ ValidatorIndex, GroupIndex, CandidateReceipt, SessionIndex, CoreIndex, - BlockNumber, Hash, CandidateHash, + BlockNumber, Hash, CandidateHash, ValidatorSignature, }; use sp_consensus_slots::Slot; use parity_scale_codec::{Encode, Decode}; use std::collections::{BTreeMap, HashMap}; use std::collections::hash_map::Entry; -use std::sync::Arc; use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0}; #[cfg(test)] @@ -40,11 +39,15 @@ pub struct Tick(u64); pub type Bitfield = BitVec; -const NUM_COLUMNS: u32 = 1; -const DATA_COL: u32 = 0; - const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks"; +/// The database config. +#[derive(Debug, Clone, Copy)] +pub struct Config { + /// The column family in the database where data is stored. + pub col_data: u32, +} + /// Details pertaining to our assignment on a block. #[derive(Encode, Decode, Debug, Clone, PartialEq)] pub struct OurAssignment { @@ -71,6 +74,7 @@ pub struct ApprovalEntry { pub tranches: Vec, pub backing_group: GroupIndex, pub our_assignment: Option, + pub our_approval_sig: Option, // `n_validators` bits. pub assignments: Bitfield, pub approved: bool, @@ -109,33 +113,6 @@ pub struct BlockEntry { pub children: Vec, } -/// Clear the given directory and create a RocksDB instance there. -pub fn clear_and_recreate(path: &std::path::Path, cache_size: usize) - -> std::io::Result> -{ - use kvdb_rocksdb::{DatabaseConfig, Database as RocksDB}; - - tracing::info!("Recreating approval-checking DB at {:?}", path); - - if let Err(e) = std::fs::remove_dir_all(path) { - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e); - } - } - std::fs::create_dir_all(path)?; - - let mut db_config = DatabaseConfig::with_columns(NUM_COLUMNS); - - db_config.memory_budget.insert(DATA_COL, cache_size); - - let path = path.to_str().ok_or_else(|| std::io::Error::new( - std::io::ErrorKind::Other, - format!("Non-UTF-8 database path {:?}", path), - ))?; - - Ok(Arc::new(RocksDB::open(&db_config, path)?)) -} - /// A range from earliest..last block number stored within the DB. #[derive(Encode, Decode, Debug, Clone, PartialEq)] pub struct StoredBlockRange(BlockNumber, BlockNumber); @@ -168,12 +145,13 @@ pub type Result = std::result::Result; /// pruning any competing branches at the same height. pub(crate) fn canonicalize( store: &dyn KeyValueDB, + config: &Config, canon_number: BlockNumber, canon_hash: Hash, ) -> Result<()> { - let range = match load_stored_blocks(store)? { + let range = match load_stored_blocks(store, config)? { None => return Ok(()), Some(range) => if range.0 >= canon_number { return Ok(()) @@ -197,17 +175,17 @@ pub(crate) fn canonicalize( transaction: &mut DBTransaction, visited_candidates: &mut HashMap, | -> Result> { - let block_entry = match load_block_entry(store, &block_hash)? { + let block_entry = match load_block_entry(store, config, &block_hash)? { None => return Ok(Vec::new()), Some(b) => b, }; - transaction.delete(DATA_COL, &block_entry_key(&block_hash)[..]); + transaction.delete(config.col_data, &block_entry_key(&block_hash)[..]); for &(_, ref candidate_hash) in &block_entry.candidates { let candidate = match visited_candidates.entry(*candidate_hash) { Entry::Occupied(e) => e.into_mut(), Entry::Vacant(e) => { - e.insert(match load_candidate_entry(store, candidate_hash)? { + e.insert(match load_candidate_entry(store, config, candidate_hash)? { None => continue, // Should not happen except for corrupt DB Some(c) => c, }) @@ -222,8 +200,8 @@ pub(crate) fn canonicalize( // First visit everything before the height. for i in range.0..canon_number { - let at_height = load_blocks_at_height(store, i)?; - transaction.delete(DATA_COL, &blocks_at_height_key(i)[..]); + let at_height = load_blocks_at_height(store, config, i)?; + transaction.delete(config.col_data, &blocks_at_height_key(i)[..]); for b in at_height { let _ = visit_and_remove_block_entry( @@ -236,8 +214,8 @@ pub(crate) fn canonicalize( // Then visit everything at the height. let pruned_branches = { - let at_height = load_blocks_at_height(store, canon_number)?; - transaction.delete(DATA_COL, &blocks_at_height_key(canon_number)); + let at_height = load_blocks_at_height(store, config, canon_number)?; + transaction.delete(config.col_data, &blocks_at_height_key(canon_number)); // Note that while there may be branches descending from blocks at earlier heights, // we have already covered them by removing everything at earlier heights. @@ -274,7 +252,7 @@ pub(crate) fn canonicalize( // visit the at-height key for this deleted block's height. let at_height = match visited_heights.entry(height) { Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => e.insert(load_blocks_at_height(store, height)?), + Entry::Vacant(e) => e.insert(load_blocks_at_height(store, config, height)?), }; if let Some(i) = at_height.iter().position(|x| x == &next_child) { @@ -286,10 +264,10 @@ pub(crate) fn canonicalize( // Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`. for (candidate_hash, candidate) in visited_candidates { if candidate.block_assignments.is_empty() { - transaction.delete(DATA_COL, &candidate_entry_key(&candidate_hash)[..]); + transaction.delete(config.col_data, &candidate_entry_key(&candidate_hash)[..]); } else { transaction.put_vec( - DATA_COL, + config.col_data, &candidate_entry_key(&candidate_hash)[..], candidate.encode(), ); @@ -299,9 +277,9 @@ pub(crate) fn canonicalize( // Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`. for (h, at) in visited_heights { if at.is_empty() { - transaction.delete(DATA_COL, &blocks_at_height_key(h)[..]); + transaction.delete(config.col_data, &blocks_at_height_key(h)[..]); } else { - transaction.put_vec(DATA_COL, &blocks_at_height_key(h), at.encode()); + transaction.put_vec(config.col_data, &blocks_at_height_key(h), at.encode()); } } @@ -312,16 +290,16 @@ pub(crate) fn canonicalize( std::cmp::max(range.1, canon_number + 2), ).encode(); - transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], new_range); + transaction.put_vec(config.col_data, &STORED_BLOCKS_KEY[..], new_range); // Update the values on-disk. store.write(transaction).map_err(Into::into) } -fn load_decode(store: &dyn KeyValueDB, key: &[u8]) +fn load_decode(store: &dyn KeyValueDB, col_data: u32, key: &[u8]) -> Result> { - match store.get(DATA_COL, key)? { + match store.get(col_data, key)? { None => Ok(None), Some(raw) => D::decode(&mut &raw[..]) .map(Some) @@ -350,6 +328,7 @@ pub(crate) struct NewCandidateInfo { /// no information about new candidates will be referred to by this function. pub(crate) fn add_block_entry( store: &dyn KeyValueDB, + config: &Config, entry: BlockEntry, n_validators: usize, candidate_info: impl Fn(&CandidateHash) -> Option, @@ -361,7 +340,7 @@ pub(crate) fn add_block_entry( // Update the stored block range. { - let new_range = match load_stored_blocks(store)? { + let new_range = match load_stored_blocks(store, config)? { None => Some(StoredBlockRange(number, number + 1)), Some(range) => if range.1 <= number { Some(StoredBlockRange(range.0, number + 1)) @@ -370,19 +349,19 @@ pub(crate) fn add_block_entry( } }; - new_range.map(|n| transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], n.encode())) + new_range.map(|n| transaction.put_vec(config.col_data, &STORED_BLOCKS_KEY[..], n.encode())) }; // Update the blocks at height meta key. { - let mut blocks_at_height = load_blocks_at_height(store, number)?; + let mut blocks_at_height = load_blocks_at_height(store, config, number)?; if blocks_at_height.contains(&entry.block_hash) { // seems we already have a block entry for this block. nothing to do here. return Ok(Vec::new()) } blocks_at_height.push(entry.block_hash); - transaction.put_vec(DATA_COL, &blocks_at_height_key(number)[..], blocks_at_height.encode()) + transaction.put_vec(config.col_data, &blocks_at_height_key(number)[..], blocks_at_height.encode()) }; let mut candidate_entries = Vec::with_capacity(entry.candidates.len()); @@ -399,7 +378,7 @@ pub(crate) fn add_block_entry( Some(info) => info, }; - let mut candidate_entry = load_candidate_entry(store, &candidate_hash)? + let mut candidate_entry = load_candidate_entry(store, config, &candidate_hash)? .unwrap_or_else(move || CandidateEntry { candidate, session, @@ -413,13 +392,14 @@ pub(crate) fn add_block_entry( tranches: Vec::new(), backing_group, our_assignment, + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], approved: false, } ); transaction.put_vec( - DATA_COL, + config.col_data, &candidate_entry_key(&candidate_hash)[..], candidate_entry.encode(), ); @@ -429,13 +409,13 @@ pub(crate) fn add_block_entry( }; // Update the child index for the parent. - load_block_entry(store, &parent_hash)?.map(|mut e| { + load_block_entry(store, config, &parent_hash)?.map(|mut e| { e.children.push(entry.block_hash); - transaction.put_vec(DATA_COL, &block_entry_key(&parent_hash)[..], e.encode()) + transaction.put_vec(config.col_data, &block_entry_key(&parent_hash)[..], e.encode()) }); // Put the new block entry in. - transaction.put_vec(DATA_COL, &block_entry_key(&entry.block_hash)[..], entry.encode()); + transaction.put_vec(config.col_data, &block_entry_key(&entry.block_hash)[..], entry.encode()); store.write(transaction)?; Ok(candidate_entries) @@ -445,6 +425,7 @@ pub(crate) fn add_block_entry( /// chain. pub fn force_approve( store: &dyn KeyValueDB, + db_config: Config, chain_head: Hash, up_to: BlockNumber, ) -> Result<()> { @@ -456,11 +437,11 @@ pub fn force_approve( let mut cur_hash = chain_head; let mut state = State::WalkTo; - let mut tx = Transaction::default(); + let mut tx = Transaction::new(db_config); // iterate back to the `up_to` block, and then iterate backwards until all blocks // are updated. - while let Some(mut entry) = load_block_entry(store, &cur_hash)? { + while let Some(mut entry) = load_block_entry(store, &db_config, &cur_hash)? { if entry.block_number <= up_to { state = State::Approving; @@ -480,15 +461,35 @@ pub fn force_approve( tx.write(store) } +/// Return all blocks which have entries in the DB, ascending, by height. +pub(crate) fn load_all_blocks(store: &dyn KeyValueDB, config: &Config) -> Result> { + let stored_blocks = load_stored_blocks(store, config)?; + + let mut hashes = Vec::new(); + for height in stored_blocks.into_iter().flat_map(|s| s.0..s.1) { + hashes.extend(load_blocks_at_height(store, config, height)?); + } + + Ok(hashes) +} + // An atomic transaction of multiple candidate or block entries. -#[derive(Default)] #[must_use = "Transactions do nothing unless written to a DB"] pub struct Transaction { block_entries: HashMap, candidate_entries: HashMap, + config: Config, } impl Transaction { + pub(crate) fn new(config: Config) -> Self { + Transaction { + block_entries: HashMap::default(), + candidate_entries: HashMap::default(), + config, + } + } + /// Put a block entry in the transaction, overwriting any other with the /// same hash. pub(crate) fn put_block_entry(&mut self, entry: BlockEntry) { @@ -519,14 +520,14 @@ impl Transaction { let k = block_entry_key(&hash); let v = entry.encode(); - db_transaction.put_vec(DATA_COL, &k, v); + db_transaction.put_vec(self.config.col_data, &k, v); } for (hash, entry) in self.candidate_entries { let k = candidate_entry_key(&hash); let v = entry.encode(); - db_transaction.put_vec(DATA_COL, &k, v); + db_transaction.put_vec(self.config.col_data, &k, v); } db.write(db_transaction).map_err(Into::into) @@ -534,31 +535,39 @@ impl Transaction { } /// Load the stored-blocks key from the state. -fn load_stored_blocks(store: &dyn KeyValueDB) +fn load_stored_blocks(store: &dyn KeyValueDB, config: &Config) -> Result> { - load_decode(store, STORED_BLOCKS_KEY) + load_decode(store, config.col_data, STORED_BLOCKS_KEY) } /// Load a blocks-at-height entry for a given block number. -pub(crate) fn load_blocks_at_height(store: &dyn KeyValueDB, block_number: BlockNumber) +pub(crate) fn load_blocks_at_height( + store: &dyn KeyValueDB, + config: &Config, + block_number: BlockNumber, +) -> Result> { - load_decode(store, &blocks_at_height_key(block_number)) + load_decode(store, config.col_data, &blocks_at_height_key(block_number)) .map(|x| x.unwrap_or_default()) } /// Load a block entry from the aux store. -pub(crate) fn load_block_entry(store: &dyn KeyValueDB, block_hash: &Hash) +pub(crate) fn load_block_entry(store: &dyn KeyValueDB, config: &Config, block_hash: &Hash) -> Result> { - load_decode(store, &block_entry_key(block_hash)) + load_decode(store, config.col_data, &block_entry_key(block_hash)) } /// Load a candidate entry from the aux store. -pub(crate) fn load_candidate_entry(store: &dyn KeyValueDB, candidate_hash: &CandidateHash) +pub(crate) fn load_candidate_entry( + store: &dyn KeyValueDB, + config: &Config, + candidate_hash: &CandidateHash, +) -> Result> { - load_decode(store, &candidate_entry_key(candidate_hash)) + load_decode(store, config.col_data, &candidate_entry_key(candidate_hash)) } /// The key a given block entry is stored under. diff --git a/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs b/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs index 199c672a27..71c4d3c47e 100644 --- a/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs +++ b/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs @@ -19,6 +19,13 @@ use super::*; use polkadot_primitives::v1::Id as ParaId; +const DATA_COL: u32 = 0; +const NUM_COLUMNS: u32 = 1; + +const TEST_CONFIG: Config = Config { + col_data: DATA_COL, +}; + pub(crate) fn write_stored_blocks(tx: &mut DBTransaction, range: StoredBlockRange) { tx.put_vec( DATA_COL, @@ -85,7 +92,7 @@ fn make_candidate(para_id: ParaId, relay_parent: Hash) -> CandidateReceipt { #[test] fn read_write() { - let store = kvdb_memorydb::create(1); + let store = kvdb_memorydb::create(NUM_COLUMNS); let hash_a = Hash::repeat_byte(1); let hash_b = Hash::repeat_byte(2); @@ -109,6 +116,7 @@ fn read_write() { tranches: Vec::new(), backing_group: GroupIndex(1), our_assignment: None, + our_approval_sig: None, assignments: Default::default(), approved: false, }) @@ -125,10 +133,13 @@ fn read_write() { store.write(tx).unwrap(); - assert_eq!(load_stored_blocks(&store).unwrap(), Some(range)); - assert_eq!(load_blocks_at_height(&store, 1).unwrap(), at_height); - assert_eq!(load_block_entry(&store, &hash_a).unwrap(), Some(block_entry)); - assert_eq!(load_candidate_entry(&store, &candidate_hash).unwrap(), Some(candidate_entry)); + assert_eq!(load_stored_blocks(&store, &TEST_CONFIG).unwrap(), Some(range)); + assert_eq!(load_blocks_at_height(&store, &TEST_CONFIG, 1).unwrap(), at_height); + assert_eq!(load_block_entry(&store, &TEST_CONFIG, &hash_a).unwrap(), Some(block_entry)); + assert_eq!( + load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash).unwrap(), + Some(candidate_entry), + ); let delete_keys = vec![ STORED_BLOCKS_KEY.to_vec(), @@ -144,15 +155,15 @@ fn read_write() { store.write(tx).unwrap(); - assert!(load_stored_blocks(&store).unwrap().is_none()); - assert!(load_blocks_at_height(&store, 1).unwrap().is_empty()); - assert!(load_block_entry(&store, &hash_a).unwrap().is_none()); - assert!(load_candidate_entry(&store, &candidate_hash).unwrap().is_none()); + assert!(load_stored_blocks(&store, &TEST_CONFIG).unwrap().is_none()); + assert!(load_blocks_at_height(&store, &TEST_CONFIG, 1).unwrap().is_empty()); + assert!(load_block_entry(&store, &TEST_CONFIG, &hash_a).unwrap().is_none()); + assert!(load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash).unwrap().is_none()); } #[test] fn add_block_entry_works() { - let store = kvdb_memorydb::create(1); + let store = kvdb_memorydb::create(NUM_COLUMNS); let parent_hash = Hash::repeat_byte(1); let block_hash_a = Hash::repeat_byte(2); @@ -188,6 +199,7 @@ fn add_block_entry_works() { add_block_entry( &store, + &TEST_CONFIG, block_entry_a.clone(), n_validators, |h| new_candidate_info.get(h).map(|x| x.clone()), @@ -201,24 +213,27 @@ fn add_block_entry_works() { add_block_entry( &store, + &TEST_CONFIG, block_entry_b.clone(), n_validators, |h| new_candidate_info.get(h).map(|x| x.clone()), ).unwrap(); - assert_eq!(load_block_entry(&store, &block_hash_a).unwrap(), Some(block_entry_a)); - assert_eq!(load_block_entry(&store, &block_hash_b).unwrap(), Some(block_entry_b)); + assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a)); + assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b)); - let candidate_entry_a = load_candidate_entry(&store, &candidate_hash_a).unwrap().unwrap(); + let candidate_entry_a = load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash_a) + .unwrap().unwrap(); assert_eq!(candidate_entry_a.block_assignments.keys().collect::>(), vec![&block_hash_a, &block_hash_b]); - let candidate_entry_b = load_candidate_entry(&store, &candidate_hash_b).unwrap().unwrap(); + let candidate_entry_b = load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash_b) + .unwrap().unwrap(); assert_eq!(candidate_entry_b.block_assignments.keys().collect::>(), vec![&block_hash_b]); } #[test] fn add_block_entry_adds_child() { - let store = kvdb_memorydb::create(1); + let store = kvdb_memorydb::create(NUM_COLUMNS); let parent_hash = Hash::repeat_byte(1); let block_hash_a = Hash::repeat_byte(2); @@ -242,6 +257,7 @@ fn add_block_entry_adds_child() { add_block_entry( &store, + &TEST_CONFIG, block_entry_a.clone(), n_validators, |_| None, @@ -249,6 +265,7 @@ fn add_block_entry_adds_child() { add_block_entry( &store, + &TEST_CONFIG, block_entry_b.clone(), n_validators, |_| None, @@ -256,13 +273,13 @@ fn add_block_entry_adds_child() { block_entry_a.children.push(block_hash_b); - assert_eq!(load_block_entry(&store, &block_hash_a).unwrap(), Some(block_entry_a)); - assert_eq!(load_block_entry(&store, &block_hash_b).unwrap(), Some(block_entry_b)); + assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_a).unwrap(), Some(block_entry_a)); + assert_eq!(load_block_entry(&store, &TEST_CONFIG, &block_hash_b).unwrap(), Some(block_entry_b)); } #[test] fn canonicalize_works() { - let store = kvdb_memorydb::create(1); + let store = kvdb_memorydb::create(NUM_COLUMNS); // -> B1 -> C1 -> D1 // A -> B2 -> C2 -> D2 @@ -377,6 +394,7 @@ fn canonicalize_works() { for block_entry in blocks { add_block_entry( &store, + &TEST_CONFIG, block_entry, n_validators, |h| candidate_info.get(h).map(|x| x.clone()), @@ -387,11 +405,11 @@ fn canonicalize_works() { for (c_hash, in_blocks) in expected { let (entry, in_blocks) = match in_blocks { None => { - assert!(load_candidate_entry(&store, &c_hash).unwrap().is_none()); + assert!(load_candidate_entry(&store, &TEST_CONFIG, &c_hash).unwrap().is_none()); continue } Some(i) => ( - load_candidate_entry(&store, &c_hash).unwrap().unwrap(), + load_candidate_entry(&store, &TEST_CONFIG, &c_hash).unwrap().unwrap(), i, ), }; @@ -408,11 +426,11 @@ fn canonicalize_works() { for (hash, with_candidates) in expected { let (entry, with_candidates) = match with_candidates { None => { - assert!(load_block_entry(&store, &hash).unwrap().is_none()); + assert!(load_block_entry(&store, &TEST_CONFIG, &hash).unwrap().is_none()); continue } Some(i) => ( - load_block_entry(&store, &hash).unwrap().unwrap(), + load_block_entry(&store, &TEST_CONFIG, &hash).unwrap().unwrap(), i, ), }; @@ -443,9 +461,9 @@ fn canonicalize_works() { (block_hash_d2, Some(vec![cand_hash_5])), ]); - canonicalize(&store, 3, block_hash_c1).unwrap(); + canonicalize(&store, &TEST_CONFIG, 3, block_hash_c1).unwrap(); - assert_eq!(load_stored_blocks(&store).unwrap().unwrap(), StoredBlockRange(4, 5)); + assert_eq!(load_stored_blocks(&store, &TEST_CONFIG).unwrap().unwrap(), StoredBlockRange(4, 5)); check_candidates_in_store(vec![ (cand_hash_1, None), @@ -468,7 +486,7 @@ fn canonicalize_works() { #[test] fn force_approve_works() { - let store = kvdb_memorydb::create(1); + let store = kvdb_memorydb::create(NUM_COLUMNS); let n_validators = 10; let mut tx = DBTransaction::new(); @@ -509,16 +527,101 @@ fn force_approve_works() { for block_entry in blocks { add_block_entry( &store, + &TEST_CONFIG, block_entry, n_validators, |h| candidate_info.get(h).map(|x| x.clone()), ).unwrap(); } - force_approve(&store, block_hash_d, 2).unwrap(); + force_approve(&store, TEST_CONFIG, block_hash_d, 2).unwrap(); - assert!(load_block_entry(&store, &block_hash_a).unwrap().unwrap().approved_bitfield.all()); - assert!(load_block_entry(&store, &block_hash_b).unwrap().unwrap().approved_bitfield.all()); - assert!(load_block_entry(&store, &block_hash_c).unwrap().unwrap().approved_bitfield.not_any()); - assert!(load_block_entry(&store, &block_hash_d).unwrap().unwrap().approved_bitfield.not_any()); + assert!(load_block_entry( + &store, + &TEST_CONFIG, + &block_hash_a, + ).unwrap().unwrap().approved_bitfield.all()); + assert!(load_block_entry( + &store, + &TEST_CONFIG, + &block_hash_b, + ).unwrap().unwrap().approved_bitfield.all()); + assert!(load_block_entry( + &store, + &TEST_CONFIG, + &block_hash_c, + ).unwrap().unwrap().approved_bitfield.not_any()); + assert!(load_block_entry( + &store, + &TEST_CONFIG, + &block_hash_d, + ).unwrap().unwrap().approved_bitfield.not_any()); +} + +#[test] +fn load_all_blocks_works() { + let store = kvdb_memorydb::create(NUM_COLUMNS); + + let parent_hash = Hash::repeat_byte(1); + let block_hash_a = Hash::repeat_byte(2); + let block_hash_b = Hash::repeat_byte(69); + let block_hash_c = Hash::repeat_byte(42); + + let block_number = 10; + + let block_entry_a = make_block_entry( + block_hash_a, + parent_hash, + block_number, + vec![], + ); + + let block_entry_b = make_block_entry( + block_hash_b, + parent_hash, + block_number, + vec![], + ); + + let block_entry_c = make_block_entry( + block_hash_c, + block_hash_a, + block_number + 1, + vec![], + ); + + let n_validators = 10; + + add_block_entry( + &store, + &TEST_CONFIG, + block_entry_a.clone(), + n_validators, + |_| None + ).unwrap(); + + // add C before B to test sorting. + add_block_entry( + &store, + &TEST_CONFIG, + block_entry_c.clone(), + n_validators, + |_| None + ).unwrap(); + + add_block_entry( + &store, + &TEST_CONFIG, + block_entry_b.clone(), + n_validators, + |_| None + ).unwrap(); + + assert_eq!( + load_all_blocks( + &store, + &TEST_CONFIG + ).unwrap(), + vec![block_hash_a, block_hash_b, block_hash_c], + ) } diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index e806de1a89..68dd5e5904 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -53,7 +53,7 @@ use bitvec::order::Lsb0 as BitOrderLsb0; use std::collections::HashMap; use std::convert::TryFrom; -use crate::approval_db; +use crate::approval_db::{self, v1::Config as DatabaseConfig}; use crate::persisted_entries::CandidateEntry; use crate::criteria::{AssignmentCriteria, OurAssignment}; use crate::time::{slot_number_to_tick, Tick}; @@ -561,6 +561,7 @@ pub(crate) async fn handle_new_head( ctx: &mut impl SubsystemContext, state: &mut State, db_writer: &dyn KeyValueDB, + db_config: DatabaseConfig, head: Hash, finalized_number: &Option, ) -> SubsystemResult> { @@ -737,7 +738,7 @@ pub(crate) async fn handle_new_head( "Enacting force-approve", ); - approval_db::v1::force_approve(db_writer, block_hash, up_to) + approval_db::v1::force_approve(db_writer, db_config, block_hash, up_to) .map_err(|e| SubsystemError::with_origin("approval-voting", e))?; } @@ -750,6 +751,7 @@ pub(crate) async fn handle_new_head( let candidate_entries = approval_db::v1::add_block_entry( db_writer, + &db_config, block_entry, n_validators, |candidate_hash| { @@ -815,6 +817,13 @@ mod tests { use crate::{criteria, BlockEntry}; + const DATA_COL: u32 = 0; + const NUM_COLUMNS: u32 = 1; + + const TEST_CONFIG: DatabaseConfig = DatabaseConfig { + col_data: DATA_COL, + }; + #[derive(Default)] struct TestDB { block_entries: HashMap, @@ -835,6 +844,14 @@ mod tests { ) -> SubsystemResult> { Ok(self.candidate_entries.get(candidate_hash).map(|c| c.clone())) } + + fn load_all_blocks(&self) -> SubsystemResult> { + let mut hashes: Vec<_> = self.block_entries.keys().cloned().collect(); + + hashes.sort_by_key(|k| self.block_entries.get(k).unwrap().block_number()); + + Ok(hashes) + } } #[derive(Default)] @@ -1876,7 +1893,7 @@ mod tests { }.into(), ); - let db_writer = kvdb_memorydb::create(1); + let db_writer = kvdb_memorydb::create(NUM_COLUMNS); let test_fut = { Box::pin(async move { @@ -1884,6 +1901,7 @@ mod tests { &mut ctx, &mut state, &db_writer, + TEST_CONFIG, hash, &Some(1), ).await.unwrap(); @@ -1895,7 +1913,11 @@ mod tests { assert_eq!(candidates[1].1.approvals().len(), 6); // the first candidate should be insta-approved // the second should not - let entry: BlockEntry = crate::approval_db::v1::load_block_entry(&db_writer, &hash) + let entry: BlockEntry = crate::approval_db::v1::load_block_entry( + &db_writer, + &TEST_CONFIG, + &hash, + ) .unwrap() .unwrap() .into(); diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index f7a1e4b078..eb287a75b4 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -43,10 +43,12 @@ use polkadot_primitives::v1::{ use polkadot_node_primitives::{ValidationResult, PoV}; use polkadot_node_primitives::approval::{ IndirectAssignmentCert, IndirectSignedApprovalVote, ApprovalVote, DelayTranche, + BlockApprovalMeta, }; use polkadot_node_jaeger as jaeger; use parity_scale_codec::Encode; use sc_keystore::LocalKeystore; +use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use sp_runtime::traits::AppVerify; use sp_application_crypto::Pair; @@ -56,7 +58,7 @@ use futures::prelude::*; use futures::future::RemoteHandle; use futures::channel::{mpsc, oneshot}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::collections::btree_map::Entry; use std::sync::Arc; @@ -72,6 +74,8 @@ mod import; mod time; mod persisted_entries; +use crate::approval_db::v1::Config as DatabaseConfig; + #[cfg(test)] mod tests; @@ -79,25 +83,39 @@ const APPROVAL_SESSIONS: SessionIndex = 6; const LOG_TARGET: &str = "parachain::approval-voting"; /// Configuration for the approval voting subsystem +#[derive(Debug, Clone)] pub struct Config { - /// The path where the approval-voting DB should be kept. This directory is completely removed when starting - /// the service. - pub path: std::path::PathBuf, - /// The cache size, in bytes, to spend on approval checking metadata. - pub cache_size: Option, + /// The column family in the DB where approval-voting data is stored. + pub col_data: u32, /// The slot duration of the consensus algorithm, in milliseconds. Should be evenly /// divisible by 500. pub slot_duration_millis: u64, } +// The mode of the approval voting subsystem. It should start in a `Syncing` mode when it first +// starts, and then once it's reached the head of the chain it should move into the `Active` mode. +// +// In `Active` mode, the node is an active participant in the approvals protocol. When syncing, +// the node follows the new incoming blocks and finalized number, but does not yet participate. +// +// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution` +// subsystem of all unfinalized blocks and the candidates included within them, as well as all +// votes that the local node itself has cast on candidates within those blocks. +enum Mode { + Active, + Syncing(Box), +} + /// The approval voting subsystem. pub struct ApprovalVotingSubsystem { /// LocalKeystore is needed for assignment keys, but not necessarily approval keys. /// /// We do a lot of VRF signing and need the keys to have low latency. keystore: Arc, + db_config: DatabaseConfig, slot_duration_millis: u64, db: Arc, + mode: Mode, metrics: Metrics, } @@ -239,27 +257,24 @@ impl metrics::Metrics for Metrics { } impl ApprovalVotingSubsystem { - /// Create a new approval voting subsystem with the given keystore and config, - /// which creates a DB at the given path. This function will delete the directory - /// at the given path if it already exists. + /// Create a new approval voting subsystem with the given keystore, config, and database. pub fn with_config( config: Config, + db: Arc, keystore: Arc, + sync_oracle: Box, metrics: Metrics, - ) -> std::io::Result { - const DEFAULT_CACHE_SIZE: usize = 100 * 1024 * 1024; // 100MiB default should be fine unless finality stalls. - - let db = approval_db::v1::clear_and_recreate( - &config.path, - config.cache_size.unwrap_or(DEFAULT_CACHE_SIZE), - )?; - - Ok(ApprovalVotingSubsystem { + ) -> Self { + ApprovalVotingSubsystem { keystore, slot_duration_millis: config.slot_duration_millis, db, + db_config: DatabaseConfig { + col_data: config.col_data, + }, + mode: Mode::Syncing(sync_oracle), metrics, - }) + } } } @@ -305,6 +320,7 @@ struct Wakeups { // Tick -> [(Relay Block, Candidate Hash)] wakeups: BTreeMap>, reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>, + block_numbers: BTreeMap>, } impl Wakeups { @@ -313,9 +329,19 @@ impl Wakeups { self.wakeups.keys().next().map(|t| *t) } + fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) { + self.block_numbers.entry(block_number).or_default().insert(block_hash); + } + // Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up // for these values. replaces any later wakeup. - fn schedule(&mut self, block_hash: Hash, candidate_hash: CandidateHash, tick: Tick) { + fn schedule( + &mut self, + block_hash: Hash, + block_number: BlockNumber, + candidate_hash: CandidateHash, + tick: Tick, + ) { if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) { if prev <= &tick { return } @@ -331,12 +357,42 @@ impl Wakeups { let _ = entry.remove_entry(); } } + } else { + self.note_block(block_hash, block_number); } self.reverse_wakeups.insert((block_hash, candidate_hash), tick); self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash)); } + fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) { + let after = self.block_numbers.split_off(&(finalized_number + 1)); + let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after) + .into_iter() + .flat_map(|(_number, hashes)| hashes) + .collect(); + + let mut pruned_wakeups = BTreeMap::new(); + self.reverse_wakeups.retain(|&(ref h, ref c_h), tick| { + let live = !pruned_blocks.contains(h); + if !live { + pruned_wakeups.entry(*tick) + .or_insert_with(HashSet::new) + .insert((*h, *c_h)); + } + live + }); + + for (tick, pruned) in pruned_wakeups { + if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) { + entry.get_mut().retain(|wakeup| !pruned.contains(wakeup)); + if entry.get().is_empty() { + let _ = entry.remove(); + } + } + } + } + // Get the wakeup for a particular block/candidate combo, if any. fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option { self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t) @@ -379,30 +435,40 @@ trait DBReader { &self, candidate_hash: &CandidateHash, ) -> SubsystemResult>; + + fn load_all_blocks(&self) -> SubsystemResult>; } // This is a submodule to enforce opacity of the inner DB type. mod approval_db_v1_reader { use super::{ DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry, - Arc, SubsystemResult, SubsystemError, approval_db, + SubsystemResult, SubsystemError, DatabaseConfig, approval_db, }; /// A DB reader that uses the approval-db V1 under the hood. - pub(super) struct ApprovalDBV1Reader(Arc); + pub(super) struct ApprovalDBV1Reader { + inner: T, + config: DatabaseConfig, + } - impl From> for ApprovalDBV1Reader { - fn from(a: Arc) -> Self { - ApprovalDBV1Reader(a) + impl ApprovalDBV1Reader { + pub(super) fn new(inner: T, config: DatabaseConfig) -> Self { + ApprovalDBV1Reader { + inner, + config, + } } } - impl DBReader for ApprovalDBV1Reader { + impl<'a, T: 'a> DBReader for ApprovalDBV1Reader + where T: std::ops::Deref + { fn load_block_entry( &self, block_hash: &Hash, ) -> SubsystemResult> { - approval_db::v1::load_block_entry(&*self.0, block_hash) + approval_db::v1::load_block_entry(&*self.inner, &self.config, block_hash) .map(|e| e.map(Into::into)) .map_err(|e| SubsystemError::with_origin("approval-voting", e)) } @@ -411,14 +477,25 @@ mod approval_db_v1_reader { &self, candidate_hash: &CandidateHash, ) -> SubsystemResult> { - approval_db::v1::load_candidate_entry(&*self.0, candidate_hash) + approval_db::v1::load_candidate_entry(&*self.inner, &self.config, candidate_hash) .map(|e| e.map(Into::into)) .map_err(|e| SubsystemError::with_origin("approval-voting", e)) } + + fn load_all_blocks(&self) -> SubsystemResult> { + approval_db::v1::load_all_blocks(&*self.inner, &self.config) + .map_err(|e| SubsystemError::with_origin("approval-voting", e)) + } } } use approval_db_v1_reader::ApprovalDBV1Reader; +struct ApprovalStatus { + required_tranches: RequiredTranches, + tranche_now: DelayTranche, + block_tick: Tick, +} + struct State { session_window: import::RollingSessionWindow, keystore: Arc, @@ -432,12 +509,59 @@ impl State { fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> { self.session_window.session_info(i) } + + // Compute the required tranches for approval for this block and candidate combo. + // Fails if there is no approval entry for the block under the candidate or no candidate entry + // under the block, or if the session is out of bounds. + fn approval_status<'a, 'b>( + &'a self, + block_entry: &'a BlockEntry, + candidate_entry: &'b CandidateEntry, + ) -> Option<(&'b ApprovalEntry, ApprovalStatus)> { + let session_info = match self.session_info(block_entry.session()) { + Some(s) => s, + None => { + tracing::warn!(target: LOG_TARGET, "Unknown session info for {}", block_entry.session()); + return None; + } + }; + let block_hash = block_entry.block_hash(); + + let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot()); + let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot()); + let no_show_duration = slot_number_to_tick( + self.slot_duration_millis, + Slot::from(u64::from(session_info.no_show_slots)), + ); + + if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) { + let required_tranches = approval_checking::tranches_to_approve( + approval_entry, + candidate_entry.approvals(), + tranche_now, + block_tick, + no_show_duration, + session_info.needed_approvals as _ + ); + + let status = ApprovalStatus { + required_tranches, + block_tick, + tranche_now, + }; + + Some((approval_entry, status)) + } else { + None + } + } } #[derive(Debug)] enum Action { ScheduleWakeup { block_hash: Hash, + block_number: BlockNumber, candidate_hash: CandidateHash, tick: Tick, }, @@ -451,6 +575,7 @@ enum Action { candidate: CandidateReceipt, backing_group: GroupIndex, }, + BecomeActive, Conclude, } @@ -458,7 +583,7 @@ type BackgroundTaskMap = BTreeMap>>; async fn run( mut ctx: C, - subsystem: ApprovalVotingSubsystem, + mut subsystem: ApprovalVotingSubsystem, clock: Box, assignment_criteria: Box, ) -> SubsystemResult<()> @@ -469,7 +594,7 @@ async fn run( session_window: Default::default(), keystore: subsystem.keystore, slot_duration_millis: subsystem.slot_duration_millis, - db: ApprovalDBV1Reader::from(subsystem.db.clone()), + db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()), clock, assignment_criteria, }; @@ -496,20 +621,28 @@ async fn run( )? } next_msg = ctx.recv().fuse() => { - let actions = handle_from_overseer( + let mut actions = handle_from_overseer( &mut ctx, &mut state, &subsystem.metrics, db_writer, + subsystem.db_config, next_msg?, &mut last_finalized_height, - &wakeups, + &mut wakeups, ).await?; if let Some(finalized_height) = last_finalized_height { cleanup_background_tasks(finalized_height, &mut background_tasks); } + if let Mode::Syncing(ref mut oracle) = subsystem.mode { + if !oracle.is_major_syncing() { + // note that we're active before processing other actions. + actions.insert(0, Action::BecomeActive) + } + } + actions } background_request = background_rx.next().fuse() => { @@ -531,8 +664,10 @@ async fn run( &subsystem.metrics, &mut wakeups, db_writer, + subsystem.db_config, &background_tx, &mut background_tasks, + &mut subsystem.mode, actions, ).await? { break; @@ -548,20 +683,25 @@ async fn handle_actions( metrics: &Metrics, wakeups: &mut Wakeups, db: &dyn KeyValueDB, + db_config: DatabaseConfig, background_tx: &mpsc::Sender, background_tasks: &mut BackgroundTaskMap, + mode: &mut Mode, actions: impl IntoIterator, ) -> SubsystemResult { - let mut transaction = approval_db::v1::Transaction::default(); + let mut transaction = approval_db::v1::Transaction::new(db_config); let mut conclude = false; for action in actions { match action { Action::ScheduleWakeup { block_hash, + block_number, candidate_hash, tick, - } => wakeups.schedule(block_hash, candidate_hash, tick), + } => { + wakeups.schedule(block_hash, block_number, candidate_hash, tick) + } Action::WriteBlockEntry(block_entry) => { transaction.put_block_entry(block_entry.into()); } @@ -576,6 +716,9 @@ async fn handle_actions( candidate, backing_group, } => { + // Don't launch approval work if the node is syncing. + if let Mode::Syncing(_) = *mode { continue } + metrics.on_assignment_produced(); let block_hash = indirect_cert.block_hash; let validator_index = indirect_cert.validator; @@ -600,6 +743,15 @@ async fn handle_actions( background_tasks.entry(relay_block_number).or_default().push(handle); } } + Action::BecomeActive => { + *mode = Mode::Active; + + let messages = distribution_messages_for_activation( + ApprovalDBV1Reader::new(db, db_config) + )?; + + ctx.send_messages(messages.into_iter().map(Into::into)).await; + } Action::Conclude => { conclude = true; } } } @@ -627,15 +779,113 @@ fn cleanup_background_tasks( // the task on drop. } +fn distribution_messages_for_activation<'a>( + db: impl DBReader + 'a, +) -> SubsystemResult> { + let all_blocks = db.load_all_blocks()?; + + let mut approval_meta = Vec::with_capacity(all_blocks.len()); + let mut messages = Vec::new(); + + messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value. + + for block_hash in all_blocks { + let block_entry = match db.load_block_entry(&block_hash)? { + Some(b) => b, + None => { + tracing::warn!( + target: LOG_TARGET, + ?block_hash, + "Missing block entry", + ); + + continue + } + }; + approval_meta.push(BlockApprovalMeta { + hash: block_hash, + number: block_entry.block_number(), + parent_hash: block_entry.parent_hash(), + candidates: block_entry.candidates().iter().map(|(_, c_hash)| *c_hash).collect(), + slot: block_entry.slot(), + }); + + for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() { + let candidate_entry = match db.load_candidate_entry(&candidate_hash)? { + Some(c) => c, + None => { + tracing::warn!( + target: LOG_TARGET, + ?block_hash, + ?candidate_hash, + "Missing candidate entry", + ); + + continue + } + }; + + match candidate_entry.approval_entry(&block_hash) { + Some(approval_entry) => { + match approval_entry.local_statements() { + (None, None) | (None, Some(_)) => {}, // second is impossible case. + (Some(assignment), None) => { + messages.push(ApprovalDistributionMessage::DistributeAssignment( + IndirectAssignmentCert { + block_hash, + validator: assignment.validator_index(), + cert: assignment.cert().clone(), + }, + i as _, + )); + } + (Some(assignment), Some(approval_sig)) => { + messages.push(ApprovalDistributionMessage::DistributeAssignment( + IndirectAssignmentCert { + block_hash, + validator: assignment.validator_index(), + cert: assignment.cert().clone(), + }, + i as _, + )); + + messages.push(ApprovalDistributionMessage::DistributeApproval( + IndirectSignedApprovalVote { + block_hash, + candidate_index: i as _, + validator: assignment.validator_index(), + signature: approval_sig, + } + )) + } + } + } + None => { + tracing::warn!( + target: LOG_TARGET, + ?block_hash, + ?candidate_hash, + "Missing approval entry", + ); + } + } + } + } + + messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta); + Ok(messages) +} + // Handle an incoming signal from the overseer. Returns true if execution should conclude. async fn handle_from_overseer( ctx: &mut impl SubsystemContext, state: &mut State, metrics: &Metrics, db_writer: &dyn KeyValueDB, + db_config: DatabaseConfig, x: FromOverseer, last_finalized_height: &mut Option, - wakeups: &Wakeups, + wakeups: &mut Wakeups, ) -> SubsystemResult> { let actions = match x { @@ -648,6 +898,7 @@ async fn handle_from_overseer( ctx, state, db_writer, + db_config, head, &*last_finalized_height, ).await { @@ -685,6 +936,7 @@ async fn handle_from_overseer( // and approvals which trigger rescheduling. actions.push(Action::ScheduleWakeup { block_hash: block_batch.block_hash, + block_number: block_batch.block_number, candidate_hash: c_hash, tick, }); @@ -700,9 +952,11 @@ async fn handle_from_overseer( FromOverseer::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => { *last_finalized_height = Some(block_number); - approval_db::v1::canonicalize(db_writer, block_number, block_hash) + approval_db::v1::canonicalize(db_writer, &db_config, block_number, block_hash) .map_err(|e| SubsystemError::with_origin("db", e))?; + wakeups.prune_finalized_wakeups(block_number); + Vec::new() } FromOverseer::Signal(OverseerSignal::Conclude) => { @@ -711,7 +965,7 @@ async fn handle_from_overseer( FromOverseer::Communication { msg } => match msg { ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => { let (check_outcome, actions) - = check_and_import_assignment(state, metrics, a, claimed_core)?; + = check_and_import_assignment(state, a, claimed_core)?; let _ = res.send(check_outcome); actions } @@ -996,6 +1250,7 @@ fn min_prefer_some( fn schedule_wakeup_action( approval_entry: &ApprovalEntry, block_hash: Hash, + block_number: BlockNumber, candidate_hash: CandidateHash, block_tick: Tick, required_tranches: RequiredTranches, @@ -1005,6 +1260,7 @@ fn schedule_wakeup_action( RequiredTranches::All => None, RequiredTranches::Exact { next_no_show, .. } => next_no_show.map(|tick| Action::ScheduleWakeup { block_hash, + block_number, candidate_hash, tick, }), @@ -1032,7 +1288,12 @@ fn schedule_wakeup_action( }; min_prefer_some(next_non_empty_tranche, next_no_show) - .map(|tick| Action::ScheduleWakeup { block_hash, candidate_hash, tick }) + .map(|tick| Action::ScheduleWakeup { + block_hash, + block_number, + candidate_hash, + tick, + }) } }; @@ -1060,7 +1321,6 @@ fn schedule_wakeup_action( fn check_and_import_assignment( state: &State, - metrics: &Metrics, assignment: IndirectAssignmentCert, candidate_index: CandidateIndex, ) -> SubsystemResult<(AssignmentCheckResult, Vec)> { @@ -1155,24 +1415,22 @@ fn check_and_import_assignment( } }; - // We check for approvals here because we may be late in seeing a block containing a - // candidate for which we have already seen approvals by the same validator. - // - // For these candidates, we will receive the assignments potentially after a corresponding - // approval, and so we must check for approval here. - // - // Note that this already produces actions for writing - // the candidate entry and any modified block entries to disk. - // - // It also produces actions to schedule wakeups for the candidate. - let actions = check_and_apply_full_approval( - state, - &metrics, - Some((assignment.block_hash, block_entry)), - assigned_candidate_hash, - candidate_entry, - |h, _| h == &assignment.block_hash, - )?; + let mut actions = Vec::new(); + + // We've imported a new approval, so we need to schedule a wake-up for when that might no-show. + if let Some((approval_entry, status)) = state.approval_status(&block_entry, &candidate_entry) { + actions.extend(schedule_wakeup_action( + approval_entry, + block_entry.block_hash(), + block_entry.block_number(), + assigned_candidate_hash, + status.block_tick, + status.required_tranches, + )); + } + + // We also write the candidate entry as it now contains the new candidate. + actions.push(Action::WriteCandidateEntry(assigned_candidate_hash, candidate_entry)); Ok((res, actions)) } @@ -1259,116 +1517,89 @@ fn check_and_import_approval( let actions = import_checked_approval( state, &metrics, - Some((approval.block_hash, block_entry)), + block_entry, approved_candidate_hash, candidate_entry, - approval.validator, - )?; + ApprovalSource::Remote(approval.validator), + ); Ok((actions, t)) } +enum ApprovalSource { + Remote(ValidatorIndex), + Local(ValidatorIndex, ValidatorSignature), +} + +impl ApprovalSource { + fn validator_index(&self) -> ValidatorIndex { + match *self { + ApprovalSource::Remote(v) | ApprovalSource::Local(v, _) => v, + } + } + + fn is_remote(&self) -> bool { + match *self { + ApprovalSource::Remote(_) => true, + ApprovalSource::Local(_, _) => false, + } + } +} + +// Import an approval vote which is already checked to be valid and corresponding to an assigned +// validator on the candidate and block. This updates the block entry and candidate entry as +// necessary and schedules any further wakeups. fn import_checked_approval( state: &State, metrics: &Metrics, - already_loaded: Option<(Hash, BlockEntry)>, + mut block_entry: BlockEntry, candidate_hash: CandidateHash, mut candidate_entry: CandidateEntry, - validator: ValidatorIndex, -) -> SubsystemResult> { - if candidate_entry.mark_approval(validator) { - // already approved - nothing to do here. - return Ok(Vec::new()); + source: ApprovalSource, +) -> Vec { + let validator_index = source.validator_index(); + + let already_approved_by = candidate_entry.mark_approval(validator_index); + let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash); + + // Check for early exits. + // + // If the candidate was approved + // but not the block, it means that we still need more approvals for the candidate under the + // block. + // + // If the block was approved, but the validator hadn't approved it yet, we should still hold + // onto the approval vote on-disk in case we restart and rebroadcast votes. Otherwise, our + // assignment might manifest as a no-show. + match source { + ApprovalSource::Remote(_) => { + // We don't store remote votes, so we can early exit as long at the candidate is + // already concluded under the block i.e. we don't need more approvals. + if candidate_approved_in_block { + return Vec::new(); + } + } + ApprovalSource::Local(_, _) => { + // We never early return on the local validator. + } } - // Check if this approval vote alters the approval state of any blocks. - // - // This may include blocks beyond the already loaded block. - let actions = check_and_apply_full_approval( - state, - metrics, - already_loaded, - candidate_hash, - candidate_entry, - |_, a| a.is_assigned(validator), - )?; - - Ok(actions) -} - -// Checks the candidate for full approval under all blocks matching the given filter. -// -// If returning without error, is guaranteed to have produced actions -// to write all modified block entries. It also schedules wakeups for -// the candidate under any blocks filtered. -fn check_and_apply_full_approval( - state: &State, - metrics: &Metrics, - mut already_loaded: Option<(Hash, BlockEntry)>, - candidate_hash: CandidateHash, - mut candidate_entry: CandidateEntry, - filter: impl Fn(&Hash, &ApprovalEntry) -> bool, -) -> SubsystemResult> { - // We only query this max once per hash. - let db = &state.db; - let mut load_block_entry = move |block_hash| -> SubsystemResult> { - if already_loaded.as_ref().map_or(false, |(h, _)| h == block_hash) { - Ok(already_loaded.take().map(|(_, c)| c)) - } else { - db.load_block_entry(block_hash) - } - }; - - let mut newly_approved = Vec::new(); let mut actions = Vec::new(); - for (block_hash, approval_entry) in candidate_entry.iter_approval_entries() - .into_iter() - .filter(|(h, a)| !a.is_approved() && filter(h, a)) + let block_hash = block_entry.block_hash(); + let block_number = block_entry.block_number(); + + let (is_approved, status) = if let Some((approval_entry, status)) + = state.approval_status(&block_entry, &candidate_entry) { - let mut block_entry = match load_block_entry(block_hash)? { - None => { - tracing::warn!( - target: LOG_TARGET, - "Missing block entry {} referenced by candidate {}", - block_hash, - candidate_hash, - ); - continue - } - Some(b) => b, - }; - - let session_info = match state.session_info(block_entry.session()) { - Some(s) => s, - None => { - tracing::warn!(target: LOG_TARGET, "Unknown session info for {}", block_entry.session()); - continue - } - }; - - let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot()); - let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot()); - let no_show_duration = slot_number_to_tick( - state.slot_duration_millis, - Slot::from(u64::from(session_info.no_show_slots)), - ); - - let required_tranches = approval_checking::tranches_to_approve( - approval_entry, - candidate_entry.approvals(), - tranche_now, - block_tick, - no_show_duration, - session_info.needed_approvals as _ - ); - let check = approval_checking::check_approval( &candidate_entry, approval_entry, - required_tranches.clone(), + status.required_tranches.clone(), ); - if check.is_approved() { + let is_approved = check.is_approved(); + + if is_approved { tracing::trace!( target: LOG_TARGET, ?candidate_hash, @@ -1378,42 +1609,74 @@ fn check_and_apply_full_approval( let no_shows = check.known_no_shows(); - let was_approved = block_entry.is_fully_approved(); - - newly_approved.push(*block_hash); + let was_block_approved = block_entry.is_fully_approved(); block_entry.mark_approved_by_hash(&candidate_hash); - let is_approved = block_entry.is_fully_approved(); + let is_block_approved = block_entry.is_fully_approved(); if no_shows != 0 { metrics.on_no_shows(no_shows); } - metrics.on_candidate_approved(tranche_now as _); + metrics.on_candidate_approved(status.tranche_now as _); - if is_approved && !was_approved { - metrics.on_block_approved(tranche_now as _) + if is_block_approved && !was_block_approved { + metrics.on_block_approved(status.tranche_now as _); } actions.push(Action::WriteBlockEntry(block_entry)); } + (is_approved, status) + } else { + tracing::warn!( + target: LOG_TARGET, + ?candidate_hash, + ?block_hash, + ?validator_index, + "No approval entry for approval under block", + ); + + return Vec::new(); + }; + + { + let approval_entry = candidate_entry.approval_entry_mut(&block_hash) + .expect("Approval entry just fetched; qed"); + + let was_approved = approval_entry.is_approved(); + let newly_approved = is_approved && !was_approved; + + if is_approved { + approval_entry.mark_approved(); + } + + if let ApprovalSource::Local(_, ref sig) = source { + approval_entry.import_approval_sig(sig.clone()); + } + actions.extend(schedule_wakeup_action( &approval_entry, - *block_hash, + block_hash, + block_number, candidate_hash, - block_tick, - required_tranches, + status.block_tick, + status.required_tranches, )); - } - for b in &newly_approved { - if let Some(a) = candidate_entry.approval_entry_mut(b) { - a.mark_approved(); + // We have no need to write the candidate entry if + // + // 1. The source is remote, as we don't store anything new in the approval entry. + // 2. The candidate is not newly approved, as we haven't altered the approval entry's + // approved flag with `mark_approved` above. + // 3. The source had already approved the candidate, as we haven't altered the bitfield. + if !source.is_remote() || newly_approved || !already_approved_by { + // In all other cases, we need to write the candidate entry. + actions.push(Action::WriteCandidateEntry(candidate_hash, candidate_entry)); } + } - actions.push(Action::WriteCandidateEntry(candidate_hash, candidate_entry)); - Ok(actions) + actions } fn should_trigger_assignment( @@ -1592,6 +1855,7 @@ fn process_wakeup( actions.extend(schedule_wakeup_action( &approval_entry, relay_block, + block_entry.block_number(), candidate_hash, block_tick, tranches_to_approve, @@ -1863,11 +2127,11 @@ fn issue_approval( let actions = import_checked_approval( state, metrics, - Some((block_hash, block_entry)), + block_entry, candidate_hash, candidate_entry, - validator_index as _, - )?; + ApprovalSource::Local(validator_index as _, sig.clone()), + ); metrics.on_approval_produced(); diff --git a/polkadot/node/core/approval-voting/src/persisted_entries.rs b/polkadot/node/core/approval-voting/src/persisted_entries.rs index 715e959981..646fdb19fe 100644 --- a/polkadot/node/core/approval-voting/src/persisted_entries.rs +++ b/polkadot/node/core/approval-voting/src/persisted_entries.rs @@ -23,7 +23,7 @@ use polkadot_node_primitives::approval::{DelayTranche, RelayVRFStory, AssignmentCert}; use polkadot_primitives::v1::{ ValidatorIndex, CandidateReceipt, SessionIndex, GroupIndex, CoreIndex, - Hash, CandidateHash, BlockNumber, + Hash, CandidateHash, BlockNumber, ValidatorSignature, }; use sp_consensus_slots::Slot; @@ -79,6 +79,7 @@ pub struct ApprovalEntry { tranches: Vec, backing_group: GroupIndex, our_assignment: Option, + our_approval_sig: Option, // `n_validators` bits. assignments: BitVec, approved: bool, @@ -108,6 +109,11 @@ impl ApprovalEntry { }) } + /// Import our local approval vote signature for this candidate. + pub fn import_approval_sig(&mut self, approval_sig: ValidatorSignature) { + self.our_approval_sig = Some(approval_sig); + } + /// Whether a validator is already assigned. pub fn is_assigned(&self, validator_index: ValidatorIndex) -> bool { self.assignments.get(validator_index.0 as usize).map(|b| *b).unwrap_or(false) @@ -190,6 +196,18 @@ impl ApprovalEntry { self.backing_group } + /// Get the assignment cert & approval signature. + /// + /// The approval signature will only be `Some` if the assignment is too. + pub fn local_statements(&self) -> (Option, Option) { + let approval_sig = self.our_approval_sig.clone(); + if let Some(our_assignment) = self.our_assignment.as_ref().filter(|a| a.triggered()) { + (Some(our_assignment.clone()), approval_sig) + } else { + (None, None) + } + } + /// For tests: set our assignment. #[cfg(test)] pub fn set_our_assignment(&mut self, our_assignment: OurAssignment) { @@ -203,6 +221,7 @@ impl From for ApprovalEntry { tranches: entry.tranches.into_iter().map(Into::into).collect(), backing_group: entry.backing_group, our_assignment: entry.our_assignment.map(Into::into), + our_approval_sig: entry.our_approval_sig.map(Into::into), assignments: entry.assignments, approved: entry.approved, } @@ -215,6 +234,7 @@ impl From for crate::approval_db::v1::ApprovalEntry { tranches: entry.tranches.into_iter().map(Into::into).collect(), backing_group: entry.backing_group, our_assignment: entry.our_assignment.map(Into::into), + our_approval_sig: entry.our_approval_sig.map(Into::into), assignments: entry.assignments, approved: entry.approved, } @@ -260,11 +280,6 @@ impl CandidateEntry { self.block_assignments.get(block_hash) } - /// Iterate over approval entries. - pub fn iter_approval_entries(&self) -> impl IntoIterator { - self.block_assignments.iter() - } - #[cfg(test)] pub fn add_approval_entry( &mut self, @@ -325,6 +340,13 @@ impl BlockEntry { } } + /// Whether a candidate is approved in the bitfield. + pub fn is_candidate_approved(&self, candidate_hash: &CandidateHash) -> bool { + self.candidates.iter().position(|(_, h)| h == candidate_hash) + .and_then(|p| self.approved_bitfield.get(p).map(|b| *b)) + .unwrap_or(false) + } + /// Whether the block entry is fully approved. pub fn is_fully_approved(&self) -> bool { self.approved_bitfield.all() @@ -339,18 +361,6 @@ impl BlockEntry { }) } - #[cfg(test)] - pub fn block_hash(&self) -> Hash { - self.block_hash - } - - #[cfg(test)] - pub fn is_candidate_approved(&self, candidate_hash: &CandidateHash) -> bool { - self.candidates.iter().position(|(_, h)| h == candidate_hash) - .and_then(|p| self.approved_bitfield.get(p).map(|b| *b)) - .unwrap_or(false) - } - /// For tests: Add a candidate to the block entry. Returns the /// index where the candidate was added. /// @@ -402,6 +412,16 @@ impl BlockEntry { pub fn block_number(&self) -> BlockNumber { self.block_number } + + /// Access the block hash of the block entry. + pub fn block_hash(&self) -> Hash { + self.block_hash + } + + /// Access the parent hash of the block entry. + pub fn parent_hash(&self) -> Hash { + self.parent_hash + } } impl From for BlockEntry { diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index c60dbbcb32..95b76d8a55 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -179,6 +179,14 @@ impl DBReader for TestStore { ) -> SubsystemResult> { Ok(self.candidate_entries.get(candidate_hash).cloned()) } + + fn load_all_blocks(&self) -> SubsystemResult> { + let mut hashes: Vec<_> = self.block_entries.keys().cloned().collect(); + + hashes.sort_by_key(|k| self.block_entries.get(k).unwrap().block_number()); + + Ok(hashes) + } } fn blank_state() -> State { @@ -226,6 +234,7 @@ fn sign_approval( key.sign(&super::approval_signing_payload(ApprovalVote(candidate_hash), session_index)).into() } +#[derive(Clone)] struct StateConfig { session_index: SessionIndex, slot: Slot, @@ -353,6 +362,7 @@ fn add_candidate_to_block( tranches: Vec::new(), backing_group, our_assignment: None, + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], approved: false, }.into(), @@ -378,7 +388,6 @@ fn rejects_bad_assignment() { let res = check_and_import_assignment( &mut state, - &Metrics(None), assignment_good.clone(), candidate_index, ).unwrap(); @@ -399,7 +408,6 @@ fn rejects_bad_assignment() { let res = check_and_import_assignment( &mut state, - &Metrics(None), assignment, candidate_index, ).unwrap(); @@ -415,7 +423,6 @@ fn rejects_bad_assignment() { // same assignment, but this time rejected let res = check_and_import_assignment( &mut state, - &Metrics(None), assignment_good, candidate_index, ).unwrap(); @@ -446,7 +453,6 @@ fn rejects_assignment_in_future() { let res = check_and_import_assignment( &mut state, - &Metrics(None), assignment.clone(), candidate_index, ).unwrap(); @@ -461,7 +467,6 @@ fn rejects_assignment_in_future() { let res = check_and_import_assignment( &mut state, - &Metrics(None), assignment.clone(), candidate_index, ).unwrap(); @@ -486,7 +491,6 @@ fn rejects_assignment_with_unknown_candidate() { let res = check_and_import_assignment( &mut state, - &Metrics(None), assignment.clone(), candidate_index, ).unwrap(); @@ -518,7 +522,6 @@ fn assignment_import_updates_candidate_entry_and_schedules_wakeup() { let (res, actions) = check_and_import_assignment( &mut state, - &Metrics(None), assignment.clone(), candidate_index, ).unwrap(); @@ -532,6 +535,7 @@ fn assignment_import_updates_candidate_entry_and_schedules_wakeup() { block_hash: b, candidate_hash: c, tick, + .. } => { assert_eq!(b, &block_hash); assert_eq!(c, &candidate_hash); @@ -700,10 +704,11 @@ fn accepts_and_imports_approval_after_assignment() { } #[test] -fn second_approval_import_is_no_op() { +fn second_approval_import_only_schedules_wakeups() { let block_hash = Hash::repeat_byte(0x01); let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); let validator_index = ValidatorIndex(0); + let validator_index_b = ValidatorIndex(1); let candidate_index = 0; let mut state = State { @@ -733,6 +738,25 @@ fn second_approval_import_is_no_op() { assert!(!state.db.candidate_entries.get_mut(&candidate_hash).unwrap() .mark_approval(validator_index)); + // There is only one assignment, so nothing to schedule if we double-import. + + let (actions, res) = check_and_import_approval( + &state, + &Metrics(None), + vote.clone(), + |r| r + ).unwrap(); + + assert_eq!(res, ApprovalCheckResult::Accepted); + assert!(actions.is_empty()); + + // After adding a second assignment, there should be a schedule wakeup action. + + state.db.candidate_entries.get_mut(&candidate_hash).unwrap() + .approval_entry_mut(&block_hash) + .unwrap() + .import_assignment(0, validator_index_b, 0); + let (actions, res) = check_and_import_approval( &state, &Metrics(None), @@ -741,11 +765,16 @@ fn second_approval_import_is_no_op() { ).unwrap(); assert_eq!(res, ApprovalCheckResult::Accepted); - assert!(actions.is_empty()) + assert_eq!(actions.len(), 1); + + assert_matches!( + actions.get(0).unwrap(), + Action::ScheduleWakeup { .. } => {} + ); } #[test] -fn check_and_apply_full_approval_sets_flag_and_bit() { +fn import_checked_approval_updates_entries_and_schedules() { let block_hash = Hash::repeat_byte(0x01); let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); let validator_index_a = ValidatorIndex(0); @@ -773,101 +802,67 @@ fn check_and_apply_full_approval_sets_flag_and_bit() { .unwrap() .import_assignment(0, validator_index_b, 0); - assert!(!state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .mark_approval(validator_index_a)); + { + let mut actions = import_checked_approval( + &state, + &Metrics(None), + state.db.block_entries.get(&block_hash).unwrap().clone(), + candidate_hash, + state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + ApprovalSource::Remote(validator_index_a), + ); - assert!(!state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .mark_approval(validator_index_b)); + assert_eq!(actions.len(), 2); + assert_matches!( + actions.get(0).unwrap(), + Action::ScheduleWakeup { + block_hash: b_hash, + candidate_hash: c_hash, + .. + } => { + assert_eq!(b_hash, &block_hash); + assert_eq!(c_hash, &candidate_hash); + } + ); + assert_matches!( + actions.get_mut(1).unwrap(), + Action::WriteCandidateEntry(c_hash, ref mut c_entry) => { + assert_eq!(c_hash, &candidate_hash); + assert!(!c_entry.approval_entry(&block_hash).unwrap().is_approved()); + assert!(c_entry.mark_approval(validator_index_a)); - let actions = check_and_apply_full_approval( - &state, - &Metrics(None), - None, - candidate_hash, - state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), - |b_hash, _a| b_hash == &block_hash, - ).unwrap(); + state.db.candidate_entries.insert(candidate_hash, c_entry.clone()); + } + ); + } - assert_eq!(actions.len(), 2); - assert_matches!( - actions.get(0).unwrap(), - Action::WriteBlockEntry(b_entry) => { - assert_eq!(b_entry.block_hash(), block_hash); - assert!(b_entry.is_fully_approved()); - assert!(b_entry.is_candidate_approved(&candidate_hash)); - } - ); - assert_matches!( - actions.get(1).unwrap(), - Action::WriteCandidateEntry(c_hash, c_entry) => { - assert_eq!(c_hash, &candidate_hash); - assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved()); - } - ); -} + { + let mut actions = import_checked_approval( + &state, + &Metrics(None), + state.db.block_entries.get(&block_hash).unwrap().clone(), + candidate_hash, + state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + ApprovalSource::Remote(validator_index_b), + ); -#[test] -fn check_and_apply_full_approval_does_not_load_cached_block_from_db() { - let block_hash = Hash::repeat_byte(0x01); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); - let validator_index_a = ValidatorIndex(0); - let validator_index_b = ValidatorIndex(1); - - let mut state = State { - assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { - Ok(0) - })), - ..some_state(StateConfig { - validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie], - validator_groups: vec![vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2)]], - needed_approvals: 2, - ..Default::default() - }) - }; - - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_a, 0); - - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_b, 0); - - assert!(!state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .mark_approval(validator_index_a)); - - assert!(!state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .mark_approval(validator_index_b)); - - let block_entry = state.db.block_entries.remove(&block_hash).unwrap(); - - let actions = check_and_apply_full_approval( - &state, - &Metrics(None), - Some((block_hash, block_entry)), - candidate_hash, - state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), - |b_hash, _a| b_hash == &block_hash, - ).unwrap(); - - assert_eq!(actions.len(), 2); - assert_matches!( - actions.get(0).unwrap(), - Action::WriteBlockEntry(b_entry) => { - assert_eq!(b_entry.block_hash(), block_hash); - assert!(b_entry.is_fully_approved()); - assert!(b_entry.is_candidate_approved(&candidate_hash)); - } - ); - assert_matches!( - actions.get(1).unwrap(), - Action::WriteCandidateEntry(c_hash, c_entry) => { - assert_eq!(c_hash, &candidate_hash); - assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved()); - } - ); + assert_matches!( + actions.get(0).unwrap(), + Action::WriteBlockEntry(b_entry) => { + assert_eq!(b_entry.block_hash(), block_hash); + assert!(b_entry.is_fully_approved()); + assert!(b_entry.is_candidate_approved(&candidate_hash)); + } + ); + assert_matches!( + actions.get_mut(1).unwrap(), + Action::WriteCandidateEntry(c_hash, ref mut c_entry) => { + assert_eq!(c_hash, &candidate_hash); + assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved()); + assert!(c_entry.mark_approval(validator_index_b)); + } + ); + } } #[test] @@ -886,6 +881,7 @@ fn assignment_triggered_by_all_with_less_than_threshold() { validator_index: ValidatorIndex(4), triggered: false, }), + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; 4], approved: false, }; @@ -931,6 +927,7 @@ fn assignment_not_triggered_by_all_with_threshold() { validator_index: ValidatorIndex(4), triggered: false, }), + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; 4], approved: false, }; @@ -982,6 +979,7 @@ fn assignment_not_triggered_if_already_triggered() { validator_index: ValidatorIndex(4), triggered: true, }), + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; 4], approved: false, }; @@ -1019,6 +1017,7 @@ fn assignment_not_triggered_by_exact() { validator_index: ValidatorIndex(4), triggered: false, }), + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; 4], approved: false, }; @@ -1057,6 +1056,7 @@ fn assignment_not_triggered_more_than_maximum() { validator_index: ValidatorIndex(4), triggered: false, }), + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; 4], approved: false, }; @@ -1100,6 +1100,7 @@ fn assignment_triggered_if_at_maximum() { validator_index: ValidatorIndex(4), triggered: false, }), + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; 4], approved: false, }; @@ -1143,6 +1144,7 @@ fn assignment_not_triggered_if_at_maximum_but_clock_is_before() { validator_index: ValidatorIndex(4), triggered: false, }), + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; 4], approved: false, }; @@ -1186,6 +1188,7 @@ fn assignment_not_triggered_if_at_maximum_but_clock_is_before_with_drift() { validator_index: ValidatorIndex(4), triggered: false, }), + our_approval_sig: None, assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; 4], approved: false, }; @@ -1222,9 +1225,9 @@ fn wakeups_next() { let c_a = CandidateHash(Hash::repeat_byte(2)); let c_b = CandidateHash(Hash::repeat_byte(3)); - wakeups.schedule(b_a, c_a, 1); - wakeups.schedule(b_a, c_b, 4); - wakeups.schedule(b_b, c_b, 3); + wakeups.schedule(b_a, 0, c_a, 1); + wakeups.schedule(b_a, 0, c_b, 4); + wakeups.schedule(b_b, 1, c_b, 3); assert_eq!(wakeups.first().unwrap(), 1); @@ -1237,6 +1240,28 @@ fn wakeups_next() { assert_eq!(wakeups.next(&clock).await, (4, b_a, c_b)); assert!(wakeups.first().is_none()); assert!(wakeups.wakeups.is_empty()); + + assert_eq!( + wakeups.block_numbers.get(&0).unwrap(), + &vec![b_a].into_iter().collect::>(), + ); + assert_eq!( + wakeups.block_numbers.get(&1).unwrap(), + &vec![b_b].into_iter().collect::>(), + ); + + wakeups.prune_finalized_wakeups(0); + + assert!(wakeups.block_numbers.get(&0).is_none()); + assert_eq!( + wakeups.block_numbers.get(&1).unwrap(), + &vec![b_b].into_iter().collect::>(), + ); + + wakeups.prune_finalized_wakeups(1); + + assert!(wakeups.block_numbers.get(&0).is_none()); + assert!(wakeups.block_numbers.get(&1).is_none()); }); let aux_fut = Box::pin(async move { @@ -1255,9 +1280,9 @@ fn wakeup_earlier_supersedes_later() { let b_a = Hash::repeat_byte(0); let c_a = CandidateHash(Hash::repeat_byte(2)); - wakeups.schedule(b_a, c_a, 4); - wakeups.schedule(b_a, c_a, 2); - wakeups.schedule(b_a, c_a, 3); + wakeups.schedule(b_a, 0, c_a, 4); + wakeups.schedule(b_a, 0, c_a, 2); + wakeups.schedule(b_a, 0, c_a, 3); let clock = MockClock::new(0); let clock_aux = clock.clone(); @@ -1276,7 +1301,7 @@ fn wakeup_earlier_supersedes_later() { } #[test] -fn block_not_approved_until_all_candidates_approved() { +fn import_checked_approval_sets_one_block_bit_at_a_time() { let block_hash = Hash::repeat_byte(0x01); let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); let candidate_hash_2 = CandidateHash(Hash::repeat_byte(0xDD)); @@ -1305,7 +1330,7 @@ fn block_not_approved_until_all_candidates_approved() { GroupIndex(1), ); - let approve_candidate = |db: &mut TestStore, c_hash| { + let setup_candidate = |db: &mut TestStore, c_hash| { db.candidate_entries.get_mut(&c_hash).unwrap() .approval_entry_mut(&block_hash) .unwrap() @@ -1318,27 +1343,51 @@ fn block_not_approved_until_all_candidates_approved() { assert!(!db.candidate_entries.get_mut(&c_hash).unwrap() .mark_approval(validator_index_a)); - - assert!(!db.candidate_entries.get_mut(&c_hash).unwrap() - .mark_approval(validator_index_b)); }; - approve_candidate(&mut state.db, candidate_hash_2); + setup_candidate(&mut state.db, candidate_hash); + setup_candidate(&mut state.db, candidate_hash_2); - { - let b = state.db.block_entries.get_mut(&block_hash).unwrap(); - b.mark_approved_by_hash(&candidate_hash); - assert!(!b.is_fully_approved()); - } - - let actions = check_and_apply_full_approval( + let actions = import_checked_approval( &state, &Metrics(None), - None, + state.db.block_entries.get(&block_hash).unwrap().clone(), + candidate_hash, + state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + ApprovalSource::Remote(validator_index_b), + ); + + assert_eq!(actions.len(), 2); + assert_matches!( + actions.get(0).unwrap(), + Action::WriteBlockEntry(b_entry) => { + assert_eq!(b_entry.block_hash(), block_hash); + assert!(!b_entry.is_fully_approved()); + assert!(b_entry.is_candidate_approved(&candidate_hash)); + assert!(!b_entry.is_candidate_approved(&candidate_hash_2)); + + state.db.block_entries.insert(block_hash, b_entry.clone()); + } + ); + + assert_matches!( + actions.get(1).unwrap(), + Action::WriteCandidateEntry(c_h, c_entry) => { + assert_eq!(c_h, &candidate_hash); + assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved()); + + state.db.candidate_entries.insert(*c_h, c_entry.clone()); + } + ); + + let actions = import_checked_approval( + &state, + &Metrics(None), + state.db.block_entries.get(&block_hash).unwrap().clone(), candidate_hash_2, state.db.candidate_entries.get(&candidate_hash_2).unwrap().clone(), - |b_hash, _a| b_hash == &block_hash, - ).unwrap(); + ApprovalSource::Remote(validator_index_b), + ); assert_eq!(actions.len(), 2); assert_matches!( @@ -1346,6 +1395,7 @@ fn block_not_approved_until_all_candidates_approved() { Action::WriteBlockEntry(b_entry) => { assert_eq!(b_entry.block_hash(), block_hash); assert!(b_entry.is_fully_approved()); + assert!(b_entry.is_candidate_approved(&candidate_hash)); assert!(b_entry.is_candidate_approved(&candidate_hash_2)); } ); @@ -1359,109 +1409,6 @@ fn block_not_approved_until_all_candidates_approved() { ); } -#[test] -fn candidate_approval_applied_to_all_blocks() { - let block_hash = Hash::repeat_byte(0x01); - let block_hash_2 = Hash::repeat_byte(0x02); - let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); - let validator_index_a = ValidatorIndex(0); - let validator_index_b = ValidatorIndex(1); - - let slot = Slot::from(1); - let session_index = 1; - - let mut state = State { - assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { - Ok(0) - })), - ..some_state(StateConfig { - validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie], - validator_groups: vec![vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2)]], - needed_approvals: 2, - session_index, - slot, - ..Default::default() - }) - }; - - add_block( - &mut state.db, - block_hash_2, - session_index, - slot, - ); - - add_candidate_to_block( - &mut state.db, - block_hash_2, - candidate_hash, - 3, - CoreIndex(1), - GroupIndex(1), - ); - - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_a, 0); - - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash) - .unwrap() - .import_assignment(0, validator_index_b, 0); - - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash_2) - .unwrap() - .import_assignment(0, validator_index_a, 0); - - state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .approval_entry_mut(&block_hash_2) - .unwrap() - .import_assignment(0, validator_index_b, 0); - - assert!(!state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .mark_approval(validator_index_a)); - - assert!(!state.db.candidate_entries.get_mut(&candidate_hash).unwrap() - .mark_approval(validator_index_b)); - - let actions = check_and_apply_full_approval( - &state, - &Metrics(None), - None, - candidate_hash, - state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), - |_b_hash, _a| true, - ).unwrap(); - - assert_eq!(actions.len(), 3); - assert_matches!( - actions.get(0).unwrap(), - Action::WriteBlockEntry(b_entry) => { - assert_eq!(b_entry.block_hash(), block_hash); - assert!(b_entry.is_fully_approved()); - assert!(b_entry.is_candidate_approved(&candidate_hash)); - } - ); - assert_matches!( - actions.get(1).unwrap(), - Action::WriteBlockEntry(b_entry) => { - assert_eq!(b_entry.block_hash(), block_hash_2); - assert!(b_entry.is_fully_approved()); - assert!(b_entry.is_candidate_approved(&candidate_hash)); - } - ); - assert_matches!( - actions.get(2).unwrap(), - Action::WriteCandidateEntry(c_hash, c_entry) => { - assert_eq!(c_hash, &candidate_hash); - assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved()); - assert!(c_entry.approval_entry(&block_hash_2).unwrap().is_approved()); - } - ); -} - #[test] fn approved_ancestor_all_approved() { let block_hash_1 = Hash::repeat_byte(0x01); @@ -1760,7 +1707,7 @@ fn process_wakeup_schedules_wakeup() { assert_eq!(actions.len(), 1); assert_matches!( actions.get(0).unwrap(), - Action::ScheduleWakeup { block_hash: b, candidate_hash: c, tick } => { + Action::ScheduleWakeup { block_hash: b, candidate_hash: c, tick, .. } => { assert_eq!(b, &block_hash); assert_eq!(c, &candidate_hash); assert_eq!(tick, &(slot_to_tick(slot) + 10)); @@ -1777,3 +1724,123 @@ fn triggered_assignment_leads_to_recovery_and_validation() { fn finalization_event_prunes() { } + +#[test] +fn local_approval_import_always_updates_approval_entry() { + let block_hash = Hash::repeat_byte(0x01); + let block_hash_2 = Hash::repeat_byte(0x02); + let candidate_hash = CandidateHash(Hash::repeat_byte(0xCC)); + let validator_index = ValidatorIndex(0); + + let state_config = StateConfig { + validators: vec![Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie], + validator_groups: vec![vec![ValidatorIndex(0), ValidatorIndex(1)], vec![ValidatorIndex(2)]], + needed_approvals: 2, + ..Default::default() + }; + + let mut state = State { + assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|| { + Ok(0) + })), + ..some_state(state_config.clone()) + }; + + add_block( + &mut state.db, + block_hash_2, + state_config.session_index, + state_config.slot, + ); + + add_candidate_to_block( + &mut state.db, + block_hash_2, + candidate_hash, + state_config.validators.len(), + 1.into(), + GroupIndex(1), + ); + + let sig_a = sign_approval(Sr25519Keyring::Alice, candidate_hash, 1); + let sig_b = sign_approval(Sr25519Keyring::Alice, candidate_hash, 1); + + { + let mut import_local_assignment = |block_hash: Hash| { + let approval_entry = state.db.candidate_entries.get_mut(&candidate_hash).unwrap() + .approval_entry_mut(&block_hash) + .unwrap(); + + approval_entry.set_our_assignment(approval_db::v1::OurAssignment { + cert: garbage_assignment_cert( + AssignmentCertKind::RelayVRFModulo { sample: 0 } + ), + tranche: 0, + validator_index, + triggered: false, + }.into()); + + assert!(approval_entry.trigger_our_assignment(0).is_some()); + assert!(approval_entry.local_statements().0.is_some()); + }; + + import_local_assignment(block_hash); + import_local_assignment(block_hash_2); + } + + { + let mut actions = import_checked_approval( + &state, + &Metrics(None), + state.db.block_entries.get(&block_hash).unwrap().clone(), + candidate_hash, + state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + ApprovalSource::Local(validator_index, sig_a.clone()), + ); + + assert_eq!(actions.len(), 1); + + assert_matches!( + actions.get_mut(0).unwrap(), + Action::WriteCandidateEntry(c_hash, ref mut c_entry) => { + assert_eq!(c_hash, &candidate_hash); + assert_eq!( + c_entry.approval_entry(&block_hash).unwrap().local_statements().1, + Some(sig_a), + ); + assert!(c_entry.mark_approval(validator_index)); + + state.db.candidate_entries.insert(candidate_hash, c_entry.clone()); + } + ); + } + + { + let mut actions = import_checked_approval( + &state, + &Metrics(None), + state.db.block_entries.get(&block_hash_2).unwrap().clone(), + candidate_hash, + state.db.candidate_entries.get(&candidate_hash).unwrap().clone(), + ApprovalSource::Local(validator_index, sig_b.clone()), + ); + + assert_eq!(actions.len(), 1); + + assert_matches!( + actions.get_mut(0).unwrap(), + Action::WriteCandidateEntry(c_hash, ref mut c_entry) => { + assert_eq!(c_hash, &candidate_hash); + assert_eq!( + c_entry.approval_entry(&block_hash_2).unwrap().local_statements().1, + Some(sig_b), + ); + assert!(c_entry.mark_approval(validator_index)); + + state.db.candidate_entries.insert(candidate_hash, c_entry.clone()); + } + ); + } +} + +// TODO [now]: handling `BecomeActive` action broadcasts everything. diff --git a/polkadot/node/core/av-store/Cargo.toml b/polkadot/node/core/av-store/Cargo.toml index 80a399d260..87a57e4fb8 100644 --- a/polkadot/node/core/av-store/Cargo.toml +++ b/polkadot/node/core/av-store/Cargo.toml @@ -8,7 +8,6 @@ edition = "2018" futures = "0.3.12" futures-timer = "3.0.2" kvdb = "0.9.0" -kvdb-rocksdb = "0.11.0" thiserror = "1.0.23" tracing = "0.1.25" bitvec = "0.20.1" @@ -21,8 +20,6 @@ polkadot-overseer = { path = "../../overseer" } polkadot-primitives = { path = "../../../primitives" } polkadot-node-primitives = { path = "../../primitives" } -sc-service = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } - [dev-dependencies] log = "0.4.13" env_logger = "0.8.2" diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 4e6a4e3c65..efe0379a4a 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -21,14 +21,12 @@ use std::collections::HashMap; use std::io; -use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; use parity_scale_codec::{Encode, Decode, Input, Error as CodecError}; use futures::{select, channel::oneshot, future, FutureExt}; use futures_timer::Delay; -use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb::{KeyValueDB, DBTransaction}; use polkadot_primitives::v1::{ @@ -54,12 +52,6 @@ mod tests; const LOG_TARGET: &str = "parachain::availability"; -mod columns { - pub const DATA: u32 = 0; - pub const META: u32 = 1; - pub const NUM_COLUMNS: u32 = 2; -} - /// The following constants are used under normal conditions: const AVAILABLE_PREFIX: &[u8; 9] = b"available"; @@ -177,97 +169,107 @@ fn query_inner( fn write_available_data( tx: &mut DBTransaction, + config: &Config, hash: &CandidateHash, available_data: &AvailableData, ) { let key = (AVAILABLE_PREFIX, hash).encode(); - tx.put_vec(columns::DATA, &key[..], available_data.encode()); + tx.put_vec(config.col_data, &key[..], available_data.encode()); } fn load_available_data( db: &Arc, + config: &Config, hash: &CandidateHash, ) -> Result, Error> { let key = (AVAILABLE_PREFIX, hash).encode(); - query_inner(db, columns::DATA, &key) + query_inner(db, config.col_data, &key) } fn delete_available_data( tx: &mut DBTransaction, + config: &Config, hash: &CandidateHash, ) { let key = (AVAILABLE_PREFIX, hash).encode(); - tx.delete(columns::DATA, &key[..]) + tx.delete(config.col_data, &key[..]) } fn load_chunk( db: &Arc, + config: &Config, candidate_hash: &CandidateHash, chunk_index: ValidatorIndex, ) -> Result, Error> { let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode(); - query_inner(db, columns::DATA, &key) + query_inner(db, config.col_data, &key) } fn write_chunk( tx: &mut DBTransaction, + config: &Config, candidate_hash: &CandidateHash, chunk_index: ValidatorIndex, erasure_chunk: &ErasureChunk, ) { let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode(); - tx.put_vec(columns::DATA, &key, erasure_chunk.encode()); + tx.put_vec(config.col_data, &key, erasure_chunk.encode()); } fn delete_chunk( tx: &mut DBTransaction, + config: &Config, candidate_hash: &CandidateHash, chunk_index: ValidatorIndex, ) { let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode(); - tx.delete(columns::DATA, &key[..]); + tx.delete(config.col_data, &key[..]); } fn load_meta( db: &Arc, + config: &Config, hash: &CandidateHash, ) -> Result, Error> { let key = (META_PREFIX, hash).encode(); - query_inner(db, columns::META, &key) + query_inner(db, config.col_meta, &key) } fn write_meta( tx: &mut DBTransaction, + config: &Config, hash: &CandidateHash, meta: &CandidateMeta, ) { let key = (META_PREFIX, hash).encode(); - tx.put_vec(columns::META, &key, meta.encode()); + tx.put_vec(config.col_meta, &key, meta.encode()); } -fn delete_meta(tx: &mut DBTransaction, hash: &CandidateHash) { +fn delete_meta(tx: &mut DBTransaction, config: &Config, hash: &CandidateHash) { let key = (META_PREFIX, hash).encode(); - tx.delete(columns::META, &key[..]) + tx.delete(config.col_meta, &key[..]) } fn delete_unfinalized_height( tx: &mut DBTransaction, + config: &Config, block_number: BlockNumber, ) { let prefix = (UNFINALIZED_PREFIX, BEBlockNumber(block_number)).encode(); - tx.delete_prefix(columns::META, &prefix); + tx.delete_prefix(config.col_meta, &prefix); } fn delete_unfinalized_inclusion( tx: &mut DBTransaction, + config: &Config, block_number: BlockNumber, block_hash: &Hash, candidate_hash: &CandidateHash, @@ -279,18 +281,28 @@ fn delete_unfinalized_inclusion( candidate_hash, ).encode(); - tx.delete(columns::META, &key[..]); + tx.delete(config.col_meta, &key[..]); } -fn delete_pruning_key(tx: &mut DBTransaction, t: impl Into, h: &CandidateHash) { +fn delete_pruning_key( + tx: &mut DBTransaction, + config: &Config, + t: impl Into, + h: &CandidateHash, +) { let key = (PRUNE_BY_TIME_PREFIX, t.into(), h).encode(); - tx.delete(columns::META, &key); + tx.delete(config.col_meta, &key); } -fn write_pruning_key(tx: &mut DBTransaction, t: impl Into, h: &CandidateHash) { +fn write_pruning_key( + tx: &mut DBTransaction, + config: &Config, + t: impl Into, + h: &CandidateHash, +) { let t = t.into(); let key = (PRUNE_BY_TIME_PREFIX, t, h).encode(); - tx.put(columns::META, &key, TOMBSTONE_VALUE); + tx.put(config.col_meta, &key, TOMBSTONE_VALUE); } fn finalized_block_range(finalized: BlockNumber) -> (Vec, Vec) { @@ -303,12 +315,13 @@ fn finalized_block_range(finalized: BlockNumber) -> (Vec, Vec) { fn write_unfinalized_block_contains( tx: &mut DBTransaction, + config: &Config, n: BlockNumber, h: &Hash, ch: &CandidateHash, ) { let key = (UNFINALIZED_PREFIX, BEBlockNumber(n), h, ch).encode(); - tx.put(columns::META, &key, TOMBSTONE_VALUE); + tx.put(config.col_meta, &key, TOMBSTONE_VALUE); } fn pruning_range(now: impl Into) -> (Vec, Vec) { @@ -405,28 +418,12 @@ impl Default for PruningConfig { } /// Configuration for the availability store. +#[derive(Debug, Clone, Copy)] pub struct Config { - /// Total cache size in megabytes. If `None` the default (128 MiB per column) is used. - pub cache_size: Option, - /// Path to the database. - pub path: PathBuf, -} - -impl std::convert::TryFrom for Config { - type Error = Error; - - fn try_from(config: sc_service::config::DatabaseConfig) -> Result { - let path = config.path().ok_or(Error::CustomDatabase)?; - - Ok(Self { - // substrate cache size is improper here; just use the default - cache_size: None, - // DB path is a sub-directory of substrate db path to give two properties: - // 1: column numbers don't conflict with substrate - // 2: commands like purge-chain work without further changes - path: path.join("parachains").join("av-store"), - }) - } + /// The column family for availability data and chunks. + pub col_data: u32, + /// The column family for availability store meta information. + pub col_meta: u32, } trait Clock: Send + Sync { @@ -445,6 +442,7 @@ impl Clock for SystemClock { /// An implementation of the Availability Store subsystem. pub struct AvailabilityStoreSubsystem { pruning_config: PruningConfig, + config: Config, db: Arc, metrics: Metrics, clock: Box, @@ -452,44 +450,33 @@ pub struct AvailabilityStoreSubsystem { impl AvailabilityStoreSubsystem { /// Create a new `AvailabilityStoreSubsystem` with a given config on disk. - pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result { - let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS); - - if let Some(cache_size) = config.cache_size { - let mut memory_budget = HashMap::new(); - - for i in 0..columns::NUM_COLUMNS { - memory_budget.insert(i, cache_size / columns::NUM_COLUMNS as usize); - } - db_config.memory_budget = memory_budget; - } - - let path = config.path.to_str().ok_or_else(|| io::Error::new( - io::ErrorKind::Other, - format!("Bad database path: {:?}", config.path), - ))?; - - std::fs::create_dir_all(&path)?; - let db = Database::open(&db_config, &path)?; - - Ok(Self { - pruning_config: PruningConfig::default(), - db: Arc::new(db), + pub fn new( + db: Arc, + config: Config, + metrics: Metrics, + ) -> Self { + Self::with_pruning_config_and_clock( + db, + config, + PruningConfig::default(), + Box::new(SystemClock), metrics, - clock: Box::new(SystemClock), - }) + ) } - #[cfg(test)] - fn new_in_memory( + /// Create a new `AvailabilityStoreSubsystem` with a given config on disk. + fn with_pruning_config_and_clock( db: Arc, + config: Config, pruning_config: PruningConfig, clock: Box, + metrics: Metrics, ) -> Self { Self { pruning_config, + config, db, - metrics: Metrics(None), + metrics, clock, } } @@ -581,7 +568,7 @@ where *next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse(); let _timer = subsystem.metrics.time_pruning(); - prune_all(&subsystem.db, &*subsystem.clock)?; + prune_all(&subsystem.db, &subsystem.config, &*subsystem.clock)?; } } @@ -648,6 +635,7 @@ async fn process_block_activated( note_block_backed( &subsystem.db, &mut tx, + &subsystem.config, &subsystem.pruning_config, now, n_validators, @@ -658,6 +646,7 @@ async fn process_block_activated( note_block_included( &subsystem.db, &mut tx, + &subsystem.config, &subsystem.pruning_config, (block_number, activated), receipt, @@ -675,6 +664,7 @@ async fn process_block_activated( fn note_block_backed( db: &Arc, db_transaction: &mut DBTransaction, + config: &Config, pruning_config: &PruningConfig, now: Duration, n_validators: usize, @@ -688,7 +678,7 @@ fn note_block_backed( "Candidate backed", ); - if load_meta(db, &candidate_hash)?.is_none() { + if load_meta(db, config, &candidate_hash)?.is_none() { let meta = CandidateMeta { state: State::Unavailable(now.into()), data_available: false, @@ -697,8 +687,8 @@ fn note_block_backed( let prune_at = now + pruning_config.keep_unavailable_for; - write_pruning_key(db_transaction, prune_at, &candidate_hash); - write_meta(db_transaction, &candidate_hash, &meta); + write_pruning_key(db_transaction, config, prune_at, &candidate_hash); + write_meta(db_transaction, config, &candidate_hash, &meta); } Ok(()) @@ -707,13 +697,14 @@ fn note_block_backed( fn note_block_included( db: &Arc, db_transaction: &mut DBTransaction, + config: &Config, pruning_config:&PruningConfig, block: (BlockNumber, Hash), candidate: CandidateReceipt, ) -> Result<(), Error> { let candidate_hash = candidate.hash(); - match load_meta(db, &candidate_hash)? { + match load_meta(db, config, &candidate_hash)? { None => { // This is alarming. We've observed a block being included without ever seeing it backed. // Warn and ignore. @@ -736,7 +727,7 @@ fn note_block_included( State::Unavailable(at) => { let at_d: Duration = at.into(); let prune_at = at_d + pruning_config.keep_unavailable_for; - delete_pruning_key(db_transaction, prune_at, &candidate_hash); + delete_pruning_key(db_transaction, config, prune_at, &candidate_hash); State::Unfinalized(at, vec![be_block]) } @@ -754,8 +745,14 @@ fn note_block_included( } }; - write_unfinalized_block_contains(db_transaction, block.0, &block.1, &candidate_hash); - write_meta(db_transaction, &candidate_hash, &meta); + write_unfinalized_block_contains( + db_transaction, + config, + block.0, + &block.1, + &candidate_hash, + ); + write_meta(db_transaction, config, &candidate_hash, &meta); } } @@ -788,7 +785,7 @@ async fn process_block_finalized( // as it is not `Send`. That is why we create the iterator once within this loop, drop it, // do an asynchronous request, and then instantiate the exact same iterator again. let batch_num = { - let mut iter = subsystem.db.iter_with_prefix(columns::META, &start_prefix) + let mut iter = subsystem.db.iter_with_prefix(subsystem.config.col_meta, &start_prefix) .take_while(|(k, _)| &k[..] < &end_prefix[..]) .peekable(); @@ -821,7 +818,7 @@ async fn process_block_finalized( } }; - let iter = subsystem.db.iter_with_prefix(columns::META, &start_prefix) + let iter = subsystem.db.iter_with_prefix(subsystem.config.col_meta, &start_prefix) .take_while(|(k, _)| &k[..] < &end_prefix[..]) .peekable(); @@ -830,7 +827,7 @@ async fn process_block_finalized( // Now that we've iterated over the entire batch at this finalized height, // update the meta. - delete_unfinalized_height(&mut db_transaction, batch_num); + delete_unfinalized_height(&mut db_transaction, &subsystem.config, batch_num); update_blocks_at_finalized_height( &subsystem, @@ -888,7 +885,7 @@ fn update_blocks_at_finalized_height( now: Duration, ) -> Result<(), Error> { for (candidate_hash, is_finalized) in candidates { - let mut meta = match load_meta(&subsystem.db, &candidate_hash)? { + let mut meta = match load_meta(&subsystem.db, &subsystem.config, &candidate_hash)? { None => { tracing::warn!( target: LOG_TARGET, @@ -909,7 +906,7 @@ fn update_blocks_at_finalized_height( // This is also not going to happen; the very fact that we are // iterating over the candidate here indicates that `State` should // be `Unfinalized`. - delete_pruning_key(db_transaction, at, &candidate_hash); + delete_pruning_key(db_transaction, &subsystem.config, at, &candidate_hash); } State::Unfinalized(_, blocks) => { for (block_num, block_hash) in blocks.iter().cloned() { @@ -917,6 +914,7 @@ fn update_blocks_at_finalized_height( if block_num.0 != block_number { delete_unfinalized_inclusion( db_transaction, + &subsystem.config, block_num.0, &block_hash, &candidate_hash, @@ -929,9 +927,10 @@ fn update_blocks_at_finalized_height( meta.state = State::Finalized(now.into()); // Write the meta and a pruning record. - write_meta(db_transaction, &candidate_hash, &meta); + write_meta(db_transaction, &subsystem.config, &candidate_hash, &meta); write_pruning_key( db_transaction, + &subsystem.config, now + subsystem.pruning_config.keep_finalized_for, &candidate_hash, ); @@ -948,7 +947,12 @@ fn update_blocks_at_finalized_height( if blocks.is_empty() { let at_d: Duration = at.into(); let prune_at = at_d + subsystem.pruning_config.keep_unavailable_for; - write_pruning_key(db_transaction, prune_at, &candidate_hash); + write_pruning_key( + db_transaction, + &subsystem.config, + prune_at, + &candidate_hash, + ); State::Unavailable(at) } else { State::Unfinalized(at, blocks) @@ -957,7 +961,7 @@ fn update_blocks_at_finalized_height( }; // Update the meta entry. - write_meta(db_transaction, &candidate_hash, &meta) + write_meta(db_transaction, &subsystem.config, &candidate_hash, &meta) } } @@ -970,18 +974,18 @@ fn process_message( ) -> Result<(), Error> { match msg { AvailabilityStoreMessage::QueryAvailableData(candidate, tx) => { - let _ = tx.send(load_available_data(&subsystem.db, &candidate)?); + let _ = tx.send(load_available_data(&subsystem.db, &subsystem.config, &candidate)?); } AvailabilityStoreMessage::QueryDataAvailability(candidate, tx) => { - let a = load_meta(&subsystem.db, &candidate)?.map_or(false, |m| m.data_available); + let a = load_meta(&subsystem.db, &subsystem.config, &candidate)?.map_or(false, |m| m.data_available); let _ = tx.send(a); } AvailabilityStoreMessage::QueryChunk(candidate, validator_index, tx) => { let _timer = subsystem.metrics.time_get_chunk(); - let _ = tx.send(load_chunk(&subsystem.db, &candidate, validator_index)?); + let _ = tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?); } AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => { - match load_meta(&subsystem.db, &candidate)? { + match load_meta(&subsystem.db, &subsystem.config, &candidate)? { None => { let _ = tx.send(Vec::new()); } @@ -990,7 +994,12 @@ fn process_message( for (index, _) in meta.chunks_stored.iter().enumerate().filter(|(_, b)| **b) { let _timer = subsystem.metrics.time_get_chunk(); - match load_chunk(&subsystem.db, &candidate, ValidatorIndex(index as _))? { + match load_chunk( + &subsystem.db, + &subsystem.config, + &candidate, + ValidatorIndex(index as _), + )? { Some(c) => chunks.push(c), None => { tracing::warn!( @@ -1008,7 +1017,7 @@ fn process_message( } } AvailabilityStoreMessage::QueryChunkAvailability(candidate, validator_index, tx) => { - let a = load_meta(&subsystem.db, &candidate)? + let a = load_meta(&subsystem.db, &subsystem.config, &candidate)? .map_or(false, |m| *m.chunks_stored.get(validator_index.0 as usize).as_deref().unwrap_or(&false) ); @@ -1022,7 +1031,7 @@ fn process_message( subsystem.metrics.on_chunks_received(1); let _timer = subsystem.metrics.time_store_chunk(); - match store_chunk(&subsystem.db, candidate_hash, chunk) { + match store_chunk(&subsystem.db, &subsystem.config, candidate_hash, chunk) { Ok(true) => { let _ = tx.send(Ok(())); } @@ -1065,12 +1074,13 @@ fn process_message( // Ok(true) on success, Ok(false) on failure, and Err on internal error. fn store_chunk( db: &Arc, + config: &Config, candidate_hash: CandidateHash, chunk: ErasureChunk, ) -> Result { let mut tx = DBTransaction::new(); - let mut meta = match load_meta(db, &candidate_hash)? { + let mut meta = match load_meta(db, config, &candidate_hash)? { Some(m) => m, None => return Ok(false), // we weren't informed of this candidate by import events. }; @@ -1080,8 +1090,8 @@ fn store_chunk( Some(false) => { meta.chunks_stored.set(chunk.index.0 as usize, true); - write_chunk(&mut tx, &candidate_hash, chunk.index, &chunk); - write_meta(&mut tx, &candidate_hash, &meta); + write_chunk(&mut tx, config, &candidate_hash, chunk.index, &chunk); + write_meta(&mut tx, config, &candidate_hash, &meta); } None => return Ok(false), // out of bounds. } @@ -1106,7 +1116,7 @@ fn store_available_data( ) -> Result<(), Error> { let mut tx = DBTransaction::new(); - let mut meta = match load_meta(&subsystem.db, &candidate_hash)? { + let mut meta = match load_meta(&subsystem.db, &subsystem.config, &candidate_hash)? { Some(m) => { if m.data_available { return Ok(()); // already stored. @@ -1119,7 +1129,7 @@ fn store_available_data( // Write a pruning record. let prune_at = now + subsystem.pruning_config.keep_unavailable_for; - write_pruning_key(&mut tx, prune_at, &candidate_hash); + write_pruning_key(&mut tx, &subsystem.config, prune_at, &candidate_hash); CandidateMeta { state: State::Unavailable(now.into()), @@ -1142,14 +1152,14 @@ fn store_available_data( }); for chunk in erasure_chunks { - write_chunk(&mut tx, &candidate_hash, chunk.index, &chunk); + write_chunk(&mut tx, &subsystem.config, &candidate_hash, chunk.index, &chunk); } meta.data_available = true; meta.chunks_stored = bitvec::bitvec![BitOrderLsb0, u8; 1; n_validators]; - write_meta(&mut tx, &candidate_hash, &meta); - write_available_data(&mut tx, &candidate_hash, &available_data); + write_meta(&mut tx, &subsystem.config, &candidate_hash, &meta); + write_available_data(&mut tx, &subsystem.config, &candidate_hash, &available_data); subsystem.db.write(tx)?; @@ -1162,35 +1172,35 @@ fn store_available_data( Ok(()) } -fn prune_all(db: &Arc, clock: &dyn Clock) -> Result<(), Error> { +fn prune_all(db: &Arc, config: &Config, clock: &dyn Clock) -> Result<(), Error> { let now = clock.now()?; let (range_start, range_end) = pruning_range(now); let mut tx = DBTransaction::new(); - let iter = db.iter_with_prefix(columns::META, &range_start[..]) + let iter = db.iter_with_prefix(config.col_meta, &range_start[..]) .take_while(|(k, _)| &k[..] < &range_end[..]); for (k, _v) in iter { - tx.delete(columns::META, &k[..]); + tx.delete(config.col_meta, &k[..]); let (_, candidate_hash) = match decode_pruning_key(&k[..]) { Ok(m) => m, Err(_) => continue, // sanity }; - delete_meta(&mut tx, &candidate_hash); + delete_meta(&mut tx, config, &candidate_hash); // Clean up all attached data of the candidate. - if let Some(meta) = load_meta(db, &candidate_hash)? { + if let Some(meta) = load_meta(db, config, &candidate_hash)? { // delete available data. if meta.data_available { - delete_available_data(&mut tx, &candidate_hash) + delete_available_data(&mut tx, config, &candidate_hash) } // delete chunks. for (i, b) in meta.chunks_stored.iter().enumerate() { if *b { - delete_chunk(&mut tx, &candidate_hash, ValidatorIndex(i as _)); + delete_chunk(&mut tx, config, &candidate_hash, ValidatorIndex(i as _)); } } @@ -1200,6 +1210,7 @@ fn prune_all(db: &Arc, clock: &dyn Clock) -> Result<(), Error> { for (block_number, block_hash) in blocks { delete_unfinalized_inclusion( &mut tx, + config, block_number.0, &block_hash, &candidate_hash, diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index 63fd628885..3ec83f8a33 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -38,6 +38,17 @@ use polkadot_node_subsystem_test_helpers as test_helpers; use sp_keyring::Sr25519Keyring; use parking_lot::Mutex; +mod columns { + pub const DATA: u32 = 0; + pub const META: u32 = 1; + pub const NUM_COLUMNS: u32 = 2; +} + +const TEST_CONFIG: Config = Config { + col_data: columns::DATA, + col_meta: columns::META, +}; + struct TestHarness { virtual_overseer: test_helpers::TestSubsystemContextHandle, } @@ -149,10 +160,12 @@ fn test_harness>( let pool = sp_core::testing::TaskExecutor::new(); let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - let subsystem = AvailabilityStoreSubsystem::new_in_memory( + let subsystem = AvailabilityStoreSubsystem::with_pruning_config_and_clock( store, + TEST_CONFIG, state.pruning_config.clone(), Box::new(state.clock), + Metrics::default(), ); let subsystem = run(subsystem, context); @@ -297,7 +310,7 @@ fn store_chunk_works() { // Ensure an entry already exists. In reality this would come from watching // chain events. with_tx(&store, |tx| { - super::write_meta(tx, &candidate_hash, &CandidateMeta { + super::write_meta(tx, &TEST_CONFIG, &candidate_hash, &CandidateMeta { data_available: false, chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators], state: State::Unavailable(BETimestamp(0)), @@ -379,7 +392,7 @@ fn query_chunk_checks_meta() { // Ensure an entry already exists. In reality this would come from watching // chain events. with_tx(&store, |tx| { - super::write_meta(tx, &candidate_hash, &CandidateMeta { + super::write_meta(tx, &TEST_CONFIG, &candidate_hash, &CandidateMeta { data_available: false, chunks_stored: { let mut v = bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators]; @@ -548,7 +561,7 @@ fn query_all_chunks_works() { { with_tx(&store, |tx| { - super::write_meta(tx, &candidate_hash_2, &CandidateMeta { + super::write_meta(tx, &TEST_CONFIG, &candidate_hash_2, &CandidateMeta { data_available: false, chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators as _], state: State::Unavailable(BETimestamp(0)), diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 0b1e274465..f3c8e11b79 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -244,10 +244,13 @@ impl State { candidates, }); new_hashes.insert(meta.hash.clone()); + + // In case there are duplicates, we should only set this if the entry + // was vacant. + self.blocks_by_number.entry(meta.number).or_default().push(meta.hash); } _ => continue, } - self.blocks_by_number.entry(meta.number).or_default().push(meta.hash); } tracing::debug!( diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index ea2528ed72..8bc471e7f9 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -12,6 +12,7 @@ polkadot-primitives = { path = "../../../primitives" } parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] } sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-node-network-protocol = { path = "../protocol" } strum = "0.20.0" @@ -23,3 +24,4 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } polkadot-node-subsystem-util = { path = "../../subsystem-util"} sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +futures-timer = "3" diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 0640673283..e9816f4395 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -25,6 +25,7 @@ use parking_lot::Mutex; use futures::prelude::*; use futures::channel::mpsc; use sc_network::Event as NetworkEvent; +use sp_consensus::SyncOracle; use polkadot_subsystem::{ ActiveLeavesUpdate, ActivatedLeaf, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, @@ -96,6 +97,7 @@ pub struct NetworkBridge { network_service: N, authority_discovery_service: AD, request_multiplexer: RequestMultiplexer, + sync_oracle: Box, } impl NetworkBridge { @@ -103,11 +105,17 @@ impl NetworkBridge { /// /// This assumes that the network service has had the notifications protocol for the network /// bridge already registered. See [`peers_sets_info`](peers_sets_info). - pub fn new(network_service: N, authority_discovery_service: AD, request_multiplexer: RequestMultiplexer) -> Self { + pub fn new( + network_service: N, + authority_discovery_service: AD, + request_multiplexer: RequestMultiplexer, + sync_oracle: Box, + ) -> Self { NetworkBridge { network_service, authority_discovery_service, request_multiplexer, + sync_oracle, } } } @@ -165,17 +173,23 @@ struct Shared(Arc>); #[derive(Default)] struct SharedInner { - local_view: View, + local_view: Option, validation_peers: HashMap, collation_peers: HashMap, } +enum Mode { + Syncing(Box), + Active, +} + async fn handle_subsystem_messages( mut ctx: Context, mut network_service: N, mut authority_discovery_service: AD, validator_discovery_notifications: mpsc::Receiver, shared: Shared, + sync_oracle: Box, ) -> Result<(), UnexpectedAbort> where Context: SubsystemContext, @@ -187,6 +201,8 @@ where let mut finalized_number = 0; let mut validator_discovery = validator_discovery::Service::::new(); + let mut mode = Mode::Syncing(sync_oracle); + let mut validator_discovery_notifications = validator_discovery_notifications.fuse(); loop { @@ -210,13 +226,26 @@ where } live_heads.retain(|h| !deactivated.contains(&h.hash)); - update_our_view( - &mut network_service, - &mut ctx, - &live_heads, - &shared, - finalized_number, - ).await?; + // if we're done syncing, set the mode to `Mode::Active`. + // Otherwise, we don't need to send view updates. + { + let is_done_syncing = match mode { + Mode::Active => true, + Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(), + }; + + if is_done_syncing { + mode = Mode::Active; + + update_our_view( + &mut network_service, + &mut ctx, + &live_heads, + &shared, + finalized_number, + ).await?; + } + } } Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => { tracing::trace!( @@ -413,7 +442,7 @@ async fn handle_network_messages( } } - shared.local_view.clone() + shared.local_view.clone().unwrap_or(View::default()) }; // Failure here means that the other side of the network bridge @@ -637,6 +666,7 @@ where network_service, request_multiplexer, authority_discovery_service, + sync_oracle, } = bridge; let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024); @@ -657,6 +687,7 @@ where authority_discovery_service, validation_worker_rx, shared, + sync_oracle, ); futures::pin_mut!(subsystem_event_handler); @@ -722,11 +753,22 @@ async fn update_our_view( // We only want to send a view update when the heads changed. // A change in finalized block number only is _not_ sufficient. - if shared.local_view.check_heads_eq(&new_view) { - return Ok(()) - } + // + // If this is the first view update since becoming active, but our view is empty, + // there is no need to send anything. + match shared.local_view { + Some(ref v) if v.check_heads_eq(&new_view) => { + return Ok(()) + } + None if live_heads.is_empty() => { + shared.local_view = Some(new_view); + return Ok(()) + } + _ => { + shared.local_view = Some(new_view.clone()); + } - shared.local_view = new_view.clone(); + } ( shared.validation_peers.keys().cloned().collect::>(), @@ -910,11 +952,12 @@ mod tests { use super::*; use futures::executor; use futures::stream::BoxStream; - use std::pin::Pin; - use std::sync::Arc; + use futures::channel::oneshot; use std::borrow::Cow; use std::collections::HashSet; + use std::pin::Pin; + use std::sync::atomic::{AtomicBool, Ordering}; use async_trait::async_trait; use parking_lot::Mutex; use assert_matches::assert_matches; @@ -1071,12 +1114,76 @@ mod tests { } } + #[derive(Clone)] + struct TestSyncOracle { + flag: Arc, + done_syncing_sender: Arc>>>, + } + + struct TestSyncOracleHandle { + done_syncing_receiver: oneshot::Receiver<()>, + flag: Arc, + } + + impl TestSyncOracleHandle { + fn set_done(&self) { + self.flag.store(false, Ordering::SeqCst); + } + + async fn await_mode_switch(self) { + let _ = self.done_syncing_receiver.await; + } + } + + impl SyncOracle for TestSyncOracle { + fn is_major_syncing(&mut self) -> bool { + let is_major_syncing = self.flag.load(Ordering::SeqCst); + + if !is_major_syncing { + if let Some(sender) = self.done_syncing_sender.lock().take() { + let _ = sender.send(()); + } + } + + is_major_syncing + } + + fn is_offline(&mut self) -> bool { + unimplemented!("not used in network bridge") + } + } + + // val - result of `is_major_syncing`. + fn make_sync_oracle(val: bool) -> (TestSyncOracle, TestSyncOracleHandle) { + let (tx, rx) = oneshot::channel(); + let flag = Arc::new(AtomicBool::new(val)); + + ( + TestSyncOracle { + flag: flag.clone(), + done_syncing_sender: Arc::new(Mutex::new(Some(tx))), + }, + TestSyncOracleHandle { + flag, + done_syncing_receiver: rx, + } + ) + } + + fn done_syncing_oracle() -> Box { + let (oracle, _) = make_sync_oracle(false); + Box::new(oracle) + } + struct TestHarness { network_handle: TestNetworkHandle, virtual_overseer: TestSubsystemContextHandle, } - fn test_harness>(test: impl FnOnce(TestHarness) -> T) { + fn test_harness>( + sync_oracle: Box, + test: impl FnOnce(TestHarness) -> T, + ) { let pool = sp_core::testing::TaskExecutor::new(); let (request_multiplexer, req_configs) = RequestMultiplexer::new(); let (network, network_handle, discovery) = new_test_network(req_configs); @@ -1086,6 +1193,7 @@ mod tests { network_service: network, authority_discovery_service: discovery, request_multiplexer, + sync_oracle, }; let network_bridge = run_network( @@ -1148,7 +1256,8 @@ mod tests { #[test] fn send_our_view_upon_connection() { - test_harness(|test_harness| async move { + let (oracle, handle) = make_sync_oracle(false); + test_harness(Box::new(oracle), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, @@ -1167,6 +1276,8 @@ mod tests { )) ).await; + handle.await_mode_switch().await; + network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await; network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await; @@ -1197,12 +1308,24 @@ mod tests { #[test] fn sends_view_updates_to_peers() { - test_harness(|test_harness| async move { + let (oracle, handle) = make_sync_oracle(false); + test_harness(Box::new(oracle), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; let peer_a = PeerId::random(); let peer_b = PeerId::random(); + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate { + activated: Default::default(), + deactivated: Default::default(), + } + )) + ).await; + + handle.await_mode_switch().await; + network_handle.connect_peer( peer_a.clone(), PeerSet::Validation, @@ -1210,10 +1333,33 @@ mod tests { ).await; network_handle.connect_peer( peer_b.clone(), - PeerSet::Validation, + PeerSet::Collation, ObservedRole::Full, ).await; + let actions = network_handle.next_network_actions(2).await; + let wire_message = WireMessage::::ViewUpdate( + View::default(), + ).encode(); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_a, + PeerSet::Validation, + wire_message.clone(), + ), + ); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_b, + PeerSet::Collation, + wire_message.clone(), + ), + ); + let hash_a = Hash::repeat_byte(1); virtual_overseer.send( @@ -1226,7 +1372,7 @@ mod tests { )) ).await; - let actions = network_handle.next_network_actions(4).await; + let actions = network_handle.next_network_actions(2).await; let wire_message = WireMessage::::ViewUpdate( view![hash_a] ).encode(); @@ -1244,16 +1390,119 @@ mod tests { &actions, &NetworkAction::WriteNotification( peer_b, - PeerSet::Validation, + PeerSet::Collation, wire_message.clone(), ), ); }); } + #[test] + fn do_not_send_view_update_until_synced() { + let (oracle, handle) = make_sync_oracle(true); + test_harness(Box::new(oracle), |test_harness| async move { + let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + network_handle.connect_peer( + peer_a.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; + network_handle.connect_peer( + peer_b.clone(), + PeerSet::Collation, + ObservedRole::Full, + ).await; + + { + let actions = network_handle.next_network_actions(2).await; + let wire_message = WireMessage::::ViewUpdate( + View::default(), + ).encode(); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_a, + PeerSet::Validation, + wire_message.clone(), + ), + ); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_b, + PeerSet::Collation, + wire_message.clone(), + ), + ); + } + + let hash_a = Hash::repeat_byte(1); + let hash_b = Hash::repeat_byte(1); + + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: hash_a, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }) + )) + ).await; + + // delay until the previous update has certainly been processed. + futures_timer::Delay::new(std::time::Duration::from_millis(100)).await; + + handle.set_done(); + + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: hash_b, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }) + )) + ).await; + + handle.await_mode_switch().await; + + // There should be a mode switch only for the second view update. + { + let actions = network_handle.next_network_actions(2).await; + let wire_message = WireMessage::::ViewUpdate( + view![hash_a, hash_b] + ).encode(); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_a, + PeerSet::Validation, + wire_message.clone(), + ), + ); + + assert_network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer_b, + PeerSet::Collation, + wire_message.clone(), + ), + ); + } + }); + } + #[test] fn do_not_send_view_update_when_only_finalized_block_changed() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; let peer_a = PeerId::random(); @@ -1319,7 +1568,7 @@ mod tests { #[test] fn peer_view_updates_sent_via_overseer() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, @@ -1361,7 +1610,7 @@ mod tests { #[test] fn peer_messages_sent_via_overseer() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, @@ -1428,7 +1677,7 @@ mod tests { #[test] fn peer_disconnect_from_just_one_peerset() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, @@ -1503,7 +1752,7 @@ mod tests { #[test] fn relays_collation_protocol_messages() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, @@ -1590,7 +1839,7 @@ mod tests { #[test] fn different_views_on_different_peer_sets() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, @@ -1655,7 +1904,7 @@ mod tests { #[test] fn sent_views_include_finalized_number_update() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; let peer_a = PeerId::random(); @@ -1700,7 +1949,7 @@ mod tests { #[test] fn view_finalized_number_can_not_go_down() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, .. } = test_harness; let peer_a = PeerId::random(); @@ -1740,7 +1989,7 @@ mod tests { #[test] fn send_messages_to_peers() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, @@ -1876,7 +2125,7 @@ mod tests { #[test] fn our_view_updates_decreasing_order_and_limited_to_max() { - test_harness(|test_harness| async move { + test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut virtual_overseer, .. diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index dc2cc2ae94..0fd65c23f3 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -63,6 +63,8 @@ hex-literal = "0.3.1" tracing = "0.1.25" serde = { version = "1.0.123", features = ["derive"] } thiserror = "1.0.23" +kvdb = "0.9.0" +kvdb-rocksdb = { version = "0.11.0", optional = true } # Polkadot polkadot-node-core-proposer = { path = "../core/proposer" } @@ -111,7 +113,8 @@ db = ["service/db"] full-node = [ "polkadot-node-core-av-store", "polkadot-node-core-approval-voting", - "sc-finality-grandpa-warp-sync" + "sc-finality-grandpa-warp-sync", + "kvdb-rocksdb" ] runtime-benchmarks = [ @@ -128,6 +131,7 @@ try-runtime = [ ] real-overseer = [ + "full-node", "polkadot-availability-bitfield-distribution", "polkadot-availability-distribution", "polkadot-availability-recovery", diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 8a86a4b18a..00bf8a10f0 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -21,10 +21,10 @@ pub mod chain_spec; mod grandpa_support; mod client; +mod parachains_db; #[cfg(feature = "full-node")] use { - std::convert::TryInto, std::time::Duration, tracing::info, polkadot_node_core_av_store::Config as AvailabilityConfig, @@ -418,7 +418,9 @@ fn real_overseer( leaves: impl IntoIterator, _: Arc, _: Arc, + _parachains_db: (), _: AvailabilityConfig, + _: ApprovalVotingConfig, _: Arc>, _: AuthorityDiscoveryService, _request_multiplexer: (), @@ -426,7 +428,6 @@ fn real_overseer( spawner: Spawner, _: IsCollator, _: IsolationStrategy, - _: ApprovalVotingConfig, ) -> Result<(Overseer, OverseerHandler), Error> where RuntimeClient: 'static + ProvideRuntimeApi + HeaderBackend + AuxStore, @@ -446,7 +447,9 @@ fn real_overseer( leaves: impl IntoIterator, keystore: Arc, runtime_client: Arc, + parachains_db: Arc, availability_config: AvailabilityConfig, + approval_voting_config: ApprovalVotingConfig, network_service: Arc>, authority_discovery: AuthorityDiscoveryService, request_multiplexer: RequestMultiplexer, @@ -454,7 +457,6 @@ fn real_overseer( spawner: Spawner, is_collator: IsCollator, isolation_strategy: IsolationStrategy, - approval_voting_config: ApprovalVotingConfig, ) -> Result<(Overseer, OverseerHandler), Error> where RuntimeClient: 'static + ProvideRuntimeApi + HeaderBackend + AuxStore, @@ -489,10 +491,11 @@ where ), availability_recovery: AvailabilityRecoverySubsystem::with_chunks_only( ), - availability_store: AvailabilityStoreSubsystem::new_on_disk( + availability_store: AvailabilityStoreSubsystem::new( + parachains_db.clone(), availability_config, Metrics::register(registry)?, - )?, + ), bitfield_distribution: BitfieldDistributionSubsystem::new( Metrics::register(registry)?, ), @@ -537,9 +540,10 @@ where ) }, network_bridge: NetworkBridgeSubsystem::new( - network_service, + network_service.clone(), authority_discovery, request_multiplexer, + Box::new(network_service.clone()), ), provisioner: ProvisionerSubsystem::new( spawner.clone(), @@ -559,9 +563,11 @@ where ), approval_voting: ApprovalVotingSubsystem::with_config( approval_voting_config, + parachains_db, keystore.clone(), + Box::new(network_service.clone()), Metrics::register(registry)?, - )?, + ), gossip_support: GossipSupportSubsystem::new( keystore.clone(), runtime_client.clone(), @@ -843,17 +849,26 @@ pub fn new_full( ); } - let availability_config = config.database.clone().try_into().map_err(Error::Availability)?; - let chain_spec = config.chain_spec.cloned_box(); + #[cfg(feature = "real-overseer")] + let parachains_db = crate::parachains_db::open_creating( + config.database.path().ok_or(Error::DatabasePathRequired)?.into(), + crate::parachains_db::CacheSizes::default(), + )?; - let approval_voting_config = ApprovalVotingConfig { - path: config.database.path() - .ok_or(Error::DatabasePathRequired)? - .join("parachains").join("approval-voting"), - slot_duration_millis: slot_duration.as_millis() as u64, - cache_size: None, // default is fine. + #[cfg(not(feature = "real-overseer"))] + let parachains_db = (); + + let availability_config = AvailabilityConfig { + col_data: crate::parachains_db::REAL_COLUMNS.col_availability_data, + col_meta: crate::parachains_db::REAL_COLUMNS.col_availability_meta, }; + let approval_voting_config = ApprovalVotingConfig { + col_data: crate::parachains_db::REAL_COLUMNS.col_approval_data, + slot_duration_millis: slot_duration.as_millis() as u64, + }; + + let chain_spec = config.chain_spec.cloned_box(); let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams { config, backend: backend.clone(), @@ -922,7 +937,9 @@ pub fn new_full( active_leaves, keystore, overseer_client.clone(), + parachains_db, availability_config, + approval_voting_config, network.clone(), authority_discovery_service, request_multiplexer, @@ -930,7 +947,6 @@ pub fn new_full( spawner, is_collator, isolation_strategy, - approval_voting_config, )?; let overseer_handler_clone = overseer_handler.clone(); diff --git a/polkadot/node/service/src/parachains_db.rs b/polkadot/node/service/src/parachains_db.rs new file mode 100644 index 0000000000..25ca0e4ce0 --- /dev/null +++ b/polkadot/node/service/src/parachains_db.rs @@ -0,0 +1,103 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! A RocksDB instance for storing parachain data; availability data, and approvals. + +#[cfg(feature = "real-overseer")] +use { + std::io, + std::path::PathBuf, + std::sync::Arc, + + kvdb::KeyValueDB, +}; + + +mod columns { + #[cfg(feature = "real-overseer")] + pub const NUM_COLUMNS: u32 = 3; + + + pub const COL_AVAILABILITY_DATA: u32 = 0; + pub const COL_AVAILABILITY_META: u32 = 1; + pub const COL_APPROVAL_DATA: u32 = 2; +} + +/// Columns used by different subsystems. +#[derive(Debug, Clone)] +pub struct ColumnsConfig { + /// The column used by the av-store for data. + pub col_availability_data: u32, + /// The column used by the av-store for meta information. + pub col_availability_meta: u32, + /// The column used by approval voting for data. + pub col_approval_data: u32, +} + +/// The real columns used by the parachains DB. +pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig { + col_availability_data: columns::COL_AVAILABILITY_DATA, + col_availability_meta: columns::COL_AVAILABILITY_META, + col_approval_data: columns::COL_APPROVAL_DATA, +}; + +/// The cache size for each column, in bytes. +#[derive(Debug, Clone)] +pub struct CacheSizes { + /// Cache used by availability data. + pub availability_data: usize, + /// Cache used by availability meta. + pub availability_meta: usize, + /// Cache used by approval data. + pub approval_data: usize, +} + +impl Default for CacheSizes { + fn default() -> Self { + CacheSizes { + availability_data: 25 * 1024 * 1024, + availability_meta: 512 * 1024, + approval_data: 5 * 1024 * 1024, + } + } +} + +/// Open the database on disk, creating it if it doesn't exist. +#[cfg(feature = "real-overseer")] +pub fn open_creating( + root: PathBuf, + cache_sizes: CacheSizes, +) -> io::Result> { + use kvdb_rocksdb::{DatabaseConfig, Database}; + + let path = root.join("parachains").join("db"); + + let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS); + + let _ = db_config.memory_budget + .insert(columns::COL_AVAILABILITY_DATA, cache_sizes.availability_data); + let _ = db_config.memory_budget + .insert(columns::COL_AVAILABILITY_META, cache_sizes.availability_meta); + let _ = db_config.memory_budget + .insert(columns::COL_APPROVAL_DATA, cache_sizes.approval_data); + + let path = path.to_str().ok_or_else(|| io::Error::new( + io::ErrorKind::Other, + format!("Bad database path: {:?}", path), + ))?; + + std::fs::create_dir_all(&path)?; + let db = Database::open(&db_config, &path)?; + + Ok(Arc::new(db)) +} diff --git a/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md b/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md index 9bd01e036a..02f0073a6b 100644 --- a/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md +++ b/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md @@ -60,6 +60,7 @@ struct ApprovalEntry { tranches: Vec, // sorted ascending by tranche number. backing_group: GroupIndex, our_assignment: Option, + our_approval_sig: Option, assignments: Bitfield, // n_validators bits approved: bool, } @@ -205,7 +206,7 @@ On receiving a `ApprovalVotingMessage::CheckAndImportAssignment` message, we che * Ensure the validator index is not present in the approval entry already. * Create a tranche entry for the delay tranche in the approval entry and note the assignment within it. * Note the candidate index within the approval entry. - * [Check for full approval of the candidate entry](#check-full-approval) of the candidate_entry, filtering by this specific approval entry. + * [Schedule a wakeup](#schedule-wakeup) for this block, candidate pair. * return the appropriate `AssignmentCheckResult` on the response channel. #### `ApprovalVotingMessage::CheckAndImportApproval` @@ -232,20 +233,15 @@ On receiving an `ApprovedAncestor(Hash, BlockNumber, response_channel)`: ### Updates and Auxiliary Logic #### Import Checked Approval - * Import an approval vote which we can assume to have passed signature checks. + * Import an approval vote which we can assume to have passed signature checks and correspond to an imported assignment. * Requires `(BlockEntry, CandidateEntry, ValidatorIndex)` * Set the corresponding bit of the `approvals` bitfield in the `CandidateEntry` to `1`. If already `1`, return. - * [Check full approval of the candidate](#check-full-approval) - -#### Check Full Approval - * Checks the approval state of the candidate under every block it is included by, and updates the block entries accordingly. - * Requires `(CandidateEntry, filter)`, where filter is used to limit which approval entries are inspected. - * Checks every `ApprovalEntry` that is not yet `approved` for whether it is now approved. - * For each `ApprovalEntry` in the `CandidateEntry` that is not `approved` and passes the `filter` - * Load the block entry for the `ApprovalEntry`. - * If so, [determine the tranches to inspect](#determine-required-tranches) of the candidate, - * If [the candidate is approved under the block](#check-approval), set the corresponding bit in the `block_entry.approved_bitfield`. + * Checks the approval state of a candidate under a specific block, and updates the block and candidate entries accordingly. + * Checks the `ApprovalEntry` for the block. + * [determine the tranches to inspect](#determine-required-tranches) of the candidate, + * [the candidate is approved under the block](#check-approval), set the corresponding bit in the `block_entry.approved_bitfield`. * Otherwise, [schedule a wakeup of the candidate](#schedule-wakeup) + * If the approval vote originates locally, set the `our_approval_sig` in the candidate entry. #### Handling Wakeup * Handle a previously-scheduled wakeup of a candidate under a specific block. diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md b/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md index 3f65a84735..691e563470 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md @@ -59,6 +59,8 @@ Each network event is associated with a particular peer-set. The `activated` and `deactivated` lists determine the evolution of our local view over time. A `ProtocolMessage::ViewUpdate` is issued to each connected peer on each peer-set, and a `NetworkBridgeEvent::OurViewChange` is issued to each event handler for each protocol. +We only send view updates if the node has indicated that it has finished major blockchain synchronization. + If we are connected to the same peer on both peer-sets, we will send the peer two view updates as a result. ### Overseer Signal: BlockFinalized @@ -67,7 +69,7 @@ We update our view's `finalized_number` to the provided one and delay `ProtocolM ### Network Event: Peer Connected -Issue a `NetworkBridgeEvent::PeerConnected` for each [Event Handler](#event-handlers) of the peer-set and negotiated protocol version of the peer. Also issue a `NetworkBridgeEvent::PeerViewChange` and send the peer our current view. +Issue a `NetworkBridgeEvent::PeerConnected` for each [Event Handler](#event-handlers) of the peer-set and negotiated protocol version of the peer. Also issue a `NetworkBridgeEvent::PeerViewChange` and send the peer our current view, but only if the node has indicated that it has finished major blockchain synchronization. Otherwise, we only send the peer an empty view. ### Network Event: Peer Disconnected