Dispute coordinator overlay (#3462)

* node/dispute-coordinator: Modify db to return SubsystemResult.

In preparation of moving to the overlayed backend pattern, this commit
moves the db to return SubsystemResult values.

* node/dispute-coordinator: Add the Backend and OverlayedBackend.

This commit adds the backend and overlayed backend structs to the
dispute-coordinator subsystem.

* node/dispute-coordinator: Implement backend and overlayed-backend.

This commit finalizes the move from the previous transactional model
to the common overlay pattern in subsystem persistency. This can be
observed in the ApprovalVoting and ChainSelection subsystems.

* Add module docs + license

* Touchup merge
This commit is contained in:
Lldenaurois
2021-07-12 23:42:51 -04:00
committed by GitHub
parent 7948eae54b
commit dc0927787e
4 changed files with 539 additions and 405 deletions
@@ -0,0 +1,183 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! An abstraction over storage used by the chain selection subsystem.
//!
//! This provides both a [`Backend`] trait and an [`OverlayedBackend`]
//! struct which allows in-memory changes to be applied on top of a
//! [`Backend`], maintaining consistency between queries and temporary writes,
//! before any commit to the underlying storage is made.
use polkadot_primitives::v1::{CandidateHash, SessionIndex};
use polkadot_node_subsystem::SubsystemResult;
use std::collections::HashMap;
use super::db::v1::{RecentDisputes, CandidateVotes};
#[derive(Debug)]
pub enum BackendWriteOp {
WriteEarliestSession(SessionIndex),
WriteRecentDisputes(RecentDisputes),
WriteCandidateVotes(SessionIndex, CandidateHash, CandidateVotes),
DeleteCandidateVotes(SessionIndex, CandidateHash),
}
/// An abstraction over backend storage for the logic of this subsystem.
pub trait Backend {
/// Load the earliest session, if any.
fn load_earliest_session(&self) -> SubsystemResult<Option<SessionIndex>>;
/// Load the recent disputes, if any.
fn load_recent_disputes(&self) -> SubsystemResult<Option<RecentDisputes>>;
/// Load the candidate votes for the specific session-candidate pair, if any.
fn load_candidate_votes(
&self,
session: SessionIndex,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateVotes>>;
/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
where I: IntoIterator<Item = BackendWriteOp>;
}
/// An in-memory overllay for the backend.
///
/// This maintains read-only access to the underlying backend, but can be converted into a set of
/// write operations which will, when written to the underlying backend, give the same view as the
/// state of the overlay.
pub struct OverlayedBackend<'a, B: 'a> {
inner: &'a B,
// `None` means unchanged.
earliest_session: Option<SessionIndex>,
// `None` means unchanged.
recent_disputes: Option<RecentDisputes>,
// `None` means deleted, missing means query inner.
candidate_votes: HashMap<(SessionIndex, CandidateHash), Option<CandidateVotes>>,
}
impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> {
pub fn new(backend: &'a B) -> Self {
Self {
inner: backend,
earliest_session: None,
recent_disputes: None,
candidate_votes: HashMap::new(),
}
}
/// Returns true if the are no write operations to perform.
pub fn is_empty(&self) -> bool {
self.earliest_session.is_none() &&
self.recent_disputes.is_none() &&
self.candidate_votes.is_empty()
}
/// Load the earliest session, if any.
pub fn load_earliest_session(&self) -> SubsystemResult<Option<SessionIndex>> {
if let Some(val) = self.earliest_session {
return Ok(Some(val))
}
self.inner.load_earliest_session()
}
/// Load the recent disputes, if any.
pub fn load_recent_disputes(&self) -> SubsystemResult<Option<RecentDisputes>> {
if let Some(val) = &self.recent_disputes {
return Ok(Some(val.clone()))
}
self.inner.load_recent_disputes()
}
/// Load the candidate votes for the specific session-candidate pair, if any.
pub fn load_candidate_votes(
&self,
session: SessionIndex,
candidate_hash: &CandidateHash
) -> SubsystemResult<Option<CandidateVotes>> {
if let Some(val) = self.candidate_votes.get(&(session, *candidate_hash)) {
return Ok(val.clone())
}
self.inner.load_candidate_votes(session, candidate_hash)
}
/// Prepare a write to the 'earliest session' field of the DB.
///
/// Later calls to this function will override earlier ones.
pub fn write_earliest_session(&mut self, session: SessionIndex) {
self.earliest_session = Some(session);
}
/// Prepare a write to the recent disputes stored in the DB.
///
/// Later calls to this function will override earlier ones.
pub fn write_recent_disputes(&mut self, recent_disputes: RecentDisputes) {
self.recent_disputes = Some(recent_disputes)
}
/// Prepare a write of the candidate votes under the indicated candidate.
///
/// Later calls to this function for the same candidate will override earlier ones.
pub fn write_candidate_votes(
&mut self,
session: SessionIndex,
candidate_hash: CandidateHash,
votes: CandidateVotes
) {
self.candidate_votes.insert((session, candidate_hash), Some(votes));
}
/// Prepare a deletion of the candidate votes under the indicated candidate.
///
/// Later calls to this function for the same candidate will override earlier ones.
pub fn delete_candidate_votes(
&mut self,
session: SessionIndex,
candidate_hash: CandidateHash,
) {
self.candidate_votes.insert((session, candidate_hash), None);
}
/// Transform this backend into a set of write-ops to be written to the inner backend.
pub fn into_write_ops(self) -> impl Iterator<Item = BackendWriteOp> {
let earliest_session_ops = self.earliest_session
.map(|s| BackendWriteOp::WriteEarliestSession(s))
.into_iter();
let recent_dispute_ops = self.recent_disputes
.map(|d| BackendWriteOp::WriteRecentDisputes(d))
.into_iter();
let candidate_vote_ops = self.candidate_votes
.into_iter()
.map(|((session, candidate), votes)| match votes {
Some(votes) => BackendWriteOp::WriteCandidateVotes(session, candidate, votes),
None => BackendWriteOp::DeleteCandidateVotes(session, candidate),
});
earliest_session_ops
.chain(recent_dispute_ops)
.chain(candidate_vote_ops)
}
}
@@ -20,16 +20,96 @@ use polkadot_primitives::v1::{
CandidateReceipt, ValidDisputeStatementKind, InvalidDisputeStatementKind, ValidatorIndex,
ValidatorSignature, SessionIndex, CandidateHash, Hash,
};
use polkadot_node_subsystem::{SubsystemResult, SubsystemError};
use std::sync::Arc;
use kvdb::{KeyValueDB, DBTransaction};
use parity_scale_codec::{Encode, Decode};
use crate::{DISPUTE_WINDOW, DisputeStatus};
use crate::backend::{Backend, BackendWriteOp, OverlayedBackend};
const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes";
const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session";
const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes";
pub struct DbBackend {
inner: Arc<dyn KeyValueDB>,
config: ColumnConfiguration,
}
impl DbBackend {
pub fn new(db: Arc<dyn KeyValueDB>, config: ColumnConfiguration) -> Self {
Self {
inner: db,
config,
}
}
}
impl Backend for DbBackend {
/// Load the earliest session, if any.
fn load_earliest_session(&self) -> SubsystemResult<Option<SessionIndex>> {
load_earliest_session(&*self.inner, &self.config)
}
/// Load the recent disputes, if any.
fn load_recent_disputes(&self) -> SubsystemResult<Option<RecentDisputes>> {
load_recent_disputes(&*self.inner, &self.config)
}
/// Load the candidate votes for the specific session-candidate pair, if any.
fn load_candidate_votes(
&self,
session: SessionIndex,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateVotes>> {
load_candidate_votes(&*self.inner, &self.config, session, candidate_hash)
}
/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
where I: IntoIterator<Item = BackendWriteOp>
{
let mut tx = DBTransaction::new();
for op in ops {
match op {
BackendWriteOp::WriteEarliestSession(session) => {
tx.put_vec(
self.config.col_data,
EARLIEST_SESSION_KEY,
session.encode(),
);
}
BackendWriteOp::WriteRecentDisputes(recent_disputes) => {
tx.put_vec(
self.config.col_data,
RECENT_DISPUTES_KEY,
recent_disputes.encode(),
);
}
BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => {
tx.put_vec(
self.config.col_data,
&candidate_votes_key(session, &candidate_hash),
votes.encode(),
);
}
BackendWriteOp::DeleteCandidateVotes(session, candidate_hash) => {
tx.delete(
self.config.col_data,
&candidate_votes_key(session, &candidate_hash),
);
}
}
}
self.inner.write(tx).map_err(Into::into)
}
}
fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) -> [u8; 15 + 4 + 32] {
let mut buf = [0u8; 15 + 4 + 32];
buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);
@@ -41,29 +121,6 @@ fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) ->
buf
}
// Computes the upper lexicographic bound on DB keys for candidate votes with a given
// upper-exclusive bound on sessions.
fn candidate_votes_range_upper_bound(upper_exclusive: SessionIndex) -> [u8; 15 + 4] {
let mut buf = [0; 15 + 4];
buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);
// big-endian encoding is used to ensure lexicographic ordering.
buf[15..][..4].copy_from_slice(&upper_exclusive.to_be_bytes());
buf
}
fn decode_candidate_votes_key(key: &[u8]) -> Option<(SessionIndex, CandidateHash)> {
if key.len() != 15 + 4 + 32 {
return None;
}
let mut session_buf = [0; 4];
session_buf.copy_from_slice(&key[15..][..4]);
let session = SessionIndex::from_be_bytes(session_buf);
CandidateHash::decode(&mut &key[(15 + 4)..]).ok().map(|hash| (session, hash))
}
/// Column configuration information for the DB.
#[derive(Debug, Clone)]
pub struct ColumnConfiguration {
@@ -128,102 +185,33 @@ fn load_decode<D: Decode>(db: &dyn KeyValueDB, col_data: u32, key: &[u8])
}
}
/// Load the candidate votes for the identified candidate under the given hash.
/// Load the candidate votes for the specific session-candidate pair, if any.
pub(crate) fn load_candidate_votes(
db: &dyn KeyValueDB,
config: &ColumnConfiguration,
session: SessionIndex,
candidate_hash: &CandidateHash,
) -> Result<Option<CandidateVotes>> {
) -> SubsystemResult<Option<CandidateVotes>> {
load_decode(db, config.col_data, &candidate_votes_key(session, candidate_hash))
.map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
}
/// Load the earliest session, if any.
pub(crate) fn load_earliest_session(
db: &dyn KeyValueDB,
config: &ColumnConfiguration,
) -> Result<Option<SessionIndex>> {
) -> SubsystemResult<Option<SessionIndex>> {
load_decode(db, config.col_data, EARLIEST_SESSION_KEY)
.map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
}
/// Load the recent disputes, if any.
pub(crate) fn load_recent_disputes(
db: &dyn KeyValueDB,
config: &ColumnConfiguration,
) -> Result<Option<RecentDisputes>> {
) -> SubsystemResult<Option<RecentDisputes>> {
load_decode(db, config.col_data, RECENT_DISPUTES_KEY)
}
/// An atomic transaction to be commited to the underlying DB.
#[derive(Debug, Default, Clone)]
pub(crate) struct Transaction {
earliest_session: Option<SessionIndex>,
recent_disputes: Option<RecentDisputes>,
write_candidate_votes: Vec<(SessionIndex, CandidateHash, CandidateVotes)>,
delete_candidate_votes: Vec<(SessionIndex, CandidateHash)>,
}
impl Transaction {
/// Prepare a write to the 'earliest session' field of the DB.
///
/// Later calls to this function will override earlier ones.
pub(crate) fn put_earliest_session(&mut self, session: SessionIndex) {
self.earliest_session = Some(session);
}
/// Prepare a write to the recent disputes stored in the DB.
///
/// Later calls to this function will override earlier ones.
pub(crate) fn put_recent_disputes(&mut self, recent_disputes: RecentDisputes) {
self.recent_disputes = Some(recent_disputes);
}
/// Prepare a write of the candidate votes under the indicated candidate.
///
/// Later calls to this function for the same candidate will override earlier ones.
/// Any calls to this function will be overridden by deletions of the same candidate.
pub(crate) fn put_candidate_votes(
&mut self,
session: SessionIndex,
candidate_hash: CandidateHash,
votes: CandidateVotes,
) {
self.write_candidate_votes.push((session, candidate_hash, votes))
}
/// Prepare a deletion of the candidate votes under the indicated candidate.
///
/// Any calls to this function will override writes to the same candidate.
pub(crate) fn delete_candidate_votes(
&mut self,
session: SessionIndex,
candidate_hash: CandidateHash,
) {
self.delete_candidate_votes.push((session, candidate_hash))
}
/// Write the transaction atomically to the DB.
pub(crate) fn write(self, db: &dyn KeyValueDB, config: &ColumnConfiguration) -> Result<()> {
let mut tx = DBTransaction::new();
if let Some(s) = self.earliest_session {
tx.put_vec(config.col_data, EARLIEST_SESSION_KEY, s.encode());
}
if let Some(a) = self.recent_disputes {
tx.put_vec(config.col_data, RECENT_DISPUTES_KEY, a.encode());
}
for (session, candidate_hash, votes) in self.write_candidate_votes {
tx.put_vec(config.col_data, &candidate_votes_key(session, &candidate_hash), votes.encode());
}
for (session, candidate_hash) in self.delete_candidate_votes {
tx.delete(config.col_data, &candidate_votes_key(session, &candidate_hash));
}
db.write(tx).map_err(Into::into)
}
.map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
}
/// Maybe prune data in the DB based on the provided session index.
@@ -235,57 +223,46 @@ impl Transaction {
/// If one or more ancient sessions are pruned, all metadata on candidates within the ancient
/// session will be deleted.
pub(crate) fn note_current_session(
store: &dyn KeyValueDB,
config: &ColumnConfiguration,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
current_session: SessionIndex,
) -> Result<()> {
) -> SubsystemResult<()> {
let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW);
let mut tx = Transaction::default();
match load_earliest_session(store, config)? {
match overlay_db.load_earliest_session()? {
None => {
// First launch - write new-earliest.
tx.put_earliest_session(new_earliest);
overlay_db.write_earliest_session(new_earliest);
}
Some(prev_earliest) if new_earliest > prev_earliest => {
// Prune all data in the outdated sessions.
tx.put_earliest_session(new_earliest);
overlay_db.write_earliest_session(new_earliest);
// Clear recent disputes metadata.
{
let mut recent_disputes = load_recent_disputes(store, config)?.unwrap_or_default();
let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let lower_bound = (
new_earliest,
CandidateHash(Hash::repeat_byte(0x00)),
);
let prev_len = recent_disputes.len();
recent_disputes = recent_disputes.split_off(&lower_bound);
let new_recent_disputes = recent_disputes.split_off(&lower_bound);
// Any remanining disputes are considered ancient and must be pruned.
let pruned_disputes = recent_disputes;
if recent_disputes.len() != prev_len {
tx.put_recent_disputes(recent_disputes);
if pruned_disputes.len() != 0 {
overlay_db.write_recent_disputes(new_recent_disputes);
for ((session, candidate_hash), _) in pruned_disputes {
overlay_db.delete_candidate_votes(session, candidate_hash);
}
}
}
// Clear all candidate data with session less than the new earliest kept.
{
let end_prefix = candidate_votes_range_upper_bound(new_earliest);
store.iter_with_prefix(config.col_data, CANDIDATE_VOTES_SUBKEY)
.take_while(|(k, _)| &k[..] < &end_prefix[..])
.filter_map(|(k, _)| decode_candidate_votes_key(&k[..]))
.for_each(|(session, candidate_hash)| {
tx.delete_candidate_votes(session, candidate_hash);
});
}
}
Some(_) => {
// nothing to do.
}
};
}
tx.write(store, config)
Ok(())
}
#[cfg(test)]
@@ -293,151 +270,150 @@ mod tests {
use super::*;
use polkadot_primitives::v1::{Hash, Id as ParaId};
fn make_db() -> DbBackend {
let store = Arc::new(kvdb_memorydb::create(1));
let config = ColumnConfiguration { col_data: 0 };
DbBackend::new(store, config)
}
#[test]
fn candidate_votes_key_works() {
let session = 4;
let candidate = CandidateHash(Hash::repeat_byte(0x01));
fn overlay_pre_and_post_commit_consistency() {
let mut backend = make_db();
let key = candidate_votes_key(session, &candidate);
let mut overlay_db = OverlayedBackend::new(&backend);
assert_eq!(&key[0..15], CANDIDATE_VOTES_SUBKEY);
assert_eq!(&key[15..19], &[0x00, 0x00, 0x00, 0x04]);
assert_eq!(&key[19..51], candidate.0.as_bytes());
overlay_db.write_earliest_session(0);
overlay_db.write_earliest_session(1);
overlay_db.write_recent_disputes(vec![
((0, CandidateHash(Hash::repeat_byte(0))), DisputeStatus::Active),
].into_iter().collect());
overlay_db.write_recent_disputes(vec![
((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
].into_iter().collect());
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: Default::default(),
valid: Vec::new(),
invalid: Vec::new(),
},
);
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: {
let mut receipt = CandidateReceipt::default();
receipt.descriptor.para_id = 5.into();
receipt
},
valid: Vec::new(),
invalid: Vec::new(),
},
);
// Test that overlay returns the correct values before committing.
assert_eq!(
overlay_db.load_earliest_session().unwrap().unwrap(),
1,
);
assert_eq!(
decode_candidate_votes_key(&key[..]),
Some((session, candidate)),
overlay_db.load_recent_disputes().unwrap().unwrap(),
vec![
((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
].into_iter().collect()
);
assert_eq!(
overlay_db.load_candidate_votes(
1,
&CandidateHash(Hash::repeat_byte(1))
).unwrap().unwrap().candidate_receipt.descriptor.para_id,
ParaId::from(5),
);
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
// Test that subsequent writes were written.
assert_eq!(
backend.load_earliest_session().unwrap().unwrap(),
1,
);
assert_eq!(
backend.load_recent_disputes().unwrap().unwrap(),
vec![
((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
].into_iter().collect()
);
assert_eq!(
backend.load_candidate_votes(
1,
&CandidateHash(Hash::repeat_byte(1))
).unwrap().unwrap().candidate_receipt.descriptor.para_id,
ParaId::from(5),
);
}
#[test]
fn db_transaction() {
let store = kvdb_memorydb::create(1);
let config = ColumnConfiguration { col_data: 0 };
fn overlay_preserves_candidate_votes_operation_order() {
let mut backend = make_db();
{
let mut tx = Transaction::default();
let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
tx.put_earliest_session(0);
tx.put_earliest_session(1);
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: Default::default(),
valid: Vec::new(),
invalid: Vec::new(),
}
);
tx.put_recent_disputes(vec![
((0, CandidateHash(Hash::repeat_byte(0))), DisputeStatus::Active),
].into_iter().collect());
tx.put_recent_disputes(vec![
((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
].into_iter().collect());
tx.put_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: Default::default(),
valid: Vec::new(),
invalid: Vec::new(),
},
);
tx.put_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: {
let mut receipt = CandidateReceipt::default();
receipt.descriptor.para_id = 5.into();
receipt
},
valid: Vec::new(),
invalid: Vec::new(),
},
);
tx.write(&store, &config).unwrap();
}
// Test that subsequent writes were written.
{
assert_eq!(
load_earliest_session(&store, &config).unwrap().unwrap(),
1,
);
assert_eq!(
load_recent_disputes(&store, &config).unwrap().unwrap(),
vec![
((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active),
].into_iter().collect()
);
assert_eq!(
load_candidate_votes(
&store,
&config,
1,
&CandidateHash(Hash::repeat_byte(1))
).unwrap().unwrap().candidate_receipt.descriptor.para_id,
ParaId::from(5),
);
}
}
#[test]
fn db_deletes_supersede_writes() {
let store = kvdb_memorydb::create(1);
let config = ColumnConfiguration { col_data: 0 };
{
let mut tx = Transaction::default();
tx.put_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: Default::default(),
valid: Vec::new(),
invalid: Vec::new(),
}
);
tx.write(&store, &config).unwrap();
}
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
assert_eq!(
load_candidate_votes(
&store,
&config,
backend.load_candidate_votes(
1,
&CandidateHash(Hash::repeat_byte(1))
).unwrap().unwrap().candidate_receipt.descriptor.para_id,
ParaId::from(0),
);
{
let mut tx = Transaction::default();
tx.put_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: {
let mut receipt = CandidateReceipt::default();
receipt.descriptor.para_id = 5.into();
let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: {
let mut receipt = CandidateReceipt::default();
receipt.descriptor.para_id = 5.into();
receipt
},
valid: Vec::new(),
invalid: Vec::new(),
}
);
receipt
},
valid: Vec::new(),
invalid: Vec::new(),
}
);
tx.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
tx.write(&store, &config).unwrap();
}
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
assert!(
load_candidate_votes(
&store,
&config,
backend.load_candidate_votes(
1,
&CandidateHash(Hash::repeat_byte(1))
).unwrap().is_none()
@@ -446,8 +422,7 @@ mod tests {
#[test]
fn note_current_session_prunes_old() {
let store = kvdb_memorydb::create(1);
let config = ColumnConfiguration { col_data: 0 };
let mut backend = make_db();
let hash_a = CandidateHash(Hash::repeat_byte(0x0a));
let hash_b = CandidateHash(Hash::repeat_byte(0x0b));
@@ -468,61 +443,61 @@ mod tests {
invalid: Vec::new(),
};
{
let mut tx = Transaction::default();
tx.put_earliest_session(prev_earliest_session);
tx.put_recent_disputes(vec![
((very_old, hash_a), DisputeStatus::Active),
((slightly_old, hash_b), DisputeStatus::Active),
((new_earliest_session, hash_c), DisputeStatus::Active),
((very_recent, hash_d), DisputeStatus::Active),
].into_iter().collect());
let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.write_earliest_session(prev_earliest_session);
overlay_db.write_recent_disputes(vec![
((very_old, hash_a), DisputeStatus::Active),
((slightly_old, hash_b), DisputeStatus::Active),
((new_earliest_session, hash_c), DisputeStatus::Active),
((very_recent, hash_d), DisputeStatus::Active),
].into_iter().collect());
tx.put_candidate_votes(
very_old,
hash_a,
blank_candidate_votes(),
);
overlay_db.write_candidate_votes(
very_old,
hash_a,
blank_candidate_votes(),
);
tx.put_candidate_votes(
slightly_old,
hash_b,
blank_candidate_votes(),
);
overlay_db.write_candidate_votes(
slightly_old,
hash_b,
blank_candidate_votes(),
);
tx.put_candidate_votes(
new_earliest_session,
hash_c,
blank_candidate_votes(),
);
overlay_db.write_candidate_votes(
new_earliest_session,
hash_c,
blank_candidate_votes(),
);
tx.put_candidate_votes(
very_recent,
hash_d,
blank_candidate_votes(),
);
overlay_db.write_candidate_votes(
very_recent,
hash_d,
blank_candidate_votes(),
);
tx.write(&store, &config).unwrap();
}
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();
note_current_session(&store, &config, current_session).unwrap();
let mut overlay_db = OverlayedBackend::new(&backend);
note_current_session(&mut overlay_db, current_session).unwrap();
assert_eq!(
load_earliest_session(&store, &config).unwrap(),
overlay_db.load_earliest_session().unwrap(),
Some(new_earliest_session),
);
assert_eq!(
load_recent_disputes(&store, &config).unwrap().unwrap(),
overlay_db.load_recent_disputes().unwrap().unwrap(),
vec![
((new_earliest_session, hash_c), DisputeStatus::Active),
((very_recent, hash_d), DisputeStatus::Active),
].into_iter().collect(),
);
assert!(load_candidate_votes(&store, &config, very_old, &hash_a).unwrap().is_none());
assert!(load_candidate_votes(&store, &config, slightly_old, &hash_b).unwrap().is_none());
assert!(load_candidate_votes(&store, &config, new_earliest_session, &hash_c).unwrap().is_some());
assert!(load_candidate_votes(&store, &config, very_recent, &hash_d).unwrap().is_some());
assert!(overlay_db.load_candidate_votes(very_old, &hash_a).unwrap().is_none());
assert!(overlay_db.load_candidate_votes(slightly_old, &hash_b).unwrap().is_none());
assert!(overlay_db.load_candidate_votes(new_earliest_session, &hash_c).unwrap().is_some());
assert!(overlay_db.load_candidate_votes(very_recent, &hash_d).unwrap().is_some());
}
}
+93 -118
View File
@@ -52,9 +52,11 @@ use kvdb::KeyValueDB;
use parity_scale_codec::{Encode, Decode, Error as CodecError};
use sc_keystore::LocalKeystore;
use db::v1::RecentDisputes;
use db::v1::{RecentDisputes, DbBackend};
use backend::{Backend, OverlayedBackend};
mod db;
mod backend;
#[cfg(test)]
mod tests;
@@ -112,7 +114,8 @@ where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(self, ctx, Box::new(SystemClock))
let backend = DbBackend::new(self.store.clone(), self.config.column_config());
let future = run(self, ctx, backend, Box::new(SystemClock))
.map(|_| Ok(()))
.boxed();
@@ -262,17 +265,19 @@ impl DisputeStatus {
}
}
async fn run<Context>(
async fn run<B, Context>(
subsystem: DisputeCoordinatorSubsystem,
mut ctx: Context,
mut backend: B,
clock: Box<dyn Clock>,
)
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
Context: SubsystemContext<Message = DisputeCoordinatorMessage>
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
B: Backend,
{
loop {
let res = run_iteration(&mut ctx, &subsystem, &*clock).await;
let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await;
match res {
Err(e) => {
e.trace();
@@ -294,24 +299,25 @@ where
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async fn run_iteration<Context>(
async fn run_iteration<B, Context>(
ctx: &mut Context,
subsystem: &DisputeCoordinatorSubsystem,
backend: &mut B,
clock: &dyn Clock,
)
-> Result<(), Error>
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
Context: SubsystemContext<Message = DisputeCoordinatorMessage>
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
B: Backend,
{
let DisputeCoordinatorSubsystem { ref store, ref keystore, ref config } = *subsystem;
let mut state = State {
keystore: keystore.clone(),
keystore: subsystem.keystore.clone(),
highest_session: None,
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
};
loop {
let mut overlay_db = OverlayedBackend::new(backend);
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(())
@@ -319,9 +325,8 @@ where
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
handle_new_activations(
ctx,
&**store,
&mut overlay_db,
&mut state,
config,
update.activated.into_iter().map(|a| a.hash),
).await?
}
@@ -329,22 +334,25 @@ where
FromOverseer::Communication { msg } => {
handle_incoming(
ctx,
&**store,
&mut overlay_db,
&mut state,
config,
msg,
clock.now(),
).await?
}
}
if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
}
}
}
async fn handle_new_activations(
ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage> + overseer::SubsystemContext<Message = DisputeCoordinatorMessage>),
store: &dyn KeyValueDB,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
config: &Config,
new_activations: impl IntoIterator<Item = Hash>,
) -> Result<(), Error> {
for new_leaf in new_activations {
@@ -388,11 +396,7 @@ async fn handle_new_activations(
state.highest_session = Some(session);
db::v1::note_current_session(
store,
&config.column_config(),
session,
)?;
db::v1::note_current_session(overlay_db, session)?;
}
}
_ => {}
@@ -404,9 +408,8 @@ async fn handle_new_activations(
async fn handle_incoming(
ctx: &mut impl SubsystemContext,
store: &dyn KeyValueDB,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
config: &Config,
message: DisputeCoordinatorMessage,
now: Timestamp,
) -> Result<(), Error> {
@@ -420,9 +423,8 @@ async fn handle_incoming(
} => {
handle_import_statements(
ctx,
store,
overlay_db,
state,
config,
candidate_hash,
candidate_receipt,
session,
@@ -432,15 +434,11 @@ async fn handle_incoming(
).await?;
}
DisputeCoordinatorMessage::RecentDisputes(rx) => {
let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
.unwrap_or_default();
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let _ = rx.send(recent_disputes.keys().cloned().collect());
}
DisputeCoordinatorMessage::ActiveDisputes(rx) => {
let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
.unwrap_or_default();
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let _ = rx.send(collect_active(recent_disputes, now));
}
DisputeCoordinatorMessage::QueryCandidateVotes(
@@ -449,9 +447,7 @@ async fn handle_incoming(
) => {
let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
if let Some(v) = db::v1::load_candidate_votes(
store,
&config.column_config(),
if let Some(v) = overlay_db.load_candidate_votes(
session_index,
&candidate_hash,
)? {
@@ -475,8 +471,7 @@ async fn handle_incoming(
issue_local_statement(
ctx,
state,
store,
config,
overlay_db,
candidate_hash,
candidate_receipt,
session,
@@ -490,8 +485,7 @@ async fn handle_incoming(
tx,
} => {
let undisputed_chain = determine_undisputed_chain(
store,
&config,
overlay_db,
base_number,
block_descriptions
)?;
@@ -528,9 +522,8 @@ fn insert_into_statement_vec<T>(
async fn handle_import_statements(
ctx: &mut impl SubsystemContext,
store: &dyn KeyValueDB,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
config: &Config,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
@@ -563,12 +556,7 @@ async fn handle_import_statements(
let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(n_validators);
let mut votes = db::v1::load_candidate_votes(
store,
&config.column_config(),
session,
&candidate_hash
)?
let mut votes = overlay_db.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
@@ -617,86 +605,79 @@ async fn handle_import_statements(
let concluded_valid = votes.valid.len() >= supermajority_threshold;
let concluded_invalid = votes.invalid.len() >= supermajority_threshold;
let mut recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
.unwrap_or_default();
let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
{ // Scope so we will only confirm valid import after the import got actually persisted.
let mut tx = db::v1::Transaction::default();
let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone());
let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone());
let status = if is_disputed {
let status = recent_disputes
.entry((session, candidate_hash))
.or_insert(DisputeStatus::active());
let status = if is_disputed {
let status = recent_disputes
.entry((session, candidate_hash))
.or_insert(DisputeStatus::active());
// Note: concluded-invalid overwrites concluded-valid,
// so we do this check first. Dispute state machine is
// non-commutative.
if concluded_valid {
*status = status.concluded_for(now);
}
// Note: concluded-invalid overwrites concluded-valid,
// so we do this check first. Dispute state machine is
// non-commutative.
if concluded_valid {
*status = status.concluded_for(now);
}
if concluded_invalid {
*status = status.concluded_against(now);
}
if concluded_invalid {
*status = status.concluded_against(now);
}
Some(*status)
} else {
None
};
Some(*status)
} else {
None
};
if status != prev_status {
// This branch is only hit when the candidate is freshly disputed -
// status was previously `None`, and now is not.
if prev_status.is_none() {
// No matter what, if the dispute is new, we participate.
//
// We also block the coordinator while awaiting our determination
// of whether the vote is available.
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
n_validators: n_validators as u32,
report_availability,
}).await;
if status != prev_status {
// Only write when updated.
tx.put_recent_disputes(recent_disputes);
// This branch is only hit when the candidate is freshly disputed -
// status was previously `None`, and now is not.
if prev_status.is_none() {
// No matter what, if the dispute is new, we participate.
if !receive_availability.await.map_err(Error::Oneshot)? {
// If the data is not available, we disregard the dispute votes.
// This is an indication that the dispute does not correspond to any included
// candidate and that it should be ignored.
//
// We also block the coordinator while awaiting our determination
// of whether the vote is available.
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
n_validators: n_validators as u32,
report_availability,
}).await;
// We expect that if the candidate is truly disputed that the higher-level network
// code will retry.
pending_confirmation.send(ImportStatementsResult::InvalidImport)
.map_err(|_| Error::OneshotSend)?;
if !receive_availability.await.map_err(Error::Oneshot)? {
// If the data is not available, we disregard the dispute votes.
// This is an indication that the dispute does not correspond to any included
// candidate and that it should be ignored.
//
// We expect that if the candidate is truly disputed that the higher-level network
// code will retry.
pending_confirmation.send(ImportStatementsResult::InvalidImport)
.map_err(|_| Error::OneshotSend)?;
tracing::debug!(
target: LOG_TARGET,
"Recovering availability failed - invalid import."
);
return Ok(())
}
tracing::debug!(
target: LOG_TARGET,
"Recovering availability failed - invalid import."
);
return Ok(())
}
}
tx.put_candidate_votes(session, candidate_hash, votes.into());
tx.write(store, &config.column_config())?;
// Only write when updated and vote is available.
overlay_db.write_recent_disputes(recent_disputes);
}
overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
Ok(())
}
async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
state: &mut State,
store: &dyn KeyValueDB,
config: &Config,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
@@ -719,12 +700,7 @@ async fn issue_local_statement(
let validators = info.validators.clone();
let votes = db::v1::load_candidate_votes(
store,
&config.column_config(),
session,
&candidate_hash
)?
let votes = overlay_db.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
@@ -792,9 +768,8 @@ async fn issue_local_statement(
let (pending_confirmation, _rx) = oneshot::channel();
handle_import_statements(
ctx,
store,
overlay_db,
state,
config,
candidate_hash,
candidate_receipt,
session,
@@ -862,16 +837,16 @@ fn make_dispute_message(
}
fn determine_undisputed_chain(
store: &dyn KeyValueDB,
config: &Config,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
base_number: BlockNumber,
block_descriptions: Vec<(Hash, SessionIndex, Vec<CandidateHash>)>,
) -> Result<Option<(BlockNumber, Hash)>, Error> {
let last = block_descriptions.last()
.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.0));
// Fast path for no disputes.
let recent_disputes = match db::v1::load_recent_disputes(store, &config.column_config())? {
let recent_disputes = match overlay_db.load_recent_disputes()? {
None => return Ok(last),
Some(a) if a.is_empty() => return Ok(last),
Some(a) => a,
@@ -250,7 +250,8 @@ fn test_harness<F>(test: F)
state.subsystem_keystore.clone(),
);
let subsystem_task = run(subsystem, ctx, Box::new(state.clock.clone()));
let backend = DbBackend::new(state.db.clone(), state.config.column_config());
let subsystem_task = run(subsystem, ctx, backend, Box::new(state.clock.clone()));
let test_task = test(state, ctx_handle);
futures::executor::block_on(future::join(subsystem_task, test_task));