diff --git a/polkadot/node/core/dispute-coordinator/src/backend.rs b/polkadot/node/core/dispute-coordinator/src/backend.rs
new file mode 100644
index 0000000000..35c36f986c
--- /dev/null
+++ b/polkadot/node/core/dispute-coordinator/src/backend.rs
@@ -0,0 +1,183 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! An abstraction over storage used by the chain selection subsystem.
+//!
+//! This provides both a [`Backend`] trait and an [`OverlayedBackend`]
+//! struct which allows in-memory changes to be applied on top of a
+//! [`Backend`], maintaining consistency between queries and temporary writes,
+//! before any commit to the underlying storage is made.
+
+use polkadot_primitives::v1::{CandidateHash, SessionIndex};
+use polkadot_node_subsystem::SubsystemResult;
+
+use std::collections::HashMap;
+
+use super::db::v1::{RecentDisputes, CandidateVotes};
+
+#[derive(Debug)]
+pub enum BackendWriteOp {
+ WriteEarliestSession(SessionIndex),
+ WriteRecentDisputes(RecentDisputes),
+ WriteCandidateVotes(SessionIndex, CandidateHash, CandidateVotes),
+ DeleteCandidateVotes(SessionIndex, CandidateHash),
+}
+
+/// An abstraction over backend storage for the logic of this subsystem.
+pub trait Backend {
+ /// Load the earliest session, if any.
+ fn load_earliest_session(&self) -> SubsystemResult>;
+
+ /// Load the recent disputes, if any.
+ fn load_recent_disputes(&self) -> SubsystemResult >;
+
+ /// Load the candidate votes for the specific session-candidate pair, if any.
+ fn load_candidate_votes(
+ &self,
+ session: SessionIndex,
+ candidate_hash: &CandidateHash,
+ ) -> SubsystemResult >;
+
+ /// Atomically writes the list of operations, with later operations taking precedence over
+ /// prior.
+ fn write(&mut self, ops: I) -> SubsystemResult<()>
+ where I: IntoIterator- ;
+}
+
+/// An in-memory overllay for the backend.
+///
+/// This maintains read-only access to the underlying backend, but can be converted into a set of
+/// write operations which will, when written to the underlying backend, give the same view as the
+/// state of the overlay.
+pub struct OverlayedBackend<'a, B: 'a> {
+ inner: &'a B,
+
+ // `None` means unchanged.
+ earliest_session: Option
,
+ // `None` means unchanged.
+ recent_disputes: Option,
+ // `None` means deleted, missing means query inner.
+ candidate_votes: HashMap<(SessionIndex, CandidateHash), Option>,
+}
+
+impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> {
+ pub fn new(backend: &'a B) -> Self {
+ Self {
+ inner: backend,
+ earliest_session: None,
+ recent_disputes: None,
+ candidate_votes: HashMap::new(),
+ }
+ }
+
+ /// Returns true if the are no write operations to perform.
+ pub fn is_empty(&self) -> bool {
+ self.earliest_session.is_none() &&
+ self.recent_disputes.is_none() &&
+ self.candidate_votes.is_empty()
+ }
+
+ /// Load the earliest session, if any.
+ pub fn load_earliest_session(&self) -> SubsystemResult> {
+ if let Some(val) = self.earliest_session {
+ return Ok(Some(val))
+ }
+
+ self.inner.load_earliest_session()
+ }
+
+ /// Load the recent disputes, if any.
+ pub fn load_recent_disputes(&self) -> SubsystemResult > {
+ if let Some(val) = &self.recent_disputes {
+ return Ok(Some(val.clone()))
+ }
+
+ self.inner.load_recent_disputes()
+ }
+
+ /// Load the candidate votes for the specific session-candidate pair, if any.
+ pub fn load_candidate_votes(
+ &self,
+ session: SessionIndex,
+ candidate_hash: &CandidateHash
+ ) -> SubsystemResult > {
+ if let Some(val) = self.candidate_votes.get(&(session, *candidate_hash)) {
+ return Ok(val.clone())
+ }
+
+ self.inner.load_candidate_votes(session, candidate_hash)
+ }
+
+ /// Prepare a write to the 'earliest session' field of the DB.
+ ///
+ /// Later calls to this function will override earlier ones.
+ pub fn write_earliest_session(&mut self, session: SessionIndex) {
+ self.earliest_session = Some(session);
+ }
+
+ /// Prepare a write to the recent disputes stored in the DB.
+ ///
+ /// Later calls to this function will override earlier ones.
+ pub fn write_recent_disputes(&mut self, recent_disputes: RecentDisputes) {
+ self.recent_disputes = Some(recent_disputes)
+ }
+
+ /// Prepare a write of the candidate votes under the indicated candidate.
+ ///
+ /// Later calls to this function for the same candidate will override earlier ones.
+ pub fn write_candidate_votes(
+ &mut self,
+ session: SessionIndex,
+ candidate_hash: CandidateHash,
+ votes: CandidateVotes
+ ) {
+ self.candidate_votes.insert((session, candidate_hash), Some(votes));
+ }
+
+ /// Prepare a deletion of the candidate votes under the indicated candidate.
+ ///
+ /// Later calls to this function for the same candidate will override earlier ones.
+ pub fn delete_candidate_votes(
+ &mut self,
+ session: SessionIndex,
+ candidate_hash: CandidateHash,
+ ) {
+ self.candidate_votes.insert((session, candidate_hash), None);
+ }
+
+ /// Transform this backend into a set of write-ops to be written to the inner backend.
+ pub fn into_write_ops(self) -> impl Iterator- {
+ let earliest_session_ops = self.earliest_session
+ .map(|s| BackendWriteOp::WriteEarliestSession(s))
+ .into_iter();
+
+ let recent_dispute_ops = self.recent_disputes
+ .map(|d| BackendWriteOp::WriteRecentDisputes(d))
+ .into_iter();
+
+ let candidate_vote_ops = self.candidate_votes
+ .into_iter()
+ .map(|((session, candidate), votes)| match votes {
+ Some(votes) => BackendWriteOp::WriteCandidateVotes(session, candidate, votes),
+ None => BackendWriteOp::DeleteCandidateVotes(session, candidate),
+ });
+
+ earliest_session_ops
+ .chain(recent_dispute_ops)
+ .chain(candidate_vote_ops)
+
+ }
+}
diff --git a/polkadot/node/core/dispute-coordinator/src/db/v1.rs b/polkadot/node/core/dispute-coordinator/src/db/v1.rs
index 7ec30d51c1..d3f859e7d6 100644
--- a/polkadot/node/core/dispute-coordinator/src/db/v1.rs
+++ b/polkadot/node/core/dispute-coordinator/src/db/v1.rs
@@ -20,16 +20,96 @@ use polkadot_primitives::v1::{
CandidateReceipt, ValidDisputeStatementKind, InvalidDisputeStatementKind, ValidatorIndex,
ValidatorSignature, SessionIndex, CandidateHash, Hash,
};
+use polkadot_node_subsystem::{SubsystemResult, SubsystemError};
+
+use std::sync::Arc;
use kvdb::{KeyValueDB, DBTransaction};
use parity_scale_codec::{Encode, Decode};
use crate::{DISPUTE_WINDOW, DisputeStatus};
+use crate::backend::{Backend, BackendWriteOp, OverlayedBackend};
const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes";
const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session";
const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes";
+pub struct DbBackend {
+ inner: Arc
,
+ config: ColumnConfiguration,
+}
+
+impl DbBackend {
+ pub fn new(db: Arc, config: ColumnConfiguration) -> Self {
+ Self {
+ inner: db,
+ config,
+ }
+ }
+}
+
+impl Backend for DbBackend {
+ /// Load the earliest session, if any.
+ fn load_earliest_session(&self) -> SubsystemResult> {
+ load_earliest_session(&*self.inner, &self.config)
+ }
+
+ /// Load the recent disputes, if any.
+ fn load_recent_disputes(&self) -> SubsystemResult > {
+ load_recent_disputes(&*self.inner, &self.config)
+ }
+
+ /// Load the candidate votes for the specific session-candidate pair, if any.
+ fn load_candidate_votes(
+ &self,
+ session: SessionIndex,
+ candidate_hash: &CandidateHash,
+ ) -> SubsystemResult > {
+ load_candidate_votes(&*self.inner, &self.config, session, candidate_hash)
+ }
+
+ /// Atomically writes the list of operations, with later operations taking precedence over
+ /// prior.
+ fn write(&mut self, ops: I) -> SubsystemResult<()>
+ where I: IntoIterator-
+ {
+ let mut tx = DBTransaction::new();
+ for op in ops {
+ match op {
+ BackendWriteOp::WriteEarliestSession(session) => {
+ tx.put_vec(
+ self.config.col_data,
+ EARLIEST_SESSION_KEY,
+ session.encode(),
+ );
+ }
+ BackendWriteOp::WriteRecentDisputes(recent_disputes) => {
+ tx.put_vec(
+ self.config.col_data,
+ RECENT_DISPUTES_KEY,
+ recent_disputes.encode(),
+ );
+ }
+ BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => {
+ tx.put_vec(
+ self.config.col_data,
+ &candidate_votes_key(session, &candidate_hash),
+ votes.encode(),
+ );
+ }
+ BackendWriteOp::DeleteCandidateVotes(session, candidate_hash) => {
+ tx.delete(
+ self.config.col_data,
+ &candidate_votes_key(session, &candidate_hash),
+ );
+ }
+ }
+ }
+
+ self.inner.write(tx).map_err(Into::into)
+ }
+}
+
fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) -> [u8; 15 + 4 + 32] {
let mut buf = [0u8; 15 + 4 + 32];
buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);
@@ -41,29 +121,6 @@ fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) ->
buf
}
-// Computes the upper lexicographic bound on DB keys for candidate votes with a given
-// upper-exclusive bound on sessions.
-fn candidate_votes_range_upper_bound(upper_exclusive: SessionIndex) -> [u8; 15 + 4] {
- let mut buf = [0; 15 + 4];
- buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);
- // big-endian encoding is used to ensure lexicographic ordering.
- buf[15..][..4].copy_from_slice(&upper_exclusive.to_be_bytes());
-
- buf
-}
-
-fn decode_candidate_votes_key(key: &[u8]) -> Option<(SessionIndex, CandidateHash)> {
- if key.len() != 15 + 4 + 32 {
- return None;
- }
-
- let mut session_buf = [0; 4];
- session_buf.copy_from_slice(&key[15..][..4]);
- let session = SessionIndex::from_be_bytes(session_buf);
-
- CandidateHash::decode(&mut &key[(15 + 4)..]).ok().map(|hash| (session, hash))
-}
-
/// Column configuration information for the DB.
#[derive(Debug, Clone)]
pub struct ColumnConfiguration {
@@ -128,102 +185,33 @@ fn load_decode
(db: &dyn KeyValueDB, col_data: u32, key: &[u8])
}
}
-/// Load the candidate votes for the identified candidate under the given hash.
+/// Load the candidate votes for the specific session-candidate pair, if any.
pub(crate) fn load_candidate_votes(
db: &dyn KeyValueDB,
config: &ColumnConfiguration,
session: SessionIndex,
candidate_hash: &CandidateHash,
-) -> Result> {
+) -> SubsystemResult > {
load_decode(db, config.col_data, &candidate_votes_key(session, candidate_hash))
+ .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
}
/// Load the earliest session, if any.
pub(crate) fn load_earliest_session(
db: &dyn KeyValueDB,
config: &ColumnConfiguration,
-) -> Result > {
+) -> SubsystemResult > {
load_decode(db, config.col_data, EARLIEST_SESSION_KEY)
+ .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
}
/// Load the recent disputes, if any.
pub(crate) fn load_recent_disputes(
db: &dyn KeyValueDB,
config: &ColumnConfiguration,
-) -> Result > {
+) -> SubsystemResult > {
load_decode(db, config.col_data, RECENT_DISPUTES_KEY)
-}
-
-/// An atomic transaction to be commited to the underlying DB.
-#[derive(Debug, Default, Clone)]
-pub(crate) struct Transaction {
- earliest_session: Option,
- recent_disputes: Option,
- write_candidate_votes: Vec<(SessionIndex, CandidateHash, CandidateVotes)>,
- delete_candidate_votes: Vec<(SessionIndex, CandidateHash)>,
-}
-
-impl Transaction {
- /// Prepare a write to the 'earliest session' field of the DB.
- ///
- /// Later calls to this function will override earlier ones.
- pub(crate) fn put_earliest_session(&mut self, session: SessionIndex) {
- self.earliest_session = Some(session);
- }
-
- /// Prepare a write to the recent disputes stored in the DB.
- ///
- /// Later calls to this function will override earlier ones.
- pub(crate) fn put_recent_disputes(&mut self, recent_disputes: RecentDisputes) {
- self.recent_disputes = Some(recent_disputes);
- }
-
- /// Prepare a write of the candidate votes under the indicated candidate.
- ///
- /// Later calls to this function for the same candidate will override earlier ones.
- /// Any calls to this function will be overridden by deletions of the same candidate.
- pub(crate) fn put_candidate_votes(
- &mut self,
- session: SessionIndex,
- candidate_hash: CandidateHash,
- votes: CandidateVotes,
- ) {
- self.write_candidate_votes.push((session, candidate_hash, votes))
- }
-
- /// Prepare a deletion of the candidate votes under the indicated candidate.
- ///
- /// Any calls to this function will override writes to the same candidate.
- pub(crate) fn delete_candidate_votes(
- &mut self,
- session: SessionIndex,
- candidate_hash: CandidateHash,
- ) {
- self.delete_candidate_votes.push((session, candidate_hash))
- }
-
- /// Write the transaction atomically to the DB.
- pub(crate) fn write(self, db: &dyn KeyValueDB, config: &ColumnConfiguration) -> Result<()> {
- let mut tx = DBTransaction::new();
-
- if let Some(s) = self.earliest_session {
- tx.put_vec(config.col_data, EARLIEST_SESSION_KEY, s.encode());
- }
-
- if let Some(a) = self.recent_disputes {
- tx.put_vec(config.col_data, RECENT_DISPUTES_KEY, a.encode());
- }
-
- for (session, candidate_hash, votes) in self.write_candidate_votes {
- tx.put_vec(config.col_data, &candidate_votes_key(session, &candidate_hash), votes.encode());
- }
-
- for (session, candidate_hash) in self.delete_candidate_votes {
- tx.delete(config.col_data, &candidate_votes_key(session, &candidate_hash));
- }
-
- db.write(tx).map_err(Into::into)
- }
+ .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
}
/// Maybe prune data in the DB based on the provided session index.
@@ -235,57 +223,46 @@ impl Transaction {
/// If one or more ancient sessions are pruned, all metadata on candidates within the ancient
/// session will be deleted.
pub(crate) fn note_current_session(
- store: &dyn KeyValueDB,
- config: &ColumnConfiguration,
+ overlay_db: &mut OverlayedBackend<'_, impl Backend>,
current_session: SessionIndex,
-) -> Result<()> {
+) -> SubsystemResult<()> {
let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW);
- let mut tx = Transaction::default();
-
- match load_earliest_session(store, config)? {
+ match overlay_db.load_earliest_session()? {
None => {
// First launch - write new-earliest.
- tx.put_earliest_session(new_earliest);
+ overlay_db.write_earliest_session(new_earliest);
}
Some(prev_earliest) if new_earliest > prev_earliest => {
// Prune all data in the outdated sessions.
- tx.put_earliest_session(new_earliest);
+ overlay_db.write_earliest_session(new_earliest);
// Clear recent disputes metadata.
{
- let mut recent_disputes = load_recent_disputes(store, config)?.unwrap_or_default();
+ let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let lower_bound = (
new_earliest,
CandidateHash(Hash::repeat_byte(0x00)),
);
- let prev_len = recent_disputes.len();
- recent_disputes = recent_disputes.split_off(&lower_bound);
+ let new_recent_disputes = recent_disputes.split_off(&lower_bound);
+ // Any remanining disputes are considered ancient and must be pruned.
+ let pruned_disputes = recent_disputes;
- if recent_disputes.len() != prev_len {
- tx.put_recent_disputes(recent_disputes);
+ if pruned_disputes.len() != 0 {
+ overlay_db.write_recent_disputes(new_recent_disputes);
+ for ((session, candidate_hash), _) in pruned_disputes {
+ overlay_db.delete_candidate_votes(session, candidate_hash);
+ }
}
}
-
- // Clear all candidate data with session less than the new earliest kept.
- {
- let end_prefix = candidate_votes_range_upper_bound(new_earliest);
-
- store.iter_with_prefix(config.col_data, CANDIDATE_VOTES_SUBKEY)
- .take_while(|(k, _)| &k[..] < &end_prefix[..])
- .filter_map(|(k, _)| decode_candidate_votes_key(&k[..]))
- .for_each(|(session, candidate_hash)| {
- tx.delete_candidate_votes(session, candidate_hash);
- });
- }
}
Some(_) => {
// nothing to do.
}
- };
+ }
- tx.write(store, config)
+ Ok(())
}
#[cfg(test)]
@@ -293,151 +270,150 @@ mod tests {
use super::*;
use polkadot_primitives::v1::{Hash, Id as ParaId};
+ fn make_db() -> DbBackend {
+ let store = Arc::new(kvdb_memorydb::create(1));
+ let config = ColumnConfiguration { col_data: 0 };
+ DbBackend::new(store, config)
+ }
+
#[test]
- fn candidate_votes_key_works() {
- let session = 4;
- let candidate = CandidateHash(Hash::repeat_byte(0x01));
+ fn overlay_pre_and_post_commit_consistency() {
+ let mut backend = make_db();
- let key = candidate_votes_key(session, &candidate);
+ let mut overlay_db = OverlayedBackend::new(&backend);
- assert_eq!(&key[0..15], CANDIDATE_VOTES_SUBKEY);
- assert_eq!(&key[15..19], &[0x00, 0x00, 0x00, 0x04]);
- assert_eq!(&key[19..51], candidate.0.as_bytes());
+ overlay_db.write_earliest_session(0);
+ overlay_db.write_earliest_session(1);
+
+ overlay_db.write_recent_disputes(vec![
+ ((0, CandidateHash(Hash::repeat_byte(0))), DisputeStatus::Active),
+ ].into_iter().collect());
+
+ overlay_db.write_recent_disputes(vec![
+ ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
+ ].into_iter().collect());
+
+ overlay_db.write_candidate_votes(
+ 1,
+ CandidateHash(Hash::repeat_byte(1)),
+ CandidateVotes {
+ candidate_receipt: Default::default(),
+ valid: Vec::new(),
+ invalid: Vec::new(),
+ },
+ );
+ overlay_db.write_candidate_votes(
+ 1,
+ CandidateHash(Hash::repeat_byte(1)),
+ CandidateVotes {
+ candidate_receipt: {
+ let mut receipt = CandidateReceipt::default();
+ receipt.descriptor.para_id = 5.into();
+
+ receipt
+ },
+ valid: Vec::new(),
+ invalid: Vec::new(),
+ },
+ );
+
+ // Test that overlay returns the correct values before committing.
+ assert_eq!(
+ overlay_db.load_earliest_session().unwrap().unwrap(),
+ 1,
+ );
assert_eq!(
- decode_candidate_votes_key(&key[..]),
- Some((session, candidate)),
+ overlay_db.load_recent_disputes().unwrap().unwrap(),
+ vec![
+ ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
+ ].into_iter().collect()
+ );
+
+ assert_eq!(
+ overlay_db.load_candidate_votes(
+ 1,
+ &CandidateHash(Hash::repeat_byte(1))
+ ).unwrap().unwrap().candidate_receipt.descriptor.para_id,
+ ParaId::from(5),
+ );
+
+ let write_ops = overlay_db.into_write_ops();
+ backend.write(write_ops).unwrap();
+
+ // Test that subsequent writes were written.
+ assert_eq!(
+ backend.load_earliest_session().unwrap().unwrap(),
+ 1,
+ );
+
+ assert_eq!(
+ backend.load_recent_disputes().unwrap().unwrap(),
+ vec![
+ ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
+ ].into_iter().collect()
+ );
+
+ assert_eq!(
+ backend.load_candidate_votes(
+ 1,
+ &CandidateHash(Hash::repeat_byte(1))
+ ).unwrap().unwrap().candidate_receipt.descriptor.para_id,
+ ParaId::from(5),
);
}
#[test]
- fn db_transaction() {
- let store = kvdb_memorydb::create(1);
- let config = ColumnConfiguration { col_data: 0 };
+ fn overlay_preserves_candidate_votes_operation_order() {
+ let mut backend = make_db();
- {
- let mut tx = Transaction::default();
+ let mut overlay_db = OverlayedBackend::new(&backend);
+ overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
- tx.put_earliest_session(0);
- tx.put_earliest_session(1);
+ overlay_db.write_candidate_votes(
+ 1,
+ CandidateHash(Hash::repeat_byte(1)),
+ CandidateVotes {
+ candidate_receipt: Default::default(),
+ valid: Vec::new(),
+ invalid: Vec::new(),
+ }
+ );
- tx.put_recent_disputes(vec![
- ((0, CandidateHash(Hash::repeat_byte(0))), DisputeStatus::Active),
- ].into_iter().collect());
-
- tx.put_recent_disputes(vec![
- ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
- ].into_iter().collect());
-
- tx.put_candidate_votes(
- 1,
- CandidateHash(Hash::repeat_byte(1)),
- CandidateVotes {
- candidate_receipt: Default::default(),
- valid: Vec::new(),
- invalid: Vec::new(),
- },
- );
- tx.put_candidate_votes(
- 1,
- CandidateHash(Hash::repeat_byte(1)),
- CandidateVotes {
- candidate_receipt: {
- let mut receipt = CandidateReceipt::default();
- receipt.descriptor.para_id = 5.into();
-
- receipt
- },
- valid: Vec::new(),
- invalid: Vec::new(),
- },
- );
-
- tx.write(&store, &config).unwrap();
- }
-
- // Test that subsequent writes were written.
- {
- assert_eq!(
- load_earliest_session(&store, &config).unwrap().unwrap(),
- 1,
- );
-
- assert_eq!(
- load_recent_disputes(&store, &config).unwrap().unwrap(),
- vec![
- ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
- ].into_iter().collect()
- );
-
- assert_eq!(
- load_candidate_votes(
- &store,
- &config,
- 1,
- &CandidateHash(Hash::repeat_byte(1))
- ).unwrap().unwrap().candidate_receipt.descriptor.para_id,
- ParaId::from(5),
- );
- }
- }
-
- #[test]
- fn db_deletes_supersede_writes() {
- let store = kvdb_memorydb::create(1);
- let config = ColumnConfiguration { col_data: 0 };
-
- {
- let mut tx = Transaction::default();
- tx.put_candidate_votes(
- 1,
- CandidateHash(Hash::repeat_byte(1)),
- CandidateVotes {
- candidate_receipt: Default::default(),
- valid: Vec::new(),
- invalid: Vec::new(),
- }
- );
-
- tx.write(&store, &config).unwrap();
- }
+ let write_ops = overlay_db.into_write_ops();
+ backend.write(write_ops).unwrap();
assert_eq!(
- load_candidate_votes(
- &store,
- &config,
+ backend.load_candidate_votes(
1,
&CandidateHash(Hash::repeat_byte(1))
).unwrap().unwrap().candidate_receipt.descriptor.para_id,
ParaId::from(0),
);
- {
- let mut tx = Transaction::default();
- tx.put_candidate_votes(
- 1,
- CandidateHash(Hash::repeat_byte(1)),
- CandidateVotes {
- candidate_receipt: {
- let mut receipt = CandidateReceipt::default();
- receipt.descriptor.para_id = 5.into();
+ let mut overlay_db = OverlayedBackend::new(&backend);
+ overlay_db.write_candidate_votes(
+ 1,
+ CandidateHash(Hash::repeat_byte(1)),
+ CandidateVotes {
+ candidate_receipt: {
+ let mut receipt = CandidateReceipt::default();
+ receipt.descriptor.para_id = 5.into();
- receipt
- },
- valid: Vec::new(),
- invalid: Vec::new(),
- }
- );
+ receipt
+ },
+ valid: Vec::new(),
+ invalid: Vec::new(),
+ }
+ );
- tx.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
+ overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
- tx.write(&store, &config).unwrap();
- }
+ let write_ops = overlay_db.into_write_ops();
+ backend.write(write_ops).unwrap();
assert!(
- load_candidate_votes(
- &store,
- &config,
+ backend.load_candidate_votes(
1,
&CandidateHash(Hash::repeat_byte(1))
).unwrap().is_none()
@@ -446,8 +422,7 @@ mod tests {
#[test]
fn note_current_session_prunes_old() {
- let store = kvdb_memorydb::create(1);
- let config = ColumnConfiguration { col_data: 0 };
+ let mut backend = make_db();
let hash_a = CandidateHash(Hash::repeat_byte(0x0a));
let hash_b = CandidateHash(Hash::repeat_byte(0x0b));
@@ -468,61 +443,61 @@ mod tests {
invalid: Vec::new(),
};
- {
- let mut tx = Transaction::default();
- tx.put_earliest_session(prev_earliest_session);
- tx.put_recent_disputes(vec![
- ((very_old, hash_a), DisputeStatus::Active),
- ((slightly_old, hash_b), DisputeStatus::Active),
- ((new_earliest_session, hash_c), DisputeStatus::Active),
- ((very_recent, hash_d), DisputeStatus::Active),
- ].into_iter().collect());
+ let mut overlay_db = OverlayedBackend::new(&backend);
+ overlay_db.write_earliest_session(prev_earliest_session);
+ overlay_db.write_recent_disputes(vec![
+ ((very_old, hash_a), DisputeStatus::Active),
+ ((slightly_old, hash_b), DisputeStatus::Active),
+ ((new_earliest_session, hash_c), DisputeStatus::Active),
+ ((very_recent, hash_d), DisputeStatus::Active),
+ ].into_iter().collect());
- tx.put_candidate_votes(
- very_old,
- hash_a,
- blank_candidate_votes(),
- );
+ overlay_db.write_candidate_votes(
+ very_old,
+ hash_a,
+ blank_candidate_votes(),
+ );
- tx.put_candidate_votes(
- slightly_old,
- hash_b,
- blank_candidate_votes(),
- );
+ overlay_db.write_candidate_votes(
+ slightly_old,
+ hash_b,
+ blank_candidate_votes(),
+ );
- tx.put_candidate_votes(
- new_earliest_session,
- hash_c,
- blank_candidate_votes(),
- );
+ overlay_db.write_candidate_votes(
+ new_earliest_session,
+ hash_c,
+ blank_candidate_votes(),
+ );
- tx.put_candidate_votes(
- very_recent,
- hash_d,
- blank_candidate_votes(),
- );
+ overlay_db.write_candidate_votes(
+ very_recent,
+ hash_d,
+ blank_candidate_votes(),
+ );
- tx.write(&store, &config).unwrap();
- }
+ let write_ops = overlay_db.into_write_ops();
+ backend.write(write_ops).unwrap();
- note_current_session(&store, &config, current_session).unwrap();
+ let mut overlay_db = OverlayedBackend::new(&backend);
+ note_current_session(&mut overlay_db, current_session).unwrap();
assert_eq!(
- load_earliest_session(&store, &config).unwrap(),
+ overlay_db.load_earliest_session().unwrap(),
Some(new_earliest_session),
);
assert_eq!(
- load_recent_disputes(&store, &config).unwrap().unwrap(),
+ overlay_db.load_recent_disputes().unwrap().unwrap(),
vec![
((new_earliest_session, hash_c), DisputeStatus::Active),
((very_recent, hash_d), DisputeStatus::Active),
].into_iter().collect(),
);
- assert!(load_candidate_votes(&store, &config, very_old, &hash_a).unwrap().is_none());
- assert!(load_candidate_votes(&store, &config, slightly_old, &hash_b).unwrap().is_none());
- assert!(load_candidate_votes(&store, &config, new_earliest_session, &hash_c).unwrap().is_some());
- assert!(load_candidate_votes(&store, &config, very_recent, &hash_d).unwrap().is_some());
+ assert!(overlay_db.load_candidate_votes(very_old, &hash_a).unwrap().is_none());
+ assert!(overlay_db.load_candidate_votes(slightly_old, &hash_b).unwrap().is_none());
+ assert!(overlay_db.load_candidate_votes(new_earliest_session, &hash_c).unwrap().is_some());
+ assert!(overlay_db.load_candidate_votes(very_recent, &hash_d).unwrap().is_some());
}
}
diff --git a/polkadot/node/core/dispute-coordinator/src/lib.rs b/polkadot/node/core/dispute-coordinator/src/lib.rs
index 1b7fc24a22..2cc155a51e 100644
--- a/polkadot/node/core/dispute-coordinator/src/lib.rs
+++ b/polkadot/node/core/dispute-coordinator/src/lib.rs
@@ -52,9 +52,11 @@ use kvdb::KeyValueDB;
use parity_scale_codec::{Encode, Decode, Error as CodecError};
use sc_keystore::LocalKeystore;
-use db::v1::RecentDisputes;
+use db::v1::{RecentDisputes, DbBackend};
+use backend::{Backend, OverlayedBackend};
mod db;
+mod backend;
#[cfg(test)]
mod tests;
@@ -112,7 +114,8 @@ where
Context: overseer::SubsystemContext,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
- let future = run(self, ctx, Box::new(SystemClock))
+ let backend = DbBackend::new(self.store.clone(), self.config.column_config());
+ let future = run(self, ctx, backend, Box::new(SystemClock))
.map(|_| Ok(()))
.boxed();
@@ -262,17 +265,19 @@ impl DisputeStatus {
}
}
-async fn run(
+async fn run(
subsystem: DisputeCoordinatorSubsystem,
mut ctx: Context,
+ mut backend: B,
clock: Box,
)
where
Context: overseer::SubsystemContext,
- Context: SubsystemContext
+ Context: SubsystemContext,
+ B: Backend,
{
loop {
- let res = run_iteration(&mut ctx, &subsystem, &*clock).await;
+ let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await;
match res {
Err(e) => {
e.trace();
@@ -294,24 +299,25 @@ where
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
-async fn run_iteration(
+async fn run_iteration(
ctx: &mut Context,
subsystem: &DisputeCoordinatorSubsystem,
+ backend: &mut B,
clock: &dyn Clock,
-)
- -> Result<(), Error>
+) -> Result<(), Error>
where
Context: overseer::SubsystemContext,
- Context: SubsystemContext
+ Context: SubsystemContext,
+ B: Backend,
{
- let DisputeCoordinatorSubsystem { ref store, ref keystore, ref config } = *subsystem;
let mut state = State {
- keystore: keystore.clone(),
+ keystore: subsystem.keystore.clone(),
highest_session: None,
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
};
loop {
+ let mut overlay_db = OverlayedBackend::new(backend);
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(())
@@ -319,9 +325,8 @@ where
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
handle_new_activations(
ctx,
- &**store,
+ &mut overlay_db,
&mut state,
- config,
update.activated.into_iter().map(|a| a.hash),
).await?
}
@@ -329,22 +334,25 @@ where
FromOverseer::Communication { msg } => {
handle_incoming(
ctx,
- &**store,
+ &mut overlay_db,
&mut state,
- config,
msg,
clock.now(),
).await?
}
}
+
+ if !overlay_db.is_empty() {
+ let ops = overlay_db.into_write_ops();
+ backend.write(ops)?;
+ }
}
}
async fn handle_new_activations(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
- store: &dyn KeyValueDB,
+ overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
- config: &Config,
new_activations: impl IntoIterator- ,
) -> Result<(), Error> {
for new_leaf in new_activations {
@@ -388,11 +396,7 @@ async fn handle_new_activations(
state.highest_session = Some(session);
- db::v1::note_current_session(
- store,
- &config.column_config(),
- session,
- )?;
+ db::v1::note_current_session(overlay_db, session)?;
}
}
_ => {}
@@ -404,9 +408,8 @@ async fn handle_new_activations(
async fn handle_incoming(
ctx: &mut impl SubsystemContext,
- store: &dyn KeyValueDB,
+ overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
- config: &Config,
message: DisputeCoordinatorMessage,
now: Timestamp,
) -> Result<(), Error> {
@@ -420,9 +423,8 @@ async fn handle_incoming(
} => {
handle_import_statements(
ctx,
- store,
+ overlay_db,
state,
- config,
candidate_hash,
candidate_receipt,
session,
@@ -432,15 +434,11 @@ async fn handle_incoming(
).await?;
}
DisputeCoordinatorMessage::RecentDisputes(rx) => {
- let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
- .unwrap_or_default();
-
+ let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let _ = rx.send(recent_disputes.keys().cloned().collect());
}
DisputeCoordinatorMessage::ActiveDisputes(rx) => {
- let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
- .unwrap_or_default();
-
+ let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let _ = rx.send(collect_active(recent_disputes, now));
}
DisputeCoordinatorMessage::QueryCandidateVotes(
@@ -449,9 +447,7 @@ async fn handle_incoming(
) => {
let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
- if let Some(v) = db::v1::load_candidate_votes(
- store,
- &config.column_config(),
+ if let Some(v) = overlay_db.load_candidate_votes(
session_index,
&candidate_hash,
)? {
@@ -475,8 +471,7 @@ async fn handle_incoming(
issue_local_statement(
ctx,
state,
- store,
- config,
+ overlay_db,
candidate_hash,
candidate_receipt,
session,
@@ -490,8 +485,7 @@ async fn handle_incoming(
tx,
} => {
let undisputed_chain = determine_undisputed_chain(
- store,
- &config,
+ overlay_db,
base_number,
block_descriptions
)?;
@@ -528,9 +522,8 @@ fn insert_into_statement_vec
(
async fn handle_import_statements(
ctx: &mut impl SubsystemContext,
- store: &dyn KeyValueDB,
+ overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
- config: &Config,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
@@ -563,12 +556,7 @@ async fn handle_import_statements(
let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(n_validators);
- let mut votes = db::v1::load_candidate_votes(
- store,
- &config.column_config(),
- session,
- &candidate_hash
- )?
+ let mut votes = overlay_db.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
@@ -617,86 +605,79 @@ async fn handle_import_statements(
let concluded_valid = votes.valid.len() >= supermajority_threshold;
let concluded_invalid = votes.invalid.len() >= supermajority_threshold;
- let mut recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
- .unwrap_or_default();
+ let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
- { // Scope so we will only confirm valid import after the import got actually persisted.
- let mut tx = db::v1::Transaction::default();
+ let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone());
- let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone());
+ let status = if is_disputed {
+ let status = recent_disputes
+ .entry((session, candidate_hash))
+ .or_insert(DisputeStatus::active());
- let status = if is_disputed {
- let status = recent_disputes
- .entry((session, candidate_hash))
- .or_insert(DisputeStatus::active());
+ // Note: concluded-invalid overwrites concluded-valid,
+ // so we do this check first. Dispute state machine is
+ // non-commutative.
+ if concluded_valid {
+ *status = status.concluded_for(now);
+ }
- // Note: concluded-invalid overwrites concluded-valid,
- // so we do this check first. Dispute state machine is
- // non-commutative.
- if concluded_valid {
- *status = status.concluded_for(now);
- }
+ if concluded_invalid {
+ *status = status.concluded_against(now);
+ }
- if concluded_invalid {
- *status = status.concluded_against(now);
- }
+ Some(*status)
+ } else {
+ None
+ };
- Some(*status)
- } else {
- None
- };
+ if status != prev_status {
+ // This branch is only hit when the candidate is freshly disputed -
+ // status was previously `None`, and now is not.
+ if prev_status.is_none() {
+ // No matter what, if the dispute is new, we participate.
+ //
+ // We also block the coordinator while awaiting our determination
+ // of whether the vote is available.
+ let (report_availability, receive_availability) = oneshot::channel();
+ ctx.send_message(DisputeParticipationMessage::Participate {
+ candidate_hash,
+ candidate_receipt,
+ session,
+ n_validators: n_validators as u32,
+ report_availability,
+ }).await;
- if status != prev_status {
- // Only write when updated.
- tx.put_recent_disputes(recent_disputes);
-
- // This branch is only hit when the candidate is freshly disputed -
- // status was previously `None`, and now is not.
- if prev_status.is_none() {
- // No matter what, if the dispute is new, we participate.
+ if !receive_availability.await.map_err(Error::Oneshot)? {
+ // If the data is not available, we disregard the dispute votes.
+ // This is an indication that the dispute does not correspond to any included
+ // candidate and that it should be ignored.
//
- // We also block the coordinator while awaiting our determination
- // of whether the vote is available.
- let (report_availability, receive_availability) = oneshot::channel();
- ctx.send_message(DisputeParticipationMessage::Participate {
- candidate_hash,
- candidate_receipt,
- session,
- n_validators: n_validators as u32,
- report_availability,
- }).await;
+ // We expect that if the candidate is truly disputed that the higher-level network
+ // code will retry.
+ pending_confirmation.send(ImportStatementsResult::InvalidImport)
+ .map_err(|_| Error::OneshotSend)?;
- if !receive_availability.await.map_err(Error::Oneshot)? {
- // If the data is not available, we disregard the dispute votes.
- // This is an indication that the dispute does not correspond to any included
- // candidate and that it should be ignored.
- //
- // We expect that if the candidate is truly disputed that the higher-level network
- // code will retry.
- pending_confirmation.send(ImportStatementsResult::InvalidImport)
- .map_err(|_| Error::OneshotSend)?;
-
- tracing::debug!(
- target: LOG_TARGET,
- "Recovering availability failed - invalid import."
- );
- return Ok(())
- }
+ tracing::debug!(
+ target: LOG_TARGET,
+ "Recovering availability failed - invalid import."
+ );
+ return Ok(())
}
}
- tx.put_candidate_votes(session, candidate_hash, votes.into());
- tx.write(store, &config.column_config())?;
+ // Only write when updated and vote is available.
+ overlay_db.write_recent_disputes(recent_disputes);
}
+ overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
+
Ok(())
}
async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
state: &mut State,
- store: &dyn KeyValueDB,
- config: &Config,
+ overlay_db: &mut OverlayedBackend<'_, impl Backend>,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
@@ -719,12 +700,7 @@ async fn issue_local_statement(
let validators = info.validators.clone();
- let votes = db::v1::load_candidate_votes(
- store,
- &config.column_config(),
- session,
- &candidate_hash
- )?
+ let votes = overlay_db.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
@@ -792,9 +768,8 @@ async fn issue_local_statement(
let (pending_confirmation, _rx) = oneshot::channel();
handle_import_statements(
ctx,
- store,
+ overlay_db,
state,
- config,
candidate_hash,
candidate_receipt,
session,
@@ -862,16 +837,16 @@ fn make_dispute_message(
}
fn determine_undisputed_chain(
- store: &dyn KeyValueDB,
- config: &Config,
+ overlay_db: &mut OverlayedBackend<'_, impl Backend>,
base_number: BlockNumber,
block_descriptions: Vec<(Hash, SessionIndex, Vec)>,
) -> Result, Error> {
let last = block_descriptions.last()
+
.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.0));
// Fast path for no disputes.
- let recent_disputes = match db::v1::load_recent_disputes(store, &config.column_config())? {
+ let recent_disputes = match overlay_db.load_recent_disputes()? {
None => return Ok(last),
Some(a) if a.is_empty() => return Ok(last),
Some(a) => a,
diff --git a/polkadot/node/core/dispute-coordinator/src/tests.rs b/polkadot/node/core/dispute-coordinator/src/tests.rs
index 860a3e31e2..92ca2f6ffe 100644
--- a/polkadot/node/core/dispute-coordinator/src/tests.rs
+++ b/polkadot/node/core/dispute-coordinator/src/tests.rs
@@ -250,7 +250,8 @@ fn test_harness(test: F)
state.subsystem_keystore.clone(),
);
- let subsystem_task = run(subsystem, ctx, Box::new(state.clock.clone()));
+ let backend = DbBackend::new(state.db.clone(), state.config.column_config());
+ let subsystem_task = run(subsystem, ctx, backend, Box::new(state.clock.clone()));
let test_task = test(state, ctx_handle);
futures::executor::block_on(future::join(subsystem_task, test_task));