mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 07:41:08 +00:00
Candidate Agreement + Consensus (#29)
* candidate statement importing * import votes on validity * import availability votes * candidate receipt type * make table mod public * test context for table * add harness for tests * some tests for misbehavior * produce proposal from table * count candidate issuance as implicit vote * keep track of messages known by validators * fix primitives compilation * simple BFT agreement * kill unused macro_use annotation * tests for BFT agreement * test for not concluding on different prepares * return summary upon statement import * accept bft agreement on proposal not locally submitted * check justification set for BFT * BFT rewrite: vote accumulator with tests * squash some warnings * a few more tests for the accumulator * add sender to table's signed statement * implement honest node strategy for BFT * inex -> index * import and broadcast lock proofs * poll repeatedly when state changes * don't broadcast advance vote immediately if locked * do not check validity of locked candidate * basic tests for the strategy * remove unused context trait and fix warning * address some review grumbles * address some more review nits * fix lock import logic and add a test * fix spaces * fix a couple more style grumbles * more type-safe justifications * rename Communication enum variants * improve some panic guard proofs * add trailing comma
This commit is contained in:
committed by
GitHub
parent
9612e1d7c1
commit
45c3e40a62
Generated
+18
-10
@@ -1,13 +1,3 @@
|
||||
[root]
|
||||
name = "polkadot-validator"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"polkadot-primitives 0.1.0",
|
||||
"polkadot-serializer 0.1.0",
|
||||
"serde 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aho-corasick"
|
||||
version = "0.6.3"
|
||||
@@ -612,6 +602,14 @@ dependencies = [
|
||||
"polkadot-cli 0.1.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "polkadot-candidate-agreement"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"polkadot-primitives 0.1.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "polkadot-cli"
|
||||
version = "0.1.0"
|
||||
@@ -714,6 +712,16 @@ dependencies = [
|
||||
"triehash 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "polkadot-validator"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"polkadot-primitives 0.1.0",
|
||||
"polkadot-serializer 0.1.0",
|
||||
"serde 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pretty_assertions"
|
||||
version = "0.4.0"
|
||||
|
||||
@@ -9,6 +9,7 @@ polkadot-cli = { path = "cli", version = "0.1" }
|
||||
|
||||
[workspace]
|
||||
members = [
|
||||
"candidate-agreement",
|
||||
"client",
|
||||
"collator",
|
||||
"contracts",
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
[package]
|
||||
name = "polkadot-candidate-agreement"
|
||||
version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
|
||||
[dependencies]
|
||||
futures = "0.1"
|
||||
polkadot-primitives = { path = "../primitives" }
|
||||
@@ -0,0 +1,607 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Message accumulator for each round of BFT consensus.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::hash::Hash;
|
||||
|
||||
use super::{Message, LocalizedMessage};
|
||||
|
||||
/// Justification for some state at a given round.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct UncheckedJustification<D, S> {
|
||||
/// The round.
|
||||
pub round_number: usize,
|
||||
/// The digest prepared for.
|
||||
pub digest: D,
|
||||
/// Signatures for the prepare messages.
|
||||
pub signatures: Vec<S>,
|
||||
}
|
||||
|
||||
impl<D, S> UncheckedJustification<D, S> {
|
||||
/// Fails if there are duplicate signatures or invalid.
|
||||
///
|
||||
/// Provide a closure for checking whether the signature is valid on a
|
||||
/// digest.
|
||||
///
|
||||
/// The closure should returns a checked justification iff the round number, digest, and signature
|
||||
/// represent a valid message and the signer was authorized to issue
|
||||
/// it.
|
||||
///
|
||||
/// The `check_message` closure may vary based on context.
|
||||
pub fn check<F, V>(self, threshold: usize, mut check_message: F)
|
||||
-> Result<Justification<D, S>, Self>
|
||||
where
|
||||
F: FnMut(usize, &D, &S) -> Option<V>,
|
||||
V: Hash + Eq,
|
||||
{
|
||||
let checks_out = {
|
||||
let mut checks_out = || {
|
||||
let mut voted = HashSet::new();
|
||||
|
||||
for signature in &self.signatures {
|
||||
match check_message(self.round_number, &self.digest, signature) {
|
||||
None => return false,
|
||||
Some(v) => {
|
||||
if !voted.insert(v) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
voted.len() >= threshold
|
||||
};
|
||||
|
||||
checks_out()
|
||||
};
|
||||
|
||||
if checks_out {
|
||||
Ok(Justification(self))
|
||||
} else {
|
||||
Err(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A checked justification.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Justification<D,S>(UncheckedJustification<D,S>);
|
||||
|
||||
impl<D, S> ::std::ops::Deref for Justification<D, S> {
|
||||
type Target = UncheckedJustification<D, S>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Type alias to represent a justification specifically for a prepare.
|
||||
pub type PrepareJustification<D, S> = Justification<D, S>;
|
||||
|
||||
/// The round's state, based on imported messages.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum State<Candidate, Digest, Signature> {
|
||||
/// No proposal yet.
|
||||
Begin,
|
||||
/// Proposal received.
|
||||
Proposed(Candidate),
|
||||
/// Seen n - f prepares for this digest.
|
||||
Prepared(PrepareJustification<Digest, Signature>),
|
||||
/// Seen n - f commits for a digest.
|
||||
Committed(Justification<Digest, Signature>),
|
||||
/// Seen n - f round-advancement messages.
|
||||
Advanced(Option<PrepareJustification<Digest, Signature>>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct VoteCounts {
|
||||
prepared: usize,
|
||||
committed: usize,
|
||||
}
|
||||
|
||||
/// Accumulates messages for a given round of BFT consensus.
|
||||
///
|
||||
/// This isn't tied to the "view" of a single validator. It
|
||||
/// keeps accurate track of the state of the BFT consensus based
|
||||
/// on all messages imported.
|
||||
#[derive(Debug)]
|
||||
pub struct Accumulator<Candidate, Digest, ValidatorId, Signature>
|
||||
where
|
||||
Candidate: Eq + Clone,
|
||||
Digest: Hash + Eq + Clone,
|
||||
ValidatorId: Hash + Eq,
|
||||
Signature: Eq + Clone,
|
||||
{
|
||||
round_number: usize,
|
||||
threshold: usize,
|
||||
round_proposer: ValidatorId,
|
||||
proposal: Option<Candidate>,
|
||||
prepares: HashMap<ValidatorId, (Digest, Signature)>,
|
||||
commits: HashMap<ValidatorId, (Digest, Signature)>,
|
||||
vote_counts: HashMap<Digest, VoteCounts>,
|
||||
advance_round: HashSet<ValidatorId>,
|
||||
state: State<Candidate, Digest, Signature>,
|
||||
}
|
||||
|
||||
impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, ValidatorId, Signature>
|
||||
where
|
||||
Candidate: Eq + Clone,
|
||||
Digest: Hash + Eq + Clone,
|
||||
ValidatorId: Hash + Eq,
|
||||
Signature: Eq + Clone,
|
||||
{
|
||||
/// Create a new state accumulator.
|
||||
pub fn new(round_number: usize, threshold: usize, round_proposer: ValidatorId) -> Self {
|
||||
Accumulator {
|
||||
round_number,
|
||||
threshold,
|
||||
round_proposer,
|
||||
proposal: None,
|
||||
prepares: HashMap::new(),
|
||||
commits: HashMap::new(),
|
||||
vote_counts: HashMap::new(),
|
||||
advance_round: HashSet::new(),
|
||||
state: State::Begin,
|
||||
}
|
||||
}
|
||||
|
||||
/// How advance votes we have seen.
|
||||
pub fn advance_votes(&self) -> usize {
|
||||
self.advance_round.len()
|
||||
}
|
||||
|
||||
/// Get the round number.
|
||||
pub fn round_number(&self) -> usize {
|
||||
self.round_number.clone()
|
||||
}
|
||||
|
||||
/// Get the round proposer.
|
||||
pub fn round_proposer(&self) -> &ValidatorId {
|
||||
&self.round_proposer
|
||||
}
|
||||
|
||||
pub fn proposal(&self) -> Option<&Candidate> {
|
||||
self.proposal.as_ref()
|
||||
}
|
||||
|
||||
/// Inspect the current consensus state.
|
||||
pub fn state(&self) -> &State<Candidate, Digest, Signature> {
|
||||
&self.state
|
||||
}
|
||||
|
||||
/// Import a message. Importing duplicates is fine, but the signature
|
||||
/// and authorization should have already been checked.
|
||||
pub fn import_message(
|
||||
&mut self,
|
||||
message: LocalizedMessage<Candidate, Digest, ValidatorId, Signature>,
|
||||
)
|
||||
{
|
||||
// message from different round.
|
||||
if message.message.round_number() != self.round_number {
|
||||
return;
|
||||
}
|
||||
|
||||
let (sender, signature) = (message.sender, message.signature);
|
||||
|
||||
match message.message {
|
||||
Message::Propose(_, p) => self.import_proposal(p, sender),
|
||||
Message::Prepare(_, d) => self.import_prepare(d, sender, signature),
|
||||
Message::Commit(_, d) => self.import_commit(d, sender, signature),
|
||||
Message::AdvanceRound(_) => self.import_advance_round(sender),
|
||||
}
|
||||
}
|
||||
|
||||
fn import_proposal(
|
||||
&mut self,
|
||||
proposal: Candidate,
|
||||
sender: ValidatorId,
|
||||
) {
|
||||
if sender != self.round_proposer || self.proposal.is_some() { return }
|
||||
|
||||
self.proposal = Some(proposal.clone());
|
||||
self.state = State::Proposed(proposal);
|
||||
}
|
||||
|
||||
fn import_prepare(
|
||||
&mut self,
|
||||
digest: Digest,
|
||||
sender: ValidatorId,
|
||||
signature: Signature,
|
||||
) {
|
||||
// ignore any subsequent prepares by the same sender.
|
||||
// TODO: if digest is different, that's misbehavior.
|
||||
let threshold_prepared = if let Entry::Vacant(vacant) = self.prepares.entry(sender) {
|
||||
vacant.insert((digest.clone(), signature));
|
||||
let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default);
|
||||
count.prepared += 1;
|
||||
|
||||
if count.prepared >= self.threshold {
|
||||
Some(digest)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// only allow transition to prepare from begin or proposed state.
|
||||
let valid_transition = match self.state {
|
||||
State::Begin | State::Proposed(_) => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if let (true, Some(threshold_prepared)) = (valid_transition, threshold_prepared) {
|
||||
let signatures = self.prepares
|
||||
.values()
|
||||
.filter(|&&(ref d, _)| d == &threshold_prepared)
|
||||
.map(|&(_, ref s)| s.clone())
|
||||
.collect();
|
||||
|
||||
self.state = State::Prepared(Justification(UncheckedJustification {
|
||||
round_number: self.round_number,
|
||||
digest: threshold_prepared,
|
||||
signatures: signatures,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
fn import_commit(
|
||||
&mut self,
|
||||
digest: Digest,
|
||||
sender: ValidatorId,
|
||||
signature: Signature,
|
||||
) {
|
||||
// ignore any subsequent commits by the same sender.
|
||||
// TODO: if digest is different, that's misbehavior.
|
||||
let threshold_committed = if let Entry::Vacant(vacant) = self.commits.entry(sender) {
|
||||
vacant.insert((digest.clone(), signature));
|
||||
let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default);
|
||||
count.committed += 1;
|
||||
|
||||
if count.committed >= self.threshold {
|
||||
Some(digest)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// transition to concluded state always valid.
|
||||
// only weird case is if the prior state was "advanced",
|
||||
// but technically it's the same behavior as if the order of receiving
|
||||
// the last "advance round" and "commit" messages were reversed.
|
||||
if let Some(threshold_committed) = threshold_committed {
|
||||
let signatures = self.commits
|
||||
.values()
|
||||
.filter(|&&(ref d, _)| d == &threshold_committed)
|
||||
.map(|&(_, ref s)| s.clone())
|
||||
.collect();
|
||||
|
||||
self.state = State::Committed(Justification(UncheckedJustification {
|
||||
round_number: self.round_number,
|
||||
digest: threshold_committed,
|
||||
signatures: signatures,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
fn import_advance_round(
|
||||
&mut self,
|
||||
sender: ValidatorId,
|
||||
) {
|
||||
self.advance_round.insert(sender);
|
||||
|
||||
if self.advance_round.len() < self.threshold { return }
|
||||
|
||||
// allow transition to new round only if we haven't produced a justification
|
||||
// yet.
|
||||
self.state = match ::std::mem::replace(&mut self.state, State::Begin) {
|
||||
State::Committed(j) => State::Committed(j),
|
||||
State::Prepared(j) => State::Advanced(Some(j)),
|
||||
State::Advanced(j) => State::Advanced(j),
|
||||
State::Begin | State::Proposed(_) => State::Advanced(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||
pub struct Candidate(usize);
|
||||
|
||||
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
|
||||
pub struct Digest(usize);
|
||||
|
||||
#[derive(Hash, PartialEq, Eq, Debug)]
|
||||
pub struct ValidatorId(usize);
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Debug)]
|
||||
pub struct Signature(usize, usize);
|
||||
|
||||
#[test]
|
||||
fn justification_checks_out() {
|
||||
let mut justification = UncheckedJustification {
|
||||
round_number: 2,
|
||||
digest: Digest(600),
|
||||
signatures: (0..10).map(|i| Signature(600, i)).collect(),
|
||||
};
|
||||
|
||||
let check_message = |r, d: &Digest, s: &Signature| {
|
||||
if r == 2 && d.0 == 600 && s.0 == 600 {
|
||||
Some(ValidatorId(s.1))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
assert!(justification.clone().check(7, &check_message).is_ok());
|
||||
assert!(justification.clone().check(11, &check_message).is_err());
|
||||
|
||||
{
|
||||
// one bad signature is enough to spoil it.
|
||||
justification.signatures.push(Signature(1001, 255));
|
||||
assert!(justification.clone().check(7, &check_message).is_err());
|
||||
|
||||
justification.signatures.pop();
|
||||
}
|
||||
// duplicates not allowed.
|
||||
justification.signatures.extend((0..10).map(|i| Signature(600, i)));
|
||||
assert!(justification.clone().check(11, &check_message).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accepts_proposal_from_proposer_only() {
|
||||
let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(5),
|
||||
signature: Signature(999, 5),
|
||||
message: Message::Propose(1, Candidate(999)),
|
||||
});
|
||||
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(8),
|
||||
signature: Signature(999, 8),
|
||||
message: Message::Propose(1, Candidate(999)),
|
||||
});
|
||||
|
||||
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reaches_prepare_phase() {
|
||||
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(8),
|
||||
signature: Signature(999, 8),
|
||||
message: Message::Propose(1, Candidate(999)),
|
||||
});
|
||||
|
||||
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
|
||||
|
||||
for i in 0..6 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(999, i),
|
||||
message: Message::Prepare(1, Digest(999)),
|
||||
});
|
||||
|
||||
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
|
||||
}
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(7),
|
||||
signature: Signature(999, 7),
|
||||
message: Message::Prepare(1, Digest(999)),
|
||||
});
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prepare_to_commit() {
|
||||
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(8),
|
||||
signature: Signature(999, 8),
|
||||
message: Message::Propose(1, Candidate(999)),
|
||||
});
|
||||
|
||||
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
|
||||
|
||||
for i in 0..6 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(999, i),
|
||||
message: Message::Prepare(1, Digest(999)),
|
||||
});
|
||||
|
||||
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
|
||||
}
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(7),
|
||||
signature: Signature(999, 7),
|
||||
message: Message::Prepare(1, Digest(999)),
|
||||
});
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
|
||||
for i in 0..6 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(999, i),
|
||||
message: Message::Commit(1, Digest(999)),
|
||||
});
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Prepared(_) => {},
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(7),
|
||||
signature: Signature(999, 7),
|
||||
message: Message::Commit(1, Digest(999)),
|
||||
});
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prepare_to_advance() {
|
||||
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(8),
|
||||
signature: Signature(999, 8),
|
||||
message: Message::Propose(1, Candidate(999)),
|
||||
});
|
||||
|
||||
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
|
||||
|
||||
for i in 0..7 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(999, i),
|
||||
message: Message::Prepare(1, Digest(999)),
|
||||
});
|
||||
}
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
|
||||
for i in 0..6 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(999, i),
|
||||
message: Message::AdvanceRound(1),
|
||||
});
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Prepared(_) => {},
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(7),
|
||||
signature: Signature(999, 7),
|
||||
message: Message::AdvanceRound(1),
|
||||
});
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Advanced(Some(_)) => {},
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn conclude_different_than_proposed() {
|
||||
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
for i in 0..7 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(999, i),
|
||||
message: Message::Prepare(1, Digest(999)),
|
||||
});
|
||||
}
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
|
||||
for i in 0..7 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(999, i),
|
||||
message: Message::Commit(1, Digest(999)),
|
||||
});
|
||||
}
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn begin_to_advance() {
|
||||
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
for i in 0..7 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(1, i),
|
||||
message: Message::AdvanceRound(1),
|
||||
});
|
||||
}
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Advanced(ref j) => assert!(j.is_none()),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn conclude_without_prepare() {
|
||||
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
for i in 0..7 {
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
sender: ValidatorId(i),
|
||||
signature: Signature(999, i),
|
||||
message: Message::Commit(1, Digest(999)),
|
||||
});
|
||||
}
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,714 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! BFT Agreement based on a rotating proposer in different rounds.
|
||||
|
||||
mod accumulator;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
|
||||
use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink};
|
||||
|
||||
use self::accumulator::State;
|
||||
|
||||
pub use self::accumulator::{Accumulator, Justification, PrepareJustification, UncheckedJustification};
|
||||
|
||||
/// Messages over the proposal.
|
||||
/// Each message carries an associated round number.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Message<C, D> {
|
||||
/// Send a full proposal.
|
||||
Propose(usize, C),
|
||||
/// Prepare to vote for proposal with digest D.
|
||||
Prepare(usize, D),
|
||||
/// Commit to proposal with digest D..
|
||||
Commit(usize, D),
|
||||
/// Propose advancement to a new round.
|
||||
AdvanceRound(usize),
|
||||
}
|
||||
|
||||
impl<C, D> Message<C, D> {
|
||||
fn round_number(&self) -> usize {
|
||||
match *self {
|
||||
Message::Propose(round, _) => round,
|
||||
Message::Prepare(round, _) => round,
|
||||
Message::Commit(round, _) => round,
|
||||
Message::AdvanceRound(round) => round,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A localized message, including the sender.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LocalizedMessage<C, D, V, S> {
|
||||
/// The message received.
|
||||
pub message: Message<C, D>,
|
||||
/// The sender of the message
|
||||
pub sender: V,
|
||||
/// The signature of the message.
|
||||
pub signature: S,
|
||||
}
|
||||
|
||||
/// Context necessary for agreement.
|
||||
///
|
||||
/// Provides necessary types for protocol messages, and functions necessary for a
|
||||
/// participant to evaluate and create those messages.
|
||||
pub trait Context {
|
||||
/// Candidate proposed.
|
||||
type Candidate: Debug + Eq + Clone;
|
||||
/// Candidate digest.
|
||||
type Digest: Debug + Hash + Eq + Clone;
|
||||
/// Validator ID.
|
||||
type ValidatorId: Debug + Hash + Eq + Clone;
|
||||
/// Signature.
|
||||
type Signature: Debug + Eq + Clone;
|
||||
/// A future that resolves when a round timeout is concluded.
|
||||
type RoundTimeout: Future<Item=()>;
|
||||
/// A future that resolves when a proposal is ready.
|
||||
type Proposal: Future<Item=Self::Candidate>;
|
||||
|
||||
/// Get the local validator ID.
|
||||
fn local_id(&self) -> Self::ValidatorId;
|
||||
|
||||
/// Get the best proposal.
|
||||
fn proposal(&self) -> Self::Proposal;
|
||||
|
||||
/// Get the digest of a candidate.
|
||||
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
|
||||
|
||||
/// Sign a message using the local validator ID.
|
||||
fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>)
|
||||
-> LocalizedMessage<Self::Candidate, Self::Digest, Self::ValidatorId, Self::Signature>;
|
||||
|
||||
/// Get the proposer for a given round of consensus.
|
||||
fn round_proposer(&self, round: usize) -> Self::ValidatorId;
|
||||
|
||||
/// Whether the candidate is valid.
|
||||
fn candidate_valid(&self, candidate: &Self::Candidate) -> bool;
|
||||
|
||||
/// Create a round timeout. The context will determine the correct timeout
|
||||
/// length, and create a future that will resolve when the timeout is
|
||||
/// concluded.
|
||||
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout;
|
||||
}
|
||||
|
||||
/// Communication that can occur between participants in consensus.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Communication<C, D, V, S> {
|
||||
/// A consensus message (proposal or vote)
|
||||
Consensus(LocalizedMessage<C, D, V, S>),
|
||||
/// Auxiliary communication (just proof-of-lock for now).
|
||||
Auxiliary(PrepareJustification<D, S>),
|
||||
}
|
||||
|
||||
/// Type alias for a localized message using only type parameters from `Context`.
|
||||
// TODO: actual type alias when it's no longer a warning.
|
||||
pub struct ContextCommunication<C: Context + ?Sized>(pub Communication<C::Candidate, C::Digest, C::ValidatorId, C::Signature>);
|
||||
|
||||
impl<C: Context + ?Sized> Clone for ContextCommunication<C>
|
||||
where
|
||||
LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>: Clone,
|
||||
PrepareJustification<C::Digest, C::Signature>: Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
ContextCommunication(self.0.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Sending<T> {
|
||||
items: VecDeque<T>,
|
||||
flushing: bool,
|
||||
}
|
||||
|
||||
impl<T> Sending<T> {
|
||||
fn with_capacity(n: usize) -> Self {
|
||||
Sending {
|
||||
items: VecDeque::with_capacity(n),
|
||||
flushing: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&mut self, item: T) {
|
||||
self.items.push_back(item);
|
||||
self.flushing = false;
|
||||
}
|
||||
|
||||
// process all the sends into the sink.
|
||||
fn process_all<S: Sink<SinkItem=T>>(&mut self, sink: &mut S) -> Poll<(), S::SinkError> {
|
||||
while let Some(item) = self.items.pop_front() {
|
||||
match sink.start_send(item) {
|
||||
Err(e) => return Err(e),
|
||||
Ok(AsyncSink::NotReady(item)) => {
|
||||
self.items.push_front(item);
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
Ok(AsyncSink::Ready) => { self.flushing = true; }
|
||||
}
|
||||
}
|
||||
|
||||
if self.flushing {
|
||||
match sink.poll_complete() {
|
||||
Err(e) => return Err(e),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(())) => { self.flushing = false; }
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Error returned when the input stream concludes.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct InputStreamConcluded;
|
||||
|
||||
impl ::std::fmt::Display for InputStreamConcluded {
|
||||
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
|
||||
write!(f, "{}", ::std::error::Error::description(self))
|
||||
}
|
||||
}
|
||||
|
||||
impl ::std::error::Error for InputStreamConcluded {
|
||||
fn description(&self) -> &str {
|
||||
"input stream of messages concluded prematurely"
|
||||
}
|
||||
}
|
||||
|
||||
// get the "full BFT" threshold based on an amount of nodes and
|
||||
// a maximum faulty. if nodes == 3f + 1, then threshold == 2f + 1.
|
||||
fn bft_threshold(nodes: usize, max_faulty: usize) -> usize {
|
||||
nodes - max_faulty
|
||||
}
|
||||
|
||||
/// Committed successfully.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Committed<C, D, S> {
|
||||
/// The candidate committed for. This will be unknown if
|
||||
/// we never witnessed the proposal of the last round.
|
||||
pub candidate: Option<C>,
|
||||
/// A justification for the candidate.
|
||||
pub justification: Justification<D, S>,
|
||||
}
|
||||
|
||||
struct Locked<D, S> {
|
||||
justification: PrepareJustification<D, S>,
|
||||
}
|
||||
|
||||
impl<D, S> Locked<D, S> {
|
||||
fn digest(&self) -> &D {
|
||||
&self.justification.digest
|
||||
}
|
||||
}
|
||||
|
||||
// the state of the local node during the current state of consensus.
|
||||
//
|
||||
// behavior is different when locked on a proposal.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum LocalState {
|
||||
Start,
|
||||
Proposed,
|
||||
Prepared,
|
||||
Committed,
|
||||
VoteAdvance,
|
||||
}
|
||||
|
||||
// This structure manages a single "view" of consensus.
|
||||
//
|
||||
// We maintain two message accumulators: one for the round we are currently in,
|
||||
// and one for a future round.
|
||||
//
|
||||
// We advance the round accumulators when one of two conditions is met:
|
||||
// - we witness consensus of advancement in the current round. in this case we
|
||||
// advance by one.
|
||||
// - a higher threshold-prepare is broadcast to us. in this case we can
|
||||
// advance to the round of the threshold-prepare. this is an indication
|
||||
// that we have experienced severe asynchrony/clock drift with the remainder
|
||||
// of the other validators, and it is unlikely that we can assist in
|
||||
// consensus meaningfully. nevertheless we make an attempt.
|
||||
struct Strategy<C: Context> {
|
||||
nodes: usize,
|
||||
max_faulty: usize,
|
||||
fetching_proposal: Option<C::Proposal>,
|
||||
round_timeout: future::Fuse<C::RoundTimeout>,
|
||||
local_state: LocalState,
|
||||
locked: Option<Locked<C::Digest, C::Signature>>,
|
||||
notable_candidates: HashMap<C::Digest, C::Candidate>,
|
||||
current_accumulator: Accumulator<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
|
||||
future_accumulator: Accumulator<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
|
||||
local_id: C::ValidatorId,
|
||||
}
|
||||
|
||||
impl<C: Context> Strategy<C> {
|
||||
fn create(context: &C, nodes: usize, max_faulty: usize) -> Self {
|
||||
let timeout = context.begin_round_timeout(0);
|
||||
let threshold = bft_threshold(nodes, max_faulty);
|
||||
|
||||
let current_accumulator = Accumulator::new(
|
||||
0,
|
||||
threshold,
|
||||
context.round_proposer(0),
|
||||
);
|
||||
|
||||
let future_accumulator = Accumulator::new(
|
||||
1,
|
||||
threshold,
|
||||
context.round_proposer(1),
|
||||
);
|
||||
|
||||
Strategy {
|
||||
nodes,
|
||||
max_faulty,
|
||||
current_accumulator,
|
||||
future_accumulator,
|
||||
fetching_proposal: None,
|
||||
local_state: LocalState::Start,
|
||||
locked: None,
|
||||
notable_candidates: HashMap::new(),
|
||||
round_timeout: timeout.fuse(),
|
||||
local_id: context.local_id(),
|
||||
}
|
||||
}
|
||||
|
||||
fn import_message(
|
||||
&mut self,
|
||||
msg: LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>
|
||||
) {
|
||||
let round_number = msg.message.round_number();
|
||||
|
||||
if round_number == self.current_accumulator.round_number() {
|
||||
self.current_accumulator.import_message(msg);
|
||||
} else if round_number == self.future_accumulator.round_number() {
|
||||
self.future_accumulator.import_message(msg);
|
||||
}
|
||||
}
|
||||
|
||||
fn import_lock_proof(
|
||||
&mut self,
|
||||
context: &C,
|
||||
justification: PrepareJustification<C::Digest, C::Signature>,
|
||||
) {
|
||||
// TODO: find a way to avoid processing of the signatures if the sender is
|
||||
// not the primary or the round number is low.
|
||||
if justification.round_number > self.current_accumulator.round_number() {
|
||||
// jump ahead to the prior round as this is an indication of a supermajority
|
||||
// good nodes being at least on that round.
|
||||
self.advance_to_round(context, justification.round_number);
|
||||
}
|
||||
|
||||
let lock_to_new = self.locked.as_ref()
|
||||
.map_or(true, |l| l.justification.round_number < justification.round_number);
|
||||
|
||||
if lock_to_new {
|
||||
self.locked = Some(Locked { justification })
|
||||
}
|
||||
}
|
||||
|
||||
// poll the strategy: this will queue messages to be sent and advance
|
||||
// rounds if necessary.
|
||||
//
|
||||
// only call within the context of a `Task`.
|
||||
fn poll<E>(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
|
||||
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
|
||||
where
|
||||
C::RoundTimeout: Future<Error=E>,
|
||||
C::Proposal: Future<Error=E>,
|
||||
{
|
||||
let mut last_watermark = (
|
||||
self.current_accumulator.round_number(),
|
||||
self.local_state
|
||||
);
|
||||
|
||||
// poll until either completion or state doesn't change.
|
||||
loop {
|
||||
match self.poll_once(context, sending)? {
|
||||
Async::Ready(x) => return Ok(Async::Ready(x)),
|
||||
Async::NotReady => {
|
||||
let new_watermark = (
|
||||
self.current_accumulator.round_number(),
|
||||
self.local_state
|
||||
);
|
||||
|
||||
if new_watermark == last_watermark {
|
||||
return Ok(Async::NotReady)
|
||||
} else {
|
||||
last_watermark = new_watermark;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// perform one round of polling: attempt to broadcast messages and change the state.
|
||||
// if the round or internal round-state changes, this should be called again.
|
||||
fn poll_once<E>(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
|
||||
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
|
||||
where
|
||||
C::RoundTimeout: Future<Error=E>,
|
||||
C::Proposal: Future<Error=E>,
|
||||
{
|
||||
self.propose(context, sending)?;
|
||||
self.prepare(context, sending);
|
||||
self.commit(context, sending);
|
||||
self.vote_advance(context, sending)?;
|
||||
|
||||
let advance = match self.current_accumulator.state() {
|
||||
&State::Advanced(ref p_just) => {
|
||||
// lock to any witnessed prepare justification.
|
||||
if let Some(p_just) = p_just.as_ref() {
|
||||
self.locked = Some(Locked { justification: p_just.clone() });
|
||||
}
|
||||
|
||||
let round_number = self.current_accumulator.round_number();
|
||||
Some(round_number + 1)
|
||||
}
|
||||
&State::Committed(ref just) => {
|
||||
// fetch the agreed-upon candidate:
|
||||
// - we may not have received the proposal in the first place
|
||||
// - there is no guarantee that the proposal we got was agreed upon
|
||||
// (can happen if faulty primary)
|
||||
// - look in the candidates of prior rounds just in case.
|
||||
let candidate = self.current_accumulator
|
||||
.proposal()
|
||||
.and_then(|c| if context.candidate_digest(c) == just.digest {
|
||||
Some(c.clone())
|
||||
} else {
|
||||
None
|
||||
})
|
||||
.or_else(|| self.notable_candidates.get(&just.digest).cloned());
|
||||
|
||||
let committed = Committed {
|
||||
candidate,
|
||||
justification: just.clone()
|
||||
};
|
||||
|
||||
return Ok(Async::Ready(committed))
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if let Some(new_round) = advance {
|
||||
self.advance_to_round(context, new_round);
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
|
||||
fn propose(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
|
||||
-> Result<(), <C::Proposal as Future>::Error>
|
||||
{
|
||||
if let LocalState::Start = self.local_state {
|
||||
let mut propose = false;
|
||||
if let &State::Begin = self.current_accumulator.state() {
|
||||
let round_number = self.current_accumulator.round_number();
|
||||
let primary = context.round_proposer(round_number);
|
||||
propose = self.local_id == primary;
|
||||
};
|
||||
|
||||
if !propose { return Ok(()) }
|
||||
|
||||
// obtain the proposal to broadcast.
|
||||
let proposal = match self.locked {
|
||||
Some(ref locked) => {
|
||||
// TODO: it's possible but very unlikely that we don't have the
|
||||
// corresponding proposal for what we are locked to.
|
||||
//
|
||||
// since this is an edge case on an edge case, it is fine
|
||||
// to eat the round timeout for now, but it can be optimized by
|
||||
// broadcasting an advance vote.
|
||||
self.notable_candidates.get(locked.digest()).cloned()
|
||||
}
|
||||
None => {
|
||||
let res = self.fetching_proposal
|
||||
.get_or_insert_with(|| context.proposal())
|
||||
.poll()?;
|
||||
|
||||
match res {
|
||||
Async::Ready(p) => Some(p),
|
||||
Async::NotReady => None,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(proposal) = proposal {
|
||||
self.fetching_proposal = None;
|
||||
|
||||
let message = Message::Propose(
|
||||
self.current_accumulator.round_number(),
|
||||
proposal
|
||||
);
|
||||
|
||||
self.import_and_send_message(message, context, sending);
|
||||
|
||||
// broadcast the justification along with the proposal if we are locked.
|
||||
if let Some(ref locked) = self.locked {
|
||||
sending.push(
|
||||
ContextCommunication(Communication::Auxiliary(locked.justification.clone()))
|
||||
);
|
||||
}
|
||||
|
||||
self.local_state = LocalState::Proposed;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>) {
|
||||
// prepare only upon start or having proposed.
|
||||
match self.local_state {
|
||||
LocalState::Start | LocalState::Proposed => {},
|
||||
_ => return
|
||||
};
|
||||
|
||||
let mut prepare_for = None;
|
||||
|
||||
// we can't prepare until something was proposed.
|
||||
if let &State::Proposed(ref candidate) = self.current_accumulator.state() {
|
||||
let digest = context.candidate_digest(candidate);
|
||||
|
||||
// vote to prepare only if we believe the candidate to be valid and
|
||||
// we are not locked on some other candidate.
|
||||
match self.locked {
|
||||
Some(ref locked) if locked.digest() != &digest => {}
|
||||
Some(_) => {
|
||||
// don't check validity if we are locked.
|
||||
// this is necessary to preserve the liveness property.
|
||||
prepare_for = Some(digest);
|
||||
}
|
||||
None => if context.candidate_valid(candidate) {
|
||||
prepare_for = Some(digest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(digest) = prepare_for {
|
||||
let message = Message::Prepare(
|
||||
self.current_accumulator.round_number(),
|
||||
digest
|
||||
);
|
||||
|
||||
self.import_and_send_message(message, context, sending);
|
||||
self.local_state = LocalState::Prepared;
|
||||
}
|
||||
}
|
||||
|
||||
fn commit(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>) {
|
||||
// commit only if we haven't voted to advance or committed already
|
||||
match self.local_state {
|
||||
LocalState::Committed | LocalState::VoteAdvance => return,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let mut commit_for = None;
|
||||
|
||||
if let &State::Prepared(ref p_just) = self.current_accumulator.state() {
|
||||
// we are now locked to this prepare justification.
|
||||
let digest = p_just.digest.clone();
|
||||
self.locked = Some(Locked { justification: p_just.clone() });
|
||||
commit_for = Some(digest);
|
||||
}
|
||||
|
||||
if let Some(digest) = commit_for {
|
||||
let message = Message::Commit(
|
||||
self.current_accumulator.round_number(),
|
||||
digest
|
||||
);
|
||||
|
||||
self.import_and_send_message(message, context, sending);
|
||||
self.local_state = LocalState::Committed;
|
||||
}
|
||||
}
|
||||
|
||||
fn vote_advance(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
|
||||
-> Result<(), <C::RoundTimeout as Future>::Error>
|
||||
{
|
||||
// we can vote for advancement under all circumstances unless we have already.
|
||||
if let LocalState::VoteAdvance = self.local_state { return Ok(()) }
|
||||
|
||||
// if we got f + 1 advance votes, or the timeout has fired, and we haven't
|
||||
// sent an AdvanceRound message yet, do so.
|
||||
let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty;
|
||||
|
||||
if let Async::Ready(_) = self.round_timeout.poll()? {
|
||||
attempt_advance = true;
|
||||
}
|
||||
|
||||
if attempt_advance {
|
||||
let message = Message::AdvanceRound(
|
||||
self.current_accumulator.round_number(),
|
||||
);
|
||||
|
||||
self.import_and_send_message(message, context, sending);
|
||||
self.local_state = LocalState::VoteAdvance;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn advance_to_round(&mut self, context: &C, round: usize) {
|
||||
assert!(round > self.current_accumulator.round_number());
|
||||
|
||||
let threshold = self.nodes - self.max_faulty;
|
||||
|
||||
self.fetching_proposal = None;
|
||||
self.round_timeout = context.begin_round_timeout(round).fuse();
|
||||
self.local_state = LocalState::Start;
|
||||
|
||||
let new_future = Accumulator::new(
|
||||
round + 1,
|
||||
threshold,
|
||||
context.round_proposer(round + 1),
|
||||
);
|
||||
|
||||
// when advancing from a round, store away the witnessed proposal.
|
||||
//
|
||||
// if we or other participants end up locked on that candidate,
|
||||
// we will have it.
|
||||
if let Some(proposal) = self.current_accumulator.proposal() {
|
||||
let digest = context.candidate_digest(proposal);
|
||||
self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone());
|
||||
}
|
||||
|
||||
// special case when advancing by a single round.
|
||||
if self.future_accumulator.round_number() == round {
|
||||
self.current_accumulator
|
||||
= ::std::mem::replace(&mut self.future_accumulator, new_future);
|
||||
} else {
|
||||
self.future_accumulator = new_future;
|
||||
self.current_accumulator = Accumulator::new(
|
||||
round,
|
||||
threshold,
|
||||
context.round_proposer(round),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn import_and_send_message(
|
||||
&mut self,
|
||||
message: Message<C::Candidate, C::Digest>,
|
||||
context: &C,
|
||||
sending: &mut Sending<ContextCommunication<C>>
|
||||
) {
|
||||
let signed_message = context.sign_local(message);
|
||||
self.import_message(signed_message.clone());
|
||||
sending.push(ContextCommunication(Communication::Consensus(signed_message)));
|
||||
}
|
||||
}
|
||||
|
||||
/// Future that resolves upon BFT agreement for a candidate.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Agreement<C: Context, I, O> {
|
||||
context: C,
|
||||
input: I,
|
||||
output: O,
|
||||
concluded: Option<Committed<C::Candidate, C::Digest, C::Signature>>,
|
||||
sending: Sending<ContextCommunication<C>>,
|
||||
strategy: Strategy<C>,
|
||||
}
|
||||
|
||||
impl<C, I, O, E> Future for Agreement<C, I, O>
|
||||
where
|
||||
C: Context,
|
||||
C::RoundTimeout: Future<Error=E>,
|
||||
C::Proposal: Future<Error=E>,
|
||||
I: Stream<Item=ContextCommunication<C>,Error=E>,
|
||||
O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
|
||||
E: From<InputStreamConcluded>,
|
||||
{
|
||||
type Item = Committed<C::Candidate, C::Digest, C::Signature>;
|
||||
type Error = E;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
// even if we've observed the conclusion, wait until all
|
||||
// pending outgoing messages are flushed.
|
||||
if let Some(just) = self.concluded.take() {
|
||||
return Ok(match self.sending.process_all(&mut self.output)? {
|
||||
Async::Ready(()) => Async::Ready(just),
|
||||
Async::NotReady => {
|
||||
self.concluded = Some(just);
|
||||
Async::NotReady
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = match self.input.poll()? {
|
||||
Async::Ready(msg) => msg.ok_or(InputStreamConcluded)?,
|
||||
Async::NotReady => break,
|
||||
};
|
||||
|
||||
match message.0 {
|
||||
Communication::Consensus(message) => self.strategy.import_message(message),
|
||||
Communication::Auxiliary(lock_proof)
|
||||
=> self.strategy.import_lock_proof(&self.context, lock_proof),
|
||||
}
|
||||
}
|
||||
|
||||
// try to process timeouts.
|
||||
let state_machine_res = self.strategy.poll(&self.context, &mut self.sending)?;
|
||||
|
||||
// make progress on flushing all pending messages.
|
||||
let _ = self.sending.process_all(&mut self.output)?;
|
||||
|
||||
match state_machine_res {
|
||||
Async::Ready(just) => {
|
||||
self.concluded = Some(just);
|
||||
self.poll()
|
||||
}
|
||||
Async::NotReady => {
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to reach BFT agreement on a candidate.
|
||||
///
|
||||
/// `nodes` is the number of nodes in the system.
|
||||
/// `max_faulty` is the maximum number of faulty nodes. Should be less than
|
||||
/// 1/3 of `nodes`, otherwise agreement may never be reached.
|
||||
///
|
||||
/// The input stream should never logically conclude. The logic here assumes
|
||||
/// that messages flushed to the output stream will eventually reach other nodes.
|
||||
///
|
||||
/// Note that it is possible to witness agreement being reached without ever
|
||||
/// seeing the candidate. Any candidates seen will be checked for validity.
|
||||
///
|
||||
/// Although technically the agreement will always complete (given the eventual
|
||||
/// delivery of messages), in practice it is possible for this future to
|
||||
/// conclude without having witnessed the conclusion.
|
||||
/// In general, this future should be pre-empted by the import of a justification
|
||||
/// set for this block height.
|
||||
pub fn agree<C: Context, I, O>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
|
||||
-> Agreement<C, I, O>
|
||||
{
|
||||
let strategy = Strategy::create(&context, nodes, max_faulty);
|
||||
Agreement {
|
||||
context,
|
||||
input,
|
||||
output,
|
||||
concluded: None,
|
||||
sending: Sending::with_capacity(4),
|
||||
strategy: strategy,
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,412 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Tests for the candidate agreement strategy.
|
||||
|
||||
use super::*;
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::sync::{oneshot, mpsc};
|
||||
use futures::future::FutureResult;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
|
||||
struct Candidate(usize);
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
|
||||
struct Digest(usize);
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
|
||||
struct ValidatorId(usize);
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
struct Signature(Message<Candidate, Digest>, ValidatorId);
|
||||
|
||||
struct SharedContext {
|
||||
node_count: usize,
|
||||
current_round: usize,
|
||||
awaiting_round_timeouts: HashMap<usize, Vec<oneshot::Sender<()>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Error;
|
||||
|
||||
impl From<InputStreamConcluded> for Error {
|
||||
fn from(_: InputStreamConcluded) -> Error {
|
||||
Error
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedContext {
|
||||
fn new(node_count: usize) -> Self {
|
||||
SharedContext {
|
||||
node_count,
|
||||
current_round: 0,
|
||||
awaiting_round_timeouts: HashMap::new()
|
||||
}
|
||||
}
|
||||
|
||||
fn round_timeout(&mut self, round: usize) -> Box<Future<Item=(),Error=Error>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
if round < self.current_round {
|
||||
tx.send(()).unwrap();
|
||||
} else {
|
||||
self.awaiting_round_timeouts
|
||||
.entry(round)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(tx);
|
||||
}
|
||||
|
||||
Box::new(rx.map_err(|_| Error))
|
||||
}
|
||||
|
||||
fn bump_round(&mut self) {
|
||||
let awaiting_timeout = self.awaiting_round_timeouts
|
||||
.remove(&self.current_round)
|
||||
.unwrap_or_else(Vec::new);
|
||||
|
||||
for tx in awaiting_timeout {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
|
||||
self.current_round += 1;
|
||||
}
|
||||
|
||||
fn round_proposer(&self, round: usize) -> ValidatorId {
|
||||
ValidatorId(round % self.node_count)
|
||||
}
|
||||
}
|
||||
|
||||
struct TestContext {
|
||||
local_id: ValidatorId,
|
||||
proposal: Mutex<usize>,
|
||||
shared: Arc<Mutex<SharedContext>>,
|
||||
}
|
||||
|
||||
impl Context for TestContext {
|
||||
type Candidate = Candidate;
|
||||
type Digest = Digest;
|
||||
type ValidatorId = ValidatorId;
|
||||
type Signature = Signature;
|
||||
type RoundTimeout = Box<Future<Item=(), Error=Error>>;
|
||||
type Proposal = FutureResult<Candidate, Error>;
|
||||
|
||||
fn local_id(&self) -> ValidatorId {
|
||||
self.local_id.clone()
|
||||
}
|
||||
|
||||
fn proposal(&self) -> Self::Proposal {
|
||||
let proposal = {
|
||||
let mut p = self.proposal.lock().unwrap();
|
||||
let x = *p;
|
||||
*p = (*p * 2) + 1;
|
||||
x
|
||||
};
|
||||
|
||||
Ok(Candidate(proposal)).into_future()
|
||||
}
|
||||
|
||||
fn candidate_digest(&self, candidate: &Candidate) -> Digest {
|
||||
Digest(candidate.0)
|
||||
}
|
||||
|
||||
fn sign_local(&self, message: Message<Candidate, Digest>)
|
||||
-> LocalizedMessage<Candidate, Digest, ValidatorId, Signature>
|
||||
{
|
||||
let signature = Signature(message.clone(), self.local_id.clone());
|
||||
LocalizedMessage {
|
||||
message,
|
||||
signature,
|
||||
sender: self.local_id.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn round_proposer(&self, round: usize) -> ValidatorId {
|
||||
self.shared.lock().unwrap().round_proposer(round)
|
||||
}
|
||||
|
||||
fn candidate_valid(&self, candidate: &Candidate) -> bool {
|
||||
candidate.0 % 3 != 0
|
||||
}
|
||||
|
||||
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
|
||||
self.shared.lock().unwrap().round_timeout(round)
|
||||
}
|
||||
}
|
||||
|
||||
type Comm = ContextCommunication<TestContext>;
|
||||
|
||||
struct Network {
|
||||
endpoints: Vec<mpsc::UnboundedSender<Comm>>,
|
||||
input: mpsc::UnboundedReceiver<(usize, Comm)>,
|
||||
}
|
||||
|
||||
impl Network {
|
||||
fn new(nodes: usize)
|
||||
-> (Network, Vec<mpsc::UnboundedSender<(usize, Comm)>>, Vec<mpsc::UnboundedReceiver<Comm>>)
|
||||
{
|
||||
let mut inputs = Vec::with_capacity(nodes);
|
||||
let mut outputs = Vec::with_capacity(nodes);
|
||||
let mut endpoints = Vec::with_capacity(nodes);
|
||||
|
||||
let (in_tx, in_rx) = mpsc::unbounded();
|
||||
for _ in 0..nodes {
|
||||
let (out_tx, out_rx) = mpsc::unbounded();
|
||||
inputs.push(in_tx.clone());
|
||||
outputs.push(out_rx);
|
||||
endpoints.push(out_tx);
|
||||
}
|
||||
|
||||
let network = Network {
|
||||
endpoints,
|
||||
input: in_rx,
|
||||
};
|
||||
|
||||
(network, inputs, outputs)
|
||||
}
|
||||
|
||||
fn route_on_thread(self) {
|
||||
::std::thread::spawn(move || { let _ = self.wait(); });
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Network {
|
||||
type Item = ();
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<(), Error> {
|
||||
match self.input.poll() {
|
||||
Err(_) => Err(Error),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Ok(Async::Ready(None)) => Ok(Async::Ready(())),
|
||||
Ok(Async::Ready(Some((sender, item)))) => {
|
||||
{
|
||||
let receiving_endpoints = self.endpoints
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|&(i, _)| i != sender)
|
||||
.map(|(_, x)| x);
|
||||
|
||||
for endpoint in receiving_endpoints {
|
||||
let _ = endpoint.unbounded_send(item.clone());
|
||||
}
|
||||
}
|
||||
|
||||
self.poll()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn timeout_in(t: Duration) -> oneshot::Receiver<()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
::std::thread::spawn(move || {
|
||||
::std::thread::sleep(t);
|
||||
let _ = tx.send(());
|
||||
});
|
||||
|
||||
rx
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn consensus_completes_with_minimum_good() {
|
||||
let node_count = 10;
|
||||
let max_faulty = 3;
|
||||
|
||||
let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count)));
|
||||
|
||||
let (network, net_send, net_recv) = Network::new(node_count);
|
||||
network.route_on_thread();
|
||||
|
||||
let nodes = net_send
|
||||
.into_iter()
|
||||
.zip(net_recv)
|
||||
.take(node_count - max_faulty)
|
||||
.enumerate()
|
||||
.map(|(i, (tx, rx))| {
|
||||
let ctx = TestContext {
|
||||
local_id: ValidatorId(i),
|
||||
proposal: Mutex::new(i),
|
||||
shared: shared_context.clone(),
|
||||
};
|
||||
|
||||
agree(
|
||||
ctx,
|
||||
node_count,
|
||||
max_faulty,
|
||||
rx.map_err(|_| Error),
|
||||
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
::std::thread::spawn(move || {
|
||||
let mut timeout = ::std::time::Duration::from_millis(50);
|
||||
loop {
|
||||
::std::thread::sleep(timeout.clone());
|
||||
shared_context.lock().unwrap().bump_round();
|
||||
timeout *= 2;
|
||||
}
|
||||
});
|
||||
|
||||
let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
|
||||
let results = ::futures::future::join_all(nodes)
|
||||
.map(Some)
|
||||
.select(timeout.map(|_| None))
|
||||
.wait()
|
||||
.map(|(i, _)| i)
|
||||
.map_err(|(e, _)| e)
|
||||
.expect("to complete")
|
||||
.expect("to not time out");
|
||||
|
||||
for result in &results {
|
||||
assert_eq!(&result.justification.digest, &results[0].justification.digest);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn consensus_does_not_complete_without_enough_nodes() {
|
||||
let node_count = 10;
|
||||
let max_faulty = 3;
|
||||
|
||||
let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count)));
|
||||
|
||||
let (network, net_send, net_recv) = Network::new(node_count);
|
||||
network.route_on_thread();
|
||||
|
||||
let nodes = net_send
|
||||
.into_iter()
|
||||
.zip(net_recv)
|
||||
.take(node_count - max_faulty - 1)
|
||||
.enumerate()
|
||||
.map(|(i, (tx, rx))| {
|
||||
let ctx = TestContext {
|
||||
local_id: ValidatorId(i),
|
||||
proposal: Mutex::new(i),
|
||||
shared: shared_context.clone(),
|
||||
};
|
||||
|
||||
agree(
|
||||
ctx,
|
||||
node_count,
|
||||
max_faulty,
|
||||
rx.map_err(|_| Error),
|
||||
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
|
||||
let result = ::futures::future::join_all(nodes)
|
||||
.map(Some)
|
||||
.select(timeout.map(|_| None))
|
||||
.wait()
|
||||
.map(|(i, _)| i)
|
||||
.map_err(|(e, _)| e)
|
||||
.expect("to complete");
|
||||
|
||||
assert!(result.is_none(), "not enough online nodes");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
|
||||
let node_count = 10;
|
||||
let max_faulty = 3;
|
||||
|
||||
let locked_proposal = Candidate(999_999_999);
|
||||
let locked_digest = Digest(999_999_999);
|
||||
let locked_round = 1;
|
||||
let justification = UncheckedJustification {
|
||||
round_number: locked_round,
|
||||
digest: locked_digest.clone(),
|
||||
signatures: (0..7)
|
||||
.map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), ValidatorId(i)))
|
||||
.collect()
|
||||
}.check(7, |_, _, s| Some(s.1.clone())).unwrap();
|
||||
|
||||
let mut shared_context = SharedContext::new(node_count);
|
||||
shared_context.current_round = locked_round + 1;
|
||||
let shared_context = Arc::new(Mutex::new(shared_context));
|
||||
|
||||
let (network, net_send, net_recv) = Network::new(node_count);
|
||||
network.route_on_thread();
|
||||
|
||||
let nodes = net_send
|
||||
.into_iter()
|
||||
.zip(net_recv)
|
||||
.enumerate()
|
||||
.map(|(i, (tx, rx))| {
|
||||
let ctx = TestContext {
|
||||
local_id: ValidatorId(i),
|
||||
proposal: Mutex::new(i),
|
||||
shared: shared_context.clone(),
|
||||
};
|
||||
|
||||
let mut agreement = agree(
|
||||
ctx,
|
||||
node_count,
|
||||
max_faulty,
|
||||
rx.map_err(|_| Error),
|
||||
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
|
||||
);
|
||||
|
||||
agreement.strategy.advance_to_round(
|
||||
&agreement.context,
|
||||
locked_round + 1
|
||||
);
|
||||
|
||||
if i <= max_faulty {
|
||||
agreement.strategy.locked = Some(Locked {
|
||||
justification: justification.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
if i == max_faulty {
|
||||
agreement.strategy.notable_candidates.insert(
|
||||
locked_digest.clone(),
|
||||
locked_proposal.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
agreement
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
::std::thread::spawn(move || {
|
||||
let mut timeout = ::std::time::Duration::from_millis(50);
|
||||
loop {
|
||||
::std::thread::sleep(timeout.clone());
|
||||
shared_context.lock().unwrap().bump_round();
|
||||
timeout *= 2;
|
||||
}
|
||||
});
|
||||
|
||||
let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
|
||||
let results = ::futures::future::join_all(nodes)
|
||||
.map(Some)
|
||||
.select(timeout.map(|_| None))
|
||||
.wait()
|
||||
.map(|(i, _)| i)
|
||||
.map_err(|(e, _)| e)
|
||||
.expect("to complete")
|
||||
.expect("to not time out");
|
||||
|
||||
for result in &results {
|
||||
assert_eq!(&result.justification.digest, &locked_digest);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Propagation and agreement of candidates.
|
||||
//!
|
||||
//! Validators are split into groups by parachain, and each validator might come
|
||||
//! up its own candidate for their parachain. Within groups, validators pass around
|
||||
//! their candidates and produce statements of validity.
|
||||
//!
|
||||
//! Any candidate that receives majority approval by the validators in a group
|
||||
//! may be subject to inclusion, unless any validators flag that candidate as invalid.
|
||||
//!
|
||||
//! Wrongly flagging as invalid should be strongly disincentivized, so that in the
|
||||
//! equilibrium state it is not expected to happen. Likewise with the submission
|
||||
//! of invalid blocks.
|
||||
//!
|
||||
//! Groups themselves may be compromised by malicious validators.
|
||||
|
||||
extern crate futures;
|
||||
extern crate polkadot_primitives as primitives;
|
||||
|
||||
pub mod bft;
|
||||
pub mod table;
|
||||
@@ -0,0 +1,999 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! The statement table.
|
||||
//!
|
||||
//! This stores messages other validators issue about candidates.
|
||||
//!
|
||||
//! These messages are used to create a proposal submitted to a BFT consensus process.
|
||||
//!
|
||||
//! Proposals are formed of sets of candidates which have the requisite number of
|
||||
//! validity and availability votes.
|
||||
//!
|
||||
//! Each parachain is associated with two sets of validators: those which can
|
||||
//! propose and attest to validity of candidates, and those who can only attest
|
||||
//! to availability.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::hash_map::{HashMap, Entry};
|
||||
use std::hash::Hash;
|
||||
use std::fmt::Debug;
|
||||
|
||||
/// Context for the statement table.
|
||||
pub trait Context {
|
||||
/// A validator ID
|
||||
type ValidatorId: Hash + Eq + Clone + Debug;
|
||||
/// The digest (hash or other unique attribute) of a candidate.
|
||||
type Digest: Hash + Eq + Clone + Debug;
|
||||
/// Candidate type.
|
||||
type Candidate: Ord + Eq + Clone + Debug;
|
||||
/// The group ID type
|
||||
type GroupId: Hash + Ord + Eq + Clone + Debug;
|
||||
/// A signature type.
|
||||
type Signature: Eq + Clone + Debug;
|
||||
|
||||
/// get the digest of a candidate.
|
||||
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
|
||||
|
||||
/// get the group of a candidate.
|
||||
fn candidate_group(&self, candidate: &Self::Candidate) -> Self::GroupId;
|
||||
|
||||
/// Whether a validator is a member of a group.
|
||||
/// Members are meant to submit candidates and vote on validity.
|
||||
fn is_member_of(&self, validator: &Self::ValidatorId, group: &Self::GroupId) -> bool;
|
||||
|
||||
/// Whether a validator is an availability guarantor of a group.
|
||||
/// Guarantors are meant to vote on availability for candidates submitted
|
||||
/// in a group.
|
||||
fn is_availability_guarantor_of(
|
||||
&self,
|
||||
validator: &Self::ValidatorId,
|
||||
group: &Self::GroupId,
|
||||
) -> bool;
|
||||
|
||||
// requisite number of votes for validity and availability respectively from a group.
|
||||
fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize);
|
||||
}
|
||||
|
||||
/// Statements circulated among peers.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum Statement<C, D> {
|
||||
/// Broadcast by a validator to indicate that this is his candidate for
|
||||
/// inclusion.
|
||||
///
|
||||
/// Broadcasting two different candidate messages per round is not allowed.
|
||||
Candidate(C),
|
||||
/// Broadcast by a validator to attest that the candidate with given digest
|
||||
/// is valid.
|
||||
Valid(D),
|
||||
/// Broadcast by a validator to attest that the auxiliary data for a candidate
|
||||
/// with given digest is available.
|
||||
Available(D),
|
||||
/// Broadcast by a validator to attest that the candidate with given digest
|
||||
/// is invalid.
|
||||
Invalid(D),
|
||||
}
|
||||
|
||||
/// A signed statement.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub struct SignedStatement<C, D, V, S> {
|
||||
/// The statement.
|
||||
pub statement: Statement<C, D>,
|
||||
/// The signature.
|
||||
pub signature: S,
|
||||
/// The sender.
|
||||
pub sender: V,
|
||||
}
|
||||
|
||||
// A unique trace for a class of valid statements issued by a validator.
|
||||
//
|
||||
// We keep track of which statements we have received or sent to other validators
|
||||
// in order to prevent relaying the same data multiple times.
|
||||
//
|
||||
// The signature of the statement is replaced by the validator because the validator
|
||||
// is unique while signatures are not (at least under common schemes like
|
||||
// Schnorr or ECDSA).
|
||||
#[derive(Hash, PartialEq, Eq, Clone)]
|
||||
enum StatementTrace<V, D> {
|
||||
/// The candidate proposed by the validator.
|
||||
Candidate(V),
|
||||
/// A validity statement from that validator about the given digest.
|
||||
Valid(V, D),
|
||||
/// An invalidity statement from that validator about the given digest.
|
||||
Invalid(V, D),
|
||||
/// An availability statement from that validator about the given digest.
|
||||
Available(V, D),
|
||||
}
|
||||
|
||||
/// Misbehavior: voting more than one way on candidate validity.
|
||||
///
|
||||
/// Since there are three possible ways to vote, a double vote is possible in
|
||||
/// three possible combinations (unordered)
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum ValidityDoubleVote<C, D, S> {
|
||||
/// Implicit vote by issuing and explicity voting validity.
|
||||
IssuedAndValidity((C, S), (D, S)),
|
||||
/// Implicit vote by issuing and explicitly voting invalidity
|
||||
IssuedAndInvalidity((C, S), (D, S)),
|
||||
/// Direct votes for validity and invalidity
|
||||
ValidityAndInvalidity(D, S, S),
|
||||
}
|
||||
|
||||
/// Misbehavior: declaring multiple candidates.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub struct MultipleCandidates<C, S> {
|
||||
/// The first candidate seen.
|
||||
pub first: (C, S),
|
||||
/// The second candidate seen.
|
||||
pub second: (C, S),
|
||||
}
|
||||
|
||||
/// Misbehavior: submitted statement for wrong group.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub struct UnauthorizedStatement<C, D, V, S> {
|
||||
/// A signed statement which was submitted without proper authority.
|
||||
pub statement: SignedStatement<C, D, V, S>,
|
||||
}
|
||||
|
||||
/// Different kinds of misbehavior. All of these kinds of malicious misbehavior
|
||||
/// are easily provable and extremely disincentivized.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum Misbehavior<C, D, V, S> {
|
||||
/// Voted invalid and valid on validity.
|
||||
ValidityDoubleVote(ValidityDoubleVote<C, D, S>),
|
||||
/// Submitted multiple candidates.
|
||||
MultipleCandidates(MultipleCandidates<C, S>),
|
||||
/// Submitted a message withou
|
||||
UnauthorizedStatement(UnauthorizedStatement<C, D, V, S>),
|
||||
}
|
||||
|
||||
/// Fancy work-around for a type alias of context-based misbehavior
|
||||
/// without producing compiler warnings.
|
||||
pub trait ResolveMisbehavior {
|
||||
/// The misbehavior type.
|
||||
type Misbehavior;
|
||||
}
|
||||
|
||||
impl<C: Context + ?Sized> ResolveMisbehavior for C {
|
||||
type Misbehavior = Misbehavior<C::Candidate, C::Digest, C::ValidatorId, C::Signature>;
|
||||
}
|
||||
|
||||
// kinds of votes for validity
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
enum ValidityVote<S: Eq + Clone> {
|
||||
// implicit validity vote by issuing
|
||||
Issued(S),
|
||||
// direct validity vote
|
||||
Valid(S),
|
||||
// direct invalidity vote
|
||||
Invalid(S),
|
||||
}
|
||||
|
||||
/// A summary of import of a statement.
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
pub struct Summary<D, G> {
|
||||
/// The digest of the candidate referenced.
|
||||
pub candidate: D,
|
||||
/// The group that candidate is in.
|
||||
pub group_id: G,
|
||||
/// How many validity votes are currently witnessed.
|
||||
pub validity_votes: usize,
|
||||
/// How many availability votes are currently witnessed.
|
||||
pub availability_votes: usize,
|
||||
/// Whether this has been signalled bad by at least one participant.
|
||||
pub signalled_bad: bool,
|
||||
}
|
||||
|
||||
/// Stores votes and data about a candidate.
|
||||
pub struct CandidateData<C: Context> {
|
||||
group_id: C::GroupId,
|
||||
candidate: C::Candidate,
|
||||
validity_votes: HashMap<C::ValidatorId, ValidityVote<C::Signature>>,
|
||||
availability_votes: HashMap<C::ValidatorId, C::Signature>,
|
||||
indicated_bad_by: Vec<C::ValidatorId>,
|
||||
}
|
||||
|
||||
impl<C: Context> CandidateData<C> {
|
||||
/// whether this has been indicated bad by anyone.
|
||||
pub fn indicated_bad(&self) -> bool {
|
||||
!self.indicated_bad_by.is_empty()
|
||||
}
|
||||
|
||||
/// Get an iterator over those who have indicated this candidate valid.
|
||||
// TODO: impl trait
|
||||
pub fn voted_valid_by<'a>(&'a self) -> Box<Iterator<Item=C::ValidatorId> + 'a> {
|
||||
Box::new(self.validity_votes.iter().filter_map(|(v, vote)| {
|
||||
match *vote {
|
||||
ValidityVote::Issued(_) | ValidityVote::Valid(_) => Some(v.clone()),
|
||||
ValidityVote::Invalid(_) => None,
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
// Candidate data can be included in a proposal
|
||||
// if it has enough validity and availability votes
|
||||
// and no validators have called it bad.
|
||||
fn can_be_included(&self, validity_threshold: usize, availability_threshold: usize) -> bool {
|
||||
self.indicated_bad_by.is_empty()
|
||||
&& self.validity_votes.len() >= validity_threshold
|
||||
&& self.availability_votes.len() >= availability_threshold
|
||||
}
|
||||
|
||||
fn summary(&self, digest: C::Digest) -> Summary<C::Digest, C::GroupId> {
|
||||
Summary {
|
||||
candidate: digest,
|
||||
group_id: self.group_id.clone(),
|
||||
validity_votes: self.validity_votes.len() - self.indicated_bad_by.len(),
|
||||
availability_votes: self.availability_votes.len(),
|
||||
signalled_bad: self.indicated_bad(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// validator metadata
|
||||
struct ValidatorData<C: Context> {
|
||||
proposal: Option<(C::Digest, C::Signature)>,
|
||||
known_statements: HashSet<StatementTrace<C::ValidatorId, C::Digest>>,
|
||||
}
|
||||
|
||||
/// Create a new, empty statement table.
|
||||
pub fn create<C: Context>() -> Table<C> {
|
||||
Table {
|
||||
validator_data: HashMap::default(),
|
||||
detected_misbehavior: HashMap::default(),
|
||||
candidate_votes: HashMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores votes
|
||||
#[derive(Default)]
|
||||
pub struct Table<C: Context> {
|
||||
validator_data: HashMap<C::ValidatorId, ValidatorData<C>>,
|
||||
detected_misbehavior: HashMap<C::ValidatorId, <C as ResolveMisbehavior>::Misbehavior>,
|
||||
candidate_votes: HashMap<C::Digest, CandidateData<C>>,
|
||||
}
|
||||
|
||||
impl<C: Context> Table<C> {
|
||||
/// Produce a set of proposed candidates.
|
||||
///
|
||||
/// This will be at most one per group, consisting of the
|
||||
/// best candidate for each group with requisite votes for inclusion.
|
||||
pub fn proposed_candidates(&self, context: &C) -> Vec<C::Candidate> {
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::btree_map::Entry as BTreeEntry;
|
||||
|
||||
let mut best_candidates = BTreeMap::new();
|
||||
for candidate_data in self.candidate_votes.values() {
|
||||
let group_id = &candidate_data.group_id;
|
||||
let (validity_t, availability_t) = context.requisite_votes(group_id);
|
||||
|
||||
if !candidate_data.can_be_included(validity_t, availability_t) { continue }
|
||||
let candidate = &candidate_data.candidate;
|
||||
match best_candidates.entry(group_id.clone()) {
|
||||
BTreeEntry::Occupied(mut occ) => {
|
||||
let candidate_ref = occ.get_mut();
|
||||
if *candidate_ref < candidate {
|
||||
*candidate_ref = candidate;
|
||||
}
|
||||
}
|
||||
BTreeEntry::Vacant(vacant) => { vacant.insert(candidate); },
|
||||
}
|
||||
}
|
||||
|
||||
best_candidates.values().map(|v| C::Candidate::clone(v)).collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Get an iterator of all candidates with a given group.
|
||||
// TODO: impl iterator
|
||||
pub fn candidates_in_group<'a>(&'a self, group_id: C::GroupId)
|
||||
-> Box<Iterator<Item=&'a CandidateData<C>> + 'a>
|
||||
{
|
||||
Box::new(self.candidate_votes.values().filter(move |c| c.group_id == group_id))
|
||||
}
|
||||
|
||||
/// Drain all misbehavior observed up to this point.
|
||||
pub fn drain_misbehavior(&mut self) -> HashMap<C::ValidatorId, <C as ResolveMisbehavior>::Misbehavior> {
|
||||
::std::mem::replace(&mut self.detected_misbehavior, HashMap::new())
|
||||
}
|
||||
|
||||
/// Import a signed statement. Signatures should be checked for validity, and the
|
||||
/// sender should be checked to actually be a validator.
|
||||
///
|
||||
/// This can note the origin of the statement to indicate that he has
|
||||
/// seen it already.
|
||||
pub fn import_statement(
|
||||
&mut self,
|
||||
context: &C,
|
||||
statement: SignedStatement<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
|
||||
from: Option<C::ValidatorId>
|
||||
) -> Option<Summary<C::Digest, C::GroupId>> {
|
||||
let SignedStatement { statement, signature, sender: signer } = statement;
|
||||
|
||||
let trace = match statement {
|
||||
Statement::Candidate(_) => StatementTrace::Candidate(signer.clone()),
|
||||
Statement::Valid(ref d) => StatementTrace::Valid(signer.clone(), d.clone()),
|
||||
Statement::Invalid(ref d) => StatementTrace::Invalid(signer.clone(), d.clone()),
|
||||
Statement::Available(ref d) => StatementTrace::Available(signer.clone(), d.clone()),
|
||||
};
|
||||
|
||||
let (maybe_misbehavior, maybe_summary) = match statement {
|
||||
Statement::Candidate(candidate) => self.import_candidate(
|
||||
context,
|
||||
signer.clone(),
|
||||
candidate,
|
||||
signature
|
||||
),
|
||||
Statement::Valid(digest) => self.validity_vote(
|
||||
context,
|
||||
signer.clone(),
|
||||
digest,
|
||||
ValidityVote::Valid(signature),
|
||||
),
|
||||
Statement::Invalid(digest) => self.validity_vote(
|
||||
context,
|
||||
signer.clone(),
|
||||
digest,
|
||||
ValidityVote::Invalid(signature),
|
||||
),
|
||||
Statement::Available(digest) => self.availability_vote(
|
||||
context,
|
||||
signer.clone(),
|
||||
digest,
|
||||
signature,
|
||||
),
|
||||
};
|
||||
|
||||
if let Some(misbehavior) = maybe_misbehavior {
|
||||
// all misbehavior in agreement is provable and actively malicious.
|
||||
// punishments are not cumulative.
|
||||
self.detected_misbehavior.insert(signer, misbehavior);
|
||||
} else {
|
||||
if let Some(from) = from {
|
||||
self.note_trace_seen(trace.clone(), from);
|
||||
}
|
||||
|
||||
self.note_trace_seen(trace, signer);
|
||||
}
|
||||
|
||||
maybe_summary
|
||||
}
|
||||
|
||||
fn note_trace_seen(&mut self, trace: StatementTrace<C::ValidatorId, C::Digest>, known_by: C::ValidatorId) {
|
||||
self.validator_data.entry(known_by).or_insert_with(|| ValidatorData {
|
||||
proposal: None,
|
||||
known_statements: HashSet::default(),
|
||||
}).known_statements.insert(trace);
|
||||
}
|
||||
|
||||
fn import_candidate(
|
||||
&mut self,
|
||||
context: &C,
|
||||
from: C::ValidatorId,
|
||||
candidate: C::Candidate,
|
||||
signature: C::Signature,
|
||||
) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
let group = context.candidate_group(&candidate);
|
||||
if !context.is_member_of(&from, &group) {
|
||||
return (
|
||||
Some(Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
|
||||
statement: SignedStatement {
|
||||
signature,
|
||||
statement: Statement::Candidate(candidate),
|
||||
sender: from,
|
||||
},
|
||||
})),
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
// check that validator hasn't already specified another candidate.
|
||||
let digest = context.candidate_digest(&candidate);
|
||||
|
||||
let new_proposal = match self.validator_data.entry(from.clone()) {
|
||||
Entry::Occupied(mut occ) => {
|
||||
// if digest is different, fetch candidate and
|
||||
// note misbehavior.
|
||||
let existing = occ.get_mut();
|
||||
|
||||
if let Some((ref old_digest, ref old_sig)) = existing.proposal {
|
||||
if old_digest != &digest {
|
||||
const EXISTENCE_PROOF: &str =
|
||||
"when proposal first received from validator, candidate \
|
||||
votes entry is created. proposal here is `Some`, therefore \
|
||||
candidate votes entry exists; qed";
|
||||
|
||||
let old_candidate = self.candidate_votes.get(old_digest)
|
||||
.expect(EXISTENCE_PROOF)
|
||||
.candidate
|
||||
.clone();
|
||||
|
||||
return (
|
||||
Some(Misbehavior::MultipleCandidates(MultipleCandidates {
|
||||
first: (old_candidate, old_sig.clone()),
|
||||
second: (candidate, signature.clone()),
|
||||
})),
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
false
|
||||
} else {
|
||||
existing.proposal = Some((digest.clone(), signature.clone()));
|
||||
true
|
||||
}
|
||||
}
|
||||
Entry::Vacant(vacant) => {
|
||||
vacant.insert(ValidatorData {
|
||||
proposal: Some((digest.clone(), signature.clone())),
|
||||
known_statements: HashSet::new(),
|
||||
});
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
// NOTE: altering this code may affect the existence proof above. ensure it remains
|
||||
// valid.
|
||||
if new_proposal {
|
||||
self.candidate_votes.entry(digest.clone()).or_insert_with(move || CandidateData {
|
||||
group_id: group,
|
||||
candidate: candidate,
|
||||
validity_votes: HashMap::new(),
|
||||
availability_votes: HashMap::new(),
|
||||
indicated_bad_by: Vec::new(),
|
||||
});
|
||||
}
|
||||
|
||||
self.validity_vote(
|
||||
context,
|
||||
from,
|
||||
digest,
|
||||
ValidityVote::Issued(signature),
|
||||
)
|
||||
}
|
||||
|
||||
fn validity_vote(
|
||||
&mut self,
|
||||
context: &C,
|
||||
from: C::ValidatorId,
|
||||
digest: C::Digest,
|
||||
vote: ValidityVote<C::Signature>,
|
||||
) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
let votes = match self.candidate_votes.get_mut(&digest) {
|
||||
None => return (None, None), // TODO: queue up but don't get DoS'ed
|
||||
Some(votes) => votes,
|
||||
};
|
||||
|
||||
// check that this validator actually can vote in this group.
|
||||
if !context.is_member_of(&from, &votes.group_id) {
|
||||
let (sig, valid) = match vote {
|
||||
ValidityVote::Valid(s) => (s, true),
|
||||
ValidityVote::Invalid(s) => (s, false),
|
||||
ValidityVote::Issued(_) =>
|
||||
panic!("implicit issuance vote only cast from `import_candidate` after \
|
||||
checking group membership of issuer; qed"),
|
||||
};
|
||||
|
||||
return (
|
||||
Some(Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
|
||||
statement: SignedStatement {
|
||||
signature: sig,
|
||||
sender: from,
|
||||
statement: if valid {
|
||||
Statement::Valid(digest)
|
||||
} else {
|
||||
Statement::Invalid(digest)
|
||||
}
|
||||
}
|
||||
})),
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
// check for double votes.
|
||||
match votes.validity_votes.entry(from.clone()) {
|
||||
Entry::Occupied(occ) => {
|
||||
if occ.get() != &vote {
|
||||
let double_vote_proof = match (occ.get().clone(), vote) {
|
||||
(ValidityVote::Issued(iss), ValidityVote::Valid(good)) |
|
||||
(ValidityVote::Valid(good), ValidityVote::Issued(iss)) =>
|
||||
ValidityDoubleVote::IssuedAndValidity((votes.candidate.clone(), iss), (digest, good)),
|
||||
(ValidityVote::Issued(iss), ValidityVote::Invalid(bad)) |
|
||||
(ValidityVote::Invalid(bad), ValidityVote::Issued(iss)) =>
|
||||
ValidityDoubleVote::IssuedAndInvalidity((votes.candidate.clone(), iss), (digest, bad)),
|
||||
(ValidityVote::Valid(good), ValidityVote::Invalid(bad)) |
|
||||
(ValidityVote::Invalid(bad), ValidityVote::Valid(good)) =>
|
||||
ValidityDoubleVote::ValidityAndInvalidity(digest, good, bad),
|
||||
_ => {
|
||||
// this would occur if two different but valid signatures
|
||||
// on the same kind of vote occurred.
|
||||
return (None, None);
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
Some(Misbehavior::ValidityDoubleVote(double_vote_proof)),
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
return (None, None);
|
||||
}
|
||||
Entry::Vacant(vacant) => {
|
||||
if let ValidityVote::Invalid(_) = vote {
|
||||
votes.indicated_bad_by.push(from);
|
||||
}
|
||||
|
||||
vacant.insert(vote);
|
||||
}
|
||||
}
|
||||
|
||||
(None, Some(votes.summary(digest)))
|
||||
}
|
||||
|
||||
fn availability_vote(
|
||||
&mut self,
|
||||
context: &C,
|
||||
from: C::ValidatorId,
|
||||
digest: C::Digest,
|
||||
signature: C::Signature,
|
||||
) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
let votes = match self.candidate_votes.get_mut(&digest) {
|
||||
None => return (None, None), // TODO: queue up but don't get DoS'ed
|
||||
Some(votes) => votes,
|
||||
};
|
||||
|
||||
// check that this validator actually can vote in this group.
|
||||
if !context.is_availability_guarantor_of(&from, &votes.group_id) {
|
||||
return (
|
||||
Some(Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
|
||||
statement: SignedStatement {
|
||||
signature: signature.clone(),
|
||||
statement: Statement::Available(digest),
|
||||
sender: from,
|
||||
}
|
||||
})),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
votes.availability_votes.insert(from, signature);
|
||||
(None, Some(votes.summary(digest)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
|
||||
struct ValidatorId(usize);
|
||||
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq)]
|
||||
struct GroupId(usize);
|
||||
|
||||
// group, body
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq)]
|
||||
struct Candidate(usize, usize);
|
||||
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
|
||||
struct Signature(usize);
|
||||
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
|
||||
struct Digest(usize);
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
struct TestContext {
|
||||
// v -> (validity, availability)
|
||||
validators: HashMap<ValidatorId, (GroupId, GroupId)>
|
||||
}
|
||||
|
||||
impl Context for TestContext {
|
||||
type ValidatorId = ValidatorId;
|
||||
type Digest = Digest;
|
||||
type Candidate = Candidate;
|
||||
type GroupId = GroupId;
|
||||
type Signature = Signature;
|
||||
|
||||
fn candidate_digest(&self, candidate: &Candidate) -> Digest {
|
||||
Digest(candidate.1)
|
||||
}
|
||||
|
||||
fn candidate_group(&self, candidate: &Candidate) -> GroupId {
|
||||
GroupId(candidate.0)
|
||||
}
|
||||
|
||||
fn is_member_of(
|
||||
&self,
|
||||
validator: &ValidatorId,
|
||||
group: &GroupId
|
||||
) -> bool {
|
||||
self.validators.get(validator).map(|v| &v.0 == group).unwrap_or(false)
|
||||
}
|
||||
|
||||
fn is_availability_guarantor_of(
|
||||
&self,
|
||||
validator: &ValidatorId,
|
||||
group: &GroupId
|
||||
) -> bool {
|
||||
self.validators.get(validator).map(|v| &v.1 == group).unwrap_or(false)
|
||||
}
|
||||
|
||||
fn requisite_votes(&self, _id: &GroupId) -> (usize, usize) {
|
||||
(6, 34)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn submitting_two_candidates_is_misbehavior() {
|
||||
let context = TestContext {
|
||||
validators: {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
let mut table = create();
|
||||
let statement_a = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
|
||||
let statement_b = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 999)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
|
||||
table.import_statement(&context, statement_a, None);
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
|
||||
|
||||
table.import_statement(&context, statement_b, None);
|
||||
assert_eq!(
|
||||
table.detected_misbehavior.get(&ValidatorId(1)).unwrap(),
|
||||
&Misbehavior::MultipleCandidates(MultipleCandidates {
|
||||
first: (Candidate(2, 100), Signature(1)),
|
||||
second: (Candidate(2, 999), Signature(1)),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn submitting_candidate_from_wrong_group_is_misbehavior() {
|
||||
let context = TestContext {
|
||||
validators: {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(ValidatorId(1), (GroupId(3), GroupId(455)));
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
let mut table = create();
|
||||
let statement = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
|
||||
table.import_statement(&context, statement, None);
|
||||
|
||||
assert_eq!(
|
||||
table.detected_misbehavior.get(&ValidatorId(1)).unwrap(),
|
||||
&Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
|
||||
statement: SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
},
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unauthorized_votes() {
|
||||
let context = TestContext {
|
||||
validators: {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
|
||||
map.insert(ValidatorId(2), (GroupId(3), GroupId(222)));
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
let mut table = create();
|
||||
|
||||
let candidate_a = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
let candidate_a_digest = Digest(100);
|
||||
|
||||
let candidate_b = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(3, 987)),
|
||||
signature: Signature(2),
|
||||
sender: ValidatorId(2),
|
||||
};
|
||||
let candidate_b_digest = Digest(987);
|
||||
|
||||
table.import_statement(&context, candidate_a, None);
|
||||
table.import_statement(&context, candidate_b, None);
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2)));
|
||||
|
||||
// validator 1 votes for availability on 2's candidate.
|
||||
let bad_availability_vote = SignedStatement {
|
||||
statement: Statement::Available(candidate_b_digest.clone()),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
table.import_statement(&context, bad_availability_vote, None);
|
||||
|
||||
assert_eq!(
|
||||
table.detected_misbehavior.get(&ValidatorId(1)).unwrap(),
|
||||
&Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
|
||||
statement: SignedStatement {
|
||||
statement: Statement::Available(candidate_b_digest),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
// validator 2 votes for validity on 1's candidate.
|
||||
let bad_validity_vote = SignedStatement {
|
||||
statement: Statement::Valid(candidate_a_digest.clone()),
|
||||
signature: Signature(2),
|
||||
sender: ValidatorId(2),
|
||||
};
|
||||
table.import_statement(&context, bad_validity_vote, None);
|
||||
|
||||
assert_eq!(
|
||||
table.detected_misbehavior.get(&ValidatorId(2)).unwrap(),
|
||||
&Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
|
||||
statement: SignedStatement {
|
||||
statement: Statement::Valid(candidate_a_digest),
|
||||
signature: Signature(2),
|
||||
sender: ValidatorId(2),
|
||||
},
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validity_double_vote_is_misbehavior() {
|
||||
let context = TestContext {
|
||||
validators: {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
|
||||
map.insert(ValidatorId(2), (GroupId(2), GroupId(246)));
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
let mut table = create();
|
||||
let statement = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
let candidate_digest = Digest(100);
|
||||
|
||||
table.import_statement(&context, statement, None);
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
|
||||
|
||||
let valid_statement = SignedStatement {
|
||||
statement: Statement::Valid(candidate_digest.clone()),
|
||||
signature: Signature(2),
|
||||
sender: ValidatorId(2),
|
||||
};
|
||||
|
||||
let invalid_statement = SignedStatement {
|
||||
statement: Statement::Invalid(candidate_digest.clone()),
|
||||
signature: Signature(2),
|
||||
sender: ValidatorId(2),
|
||||
};
|
||||
|
||||
table.import_statement(&context, valid_statement, None);
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2)));
|
||||
|
||||
table.import_statement(&context, invalid_statement, None);
|
||||
|
||||
assert_eq!(
|
||||
table.detected_misbehavior.get(&ValidatorId(2)).unwrap(),
|
||||
&Misbehavior::ValidityDoubleVote(ValidityDoubleVote::ValidityAndInvalidity(
|
||||
candidate_digest,
|
||||
Signature(2),
|
||||
Signature(2),
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn issue_and_vote_is_misbehavior() {
|
||||
let context = TestContext {
|
||||
validators: {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
let mut table = create();
|
||||
let statement = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
let candidate_digest = Digest(100);
|
||||
|
||||
table.import_statement(&context, statement, None);
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
|
||||
|
||||
let extra_vote = SignedStatement {
|
||||
statement: Statement::Valid(candidate_digest.clone()),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
|
||||
table.import_statement(&context, extra_vote, None);
|
||||
assert_eq!(
|
||||
table.detected_misbehavior.get(&ValidatorId(1)).unwrap(),
|
||||
&Misbehavior::ValidityDoubleVote(ValidityDoubleVote::IssuedAndValidity(
|
||||
(Candidate(2, 100), Signature(1)),
|
||||
(Digest(100), Signature(1)),
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn candidate_can_be_included() {
|
||||
let validity_threshold = 6;
|
||||
let availability_threshold = 34;
|
||||
|
||||
let mut candidate = CandidateData::<TestContext> {
|
||||
group_id: GroupId(4),
|
||||
candidate: Candidate(4, 12345),
|
||||
validity_votes: HashMap::new(),
|
||||
availability_votes: HashMap::new(),
|
||||
indicated_bad_by: Vec::new(),
|
||||
};
|
||||
|
||||
assert!(!candidate.can_be_included(validity_threshold, availability_threshold));
|
||||
|
||||
for i in 0..validity_threshold {
|
||||
candidate.validity_votes.insert(ValidatorId(i + 100), ValidityVote::Valid(Signature(i + 100)));
|
||||
}
|
||||
|
||||
assert!(!candidate.can_be_included(validity_threshold, availability_threshold));
|
||||
|
||||
for i in 0..availability_threshold {
|
||||
candidate.availability_votes.insert(ValidatorId(i + 255), Signature(i + 255));
|
||||
}
|
||||
|
||||
assert!(candidate.can_be_included(validity_threshold, availability_threshold));
|
||||
|
||||
candidate.indicated_bad_by.push(ValidatorId(1024));
|
||||
|
||||
assert!(!candidate.can_be_included(validity_threshold, availability_threshold));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn candidate_import_gives_summary() {
|
||||
let context = TestContext {
|
||||
validators: {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
let mut table = create();
|
||||
let statement = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
|
||||
let summary = table.import_statement(&context, statement, None)
|
||||
.expect("candidate import to give summary");
|
||||
|
||||
assert_eq!(summary.candidate, Digest(100));
|
||||
assert_eq!(summary.group_id, GroupId(2));
|
||||
assert_eq!(summary.validity_votes, 1);
|
||||
assert_eq!(summary.availability_votes, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn candidate_vote_gives_summary() {
|
||||
let context = TestContext {
|
||||
validators: {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
|
||||
map.insert(ValidatorId(2), (GroupId(2), GroupId(455)));
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
let mut table = create();
|
||||
let statement = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
let candidate_digest = Digest(100);
|
||||
|
||||
table.import_statement(&context, statement, None);
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
|
||||
|
||||
let vote = SignedStatement {
|
||||
statement: Statement::Valid(candidate_digest.clone()),
|
||||
signature: Signature(2),
|
||||
sender: ValidatorId(2),
|
||||
};
|
||||
|
||||
let summary = table.import_statement(&context, vote, None)
|
||||
.expect("candidate vote to give summary");
|
||||
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2)));
|
||||
|
||||
assert_eq!(summary.candidate, Digest(100));
|
||||
assert_eq!(summary.group_id, GroupId(2));
|
||||
assert_eq!(summary.validity_votes, 2);
|
||||
assert_eq!(summary.availability_votes, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn availability_vote_gives_summary() {
|
||||
let context = TestContext {
|
||||
validators: {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
|
||||
map.insert(ValidatorId(2), (GroupId(5), GroupId(2)));
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
let mut table = create();
|
||||
let statement = SignedStatement {
|
||||
statement: Statement::Candidate(Candidate(2, 100)),
|
||||
signature: Signature(1),
|
||||
sender: ValidatorId(1),
|
||||
};
|
||||
let candidate_digest = Digest(100);
|
||||
|
||||
table.import_statement(&context, statement, None);
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
|
||||
|
||||
let vote = SignedStatement {
|
||||
statement: Statement::Available(candidate_digest.clone()),
|
||||
signature: Signature(2),
|
||||
sender: ValidatorId(2),
|
||||
};
|
||||
|
||||
let summary = table.import_statement(&context, vote, None)
|
||||
.expect("candidate vote to give summary");
|
||||
|
||||
assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2)));
|
||||
|
||||
assert_eq!(summary.candidate, Digest(100));
|
||||
assert_eq!(summary.group_id, GroupId(2));
|
||||
assert_eq!(summary.validity_votes, 1);
|
||||
assert_eq!(summary.availability_votes, 1);
|
||||
}
|
||||
}
|
||||
@@ -49,6 +49,25 @@ pub struct Candidate {
|
||||
pub block: BlockData,
|
||||
}
|
||||
|
||||
/// Candidate receipt type.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct CandidateReceipt {
|
||||
/// The ID of the parachain this is a candidate for.
|
||||
pub parachain_index: Id,
|
||||
/// The collator's account ID
|
||||
pub collator: ::Address,
|
||||
/// The head-data
|
||||
pub head_data: HeadData,
|
||||
/// Balance uploads to the relay chain.
|
||||
pub balance_uploads: Vec<(::Address, ::uint::U256)>,
|
||||
/// Egress queue roots.
|
||||
pub egress_queue_roots: Vec<(Id, ::hash::H256)>,
|
||||
/// Fees paid from the chain to the relay chain validators
|
||||
pub fees: ::uint::U256,
|
||||
}
|
||||
|
||||
/// Parachain ingress queue message.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct Message(#[serde(with="bytes")] pub Vec<u8>);
|
||||
@@ -57,7 +76,7 @@ pub struct Message(#[serde(with="bytes")] pub Vec<u8>);
|
||||
///
|
||||
/// This is just an ordered vector of other parachains' egress queues,
|
||||
/// obtained according to the routing rules.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct ConsolidatedIngress(pub Vec<(Id, Vec<Message>)>);
|
||||
|
||||
/// Parachain block data.
|
||||
@@ -71,7 +90,7 @@ pub struct BlockData(#[serde(with="bytes")] pub Vec<u8>);
|
||||
pub struct Header(#[serde(with="bytes")] pub Vec<u8>);
|
||||
|
||||
/// Parachain head data included in the chain.
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct HeadData(#[serde(with="bytes")] pub Vec<u8>);
|
||||
|
||||
/// Parachain validation code.
|
||||
@@ -92,10 +111,10 @@ mod tests {
|
||||
assert_eq!(ser::to_string_pretty(&Candidate {
|
||||
parachain_index: 5.into(),
|
||||
collator_signature: 10.into(),
|
||||
unprocessed_ingress: vec![
|
||||
(1, vec![Message(vec![2])]),
|
||||
(2, vec![Message(vec![2]), Message(vec![3])]),
|
||||
],
|
||||
unprocessed_ingress: ConsolidatedIngress(vec![
|
||||
(Id(1), vec![Message(vec![2])]),
|
||||
(Id(2), vec![Message(vec![2]), Message(vec![3])]),
|
||||
]),
|
||||
block: BlockData(vec![1, 2, 3]),
|
||||
}), r#"{
|
||||
"parachainIndex": 5,
|
||||
|
||||
Reference in New Issue
Block a user