address some review grumbles

This commit is contained in:
Robert Habermeier
2017-12-29 14:26:41 +01:00
parent 136645aa4c
commit 43b68b52bf
3 changed files with 75 additions and 68 deletions
@@ -22,7 +22,7 @@ use std::hash::Hash;
use super::{Message, LocalizedMessage}; use super::{Message, LocalizedMessage};
/// Justification at a given round. /// Justification for some state at a given round.
#[derive(PartialEq, Eq, Debug, Clone)] #[derive(PartialEq, Eq, Debug, Clone)]
pub struct Justification<D, S> { pub struct Justification<D, S> {
/// The round. /// The round.
@@ -40,8 +40,10 @@ impl<D, S> Justification<D, S> {
/// digest. /// digest.
/// ///
/// The closure should return true iff the round number, digest, and signature /// The closure should return true iff the round number, digest, and signature
/// represent a valid prepare message and the signer was authorized to issue /// represent a valid message and the signer was authorized to issue
/// it. /// it.
///
/// The `check_message` closure may vary based on context.
pub fn check<F, V>(&self, threshold: usize, check_message: F) -> bool pub fn check<F, V>(&self, threshold: usize, check_message: F) -> bool
where where
F: Fn(usize, &D, &S) -> Option<V>, F: Fn(usize, &D, &S) -> Option<V>,
@@ -49,22 +51,18 @@ impl<D, S> Justification<D, S> {
{ {
let mut voted = HashSet::new(); let mut voted = HashSet::new();
let mut good = false;
for signature in &self.signatures { for signature in &self.signatures {
match check_message(self.round_number, &self.digest, signature) { match check_message(self.round_number, &self.digest, signature) {
None => return false, None => return false,
Some(v) => { Some(v) => {
if !voted.insert(v) { if !voted.insert(v) {
return false; return false;
} else if voted.len() >= threshold {
// don't return just yet since later signatures may be invalid.
good = true;
} }
} }
} }
} }
good voted.len() >= threshold
} }
} }
@@ -73,48 +71,54 @@ pub type PrepareJustification<D, S> = Justification<D, S>;
/// The round's state, based on imported messages. /// The round's state, based on imported messages.
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
pub enum State<C, D, S> { pub enum State<Candidate, Digest, Signature> {
/// No proposal yet. /// No proposal yet.
Begin, Begin,
/// Proposal received. /// Proposal received.
Proposed(C), Proposed(Candidate),
/// Seen 2f + 1 prepares for this digest. /// Seen n - f prepares for this digest.
Prepared(PrepareJustification<D, S>), Prepared(PrepareJustification<Digest, Signature>),
/// Seen 2f + 1 commits for a digest. /// Seen n - f commits for a digest.
Committed(Justification<D, S>), Committed(Justification<Digest, Signature>),
/// Seen 2f + 1 round-advancement messages. /// Seen n - f round-advancement messages.
Advanced(Option<PrepareJustification<D, S>>), Advanced(Option<PrepareJustification<Digest, Signature>>),
}
#[derive(Debug, Default)]
struct VoteCounts {
prepared: usize,
committed: usize,
} }
/// Accumulates messages for a given round of BFT consensus. /// Accumulates messages for a given round of BFT consensus.
#[derive(Debug)] #[derive(Debug)]
pub struct Accumulator<C, D, V, S> pub struct Accumulator<Candidate, Digest, ValidatorId, Signature>
where where
C: Eq + Clone, Candidate: Eq + Clone,
D: Hash + Eq + Clone, Digest: Hash + Eq + Clone,
V: Hash + Eq, ValidatorId: Hash + Eq,
S: Eq + Clone, Signature: Eq + Clone,
{ {
round_number: usize, round_number: usize,
threshold: usize, threshold: usize,
round_proposer: V, round_proposer: ValidatorId,
proposal: Option<C>, proposal: Option<Candidate>,
prepares: HashMap<V, (D, S)>, prepares: HashMap<ValidatorId, (Digest, Signature)>,
commits: HashMap<V, (D, S)>, commits: HashMap<ValidatorId, (Digest, Signature)>,
vote_counts: HashMap<D, (usize, usize)>, vote_counts: HashMap<Digest, VoteCounts>,
advance_round: HashSet<V>, advance_round: HashSet<ValidatorId>,
state: State<C, D, S>, state: State<Candidate, Digest, Signature>,
} }
impl<C, D, V, S> Accumulator<C, D, V, S> impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, ValidatorId, Signature>
where where
C: Eq + Clone, Candidate: Eq + Clone,
D: Hash + Eq + Clone, Digest: Hash + Eq + Clone,
V: Hash + Eq, ValidatorId: Hash + Eq,
S: Eq + Clone, Signature: Eq + Clone,
{ {
/// Create a new state accumulator. /// Create a new state accumulator.
pub fn new(round_number: usize, threshold: usize, round_proposer: V) -> Self { pub fn new(round_number: usize, threshold: usize, round_proposer: ValidatorId) -> Self {
Accumulator { Accumulator {
round_number, round_number,
threshold, threshold,
@@ -139,16 +143,16 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
} }
/// Get the round proposer. /// Get the round proposer.
pub fn round_proposer(&self) -> &V { pub fn round_proposer(&self) -> &ValidatorId {
&self.round_proposer &self.round_proposer
} }
pub fn proposal(&self) -> Option<&C> { pub fn proposal(&self) -> Option<&Candidate> {
self.proposal.as_ref() self.proposal.as_ref()
} }
/// Inspect the current consensus state. /// Inspect the current consensus state.
pub fn state(&self) -> &State<C, D, S> { pub fn state(&self) -> &State<Candidate, Digest, Signature> {
&self.state &self.state
} }
@@ -156,10 +160,10 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
/// and authorization should have already been checked. /// and authorization should have already been checked.
pub fn import_message( pub fn import_message(
&mut self, &mut self,
message: LocalizedMessage<C, D, V, S>, message: LocalizedMessage<Candidate, Digest, ValidatorId, Signature>,
) )
{ {
// old message. // message from different round.
if message.message.round_number() != self.round_number { if message.message.round_number() != self.round_number {
return; return;
} }
@@ -176,8 +180,8 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
fn import_proposal( fn import_proposal(
&mut self, &mut self,
proposal: C, proposal: Candidate,
sender: V, sender: ValidatorId,
) { ) {
if sender != self.round_proposer || self.proposal.is_some() { return } if sender != self.round_proposer || self.proposal.is_some() { return }
@@ -187,19 +191,19 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
fn import_prepare( fn import_prepare(
&mut self, &mut self,
candidate: D, digest: Digest,
sender: V, sender: ValidatorId,
signature: S, signature: Signature,
) { ) {
// ignore any subsequent prepares by the same sender. // ignore any subsequent prepares by the same sender.
// TODO: if digest is different, that's misbehavior. // TODO: if digest is different, that's misbehavior.
let prepared_for = if let Entry::Vacant(vacant) = self.prepares.entry(sender) { let prepared_for = if let Entry::Vacant(vacant) = self.prepares.entry(sender) {
vacant.insert((candidate.clone(), signature)); vacant.insert((digest.clone(), signature));
let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0)); let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default);
count.0 += 1; count.prepared += 1;
if count.0 == self.threshold { if count.prepared == self.threshold {
Some(candidate) Some(digest)
} else { } else {
None None
} }
@@ -230,19 +234,19 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
fn import_commit( fn import_commit(
&mut self, &mut self,
candidate: D, digest: Digest,
sender: V, sender: ValidatorId,
signature: S, signature: Signature,
) { ) {
// ignore any subsequent commits by the same sender. // ignore any subsequent commits by the same sender.
// TODO: if digest is different, that's misbehavior. // TODO: if digest is different, that's misbehavior.
let committed_for = if let Entry::Vacant(vacant) = self.commits.entry(sender) { let committed_for = if let Entry::Vacant(vacant) = self.commits.entry(sender) {
vacant.insert((candidate.clone(), signature)); vacant.insert((digest.clone(), signature));
let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0)); let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default);
count.1 += 1; count.committed += 1;
if count.1 == self.threshold { if count.committed == self.threshold {
Some(candidate) Some(digest)
} else { } else {
None None
} }
@@ -271,7 +275,7 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
fn import_advance_round( fn import_advance_round(
&mut self, &mut self,
sender: V, sender: ValidatorId,
) { ) {
self.advance_round.insert(sender); self.advance_round.insert(sender);
+9 -6
View File
@@ -34,9 +34,9 @@ pub use self::accumulator::{Accumulator, Justification, PrepareJustification};
/// Messages over the proposal. /// Messages over the proposal.
/// Each message carries an associated round number. /// Each message carries an associated round number.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message<P, D> { pub enum Message<C, D> {
/// Send a full proposal. /// Send a full proposal.
Propose(usize, P), Propose(usize, C),
/// Prepare to vote for proposal with digest D. /// Prepare to vote for proposal with digest D.
Prepare(usize, D), Prepare(usize, D),
/// Commit to proposal with digest D.. /// Commit to proposal with digest D..
@@ -45,7 +45,7 @@ pub enum Message<P, D> {
AdvanceRound(usize), AdvanceRound(usize),
} }
impl<P, D> Message<P, D> { impl<C, D> Message<C, D> {
fn round_number(&self) -> usize { fn round_number(&self) -> usize {
match *self { match *self {
Message::Propose(round, _) => round, Message::Propose(round, _) => round,
@@ -58,9 +58,9 @@ impl<P, D> Message<P, D> {
/// A localized message, including the sender. /// A localized message, including the sender.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct LocalizedMessage<T, P, V, S> { pub struct LocalizedMessage<C, D, V, S> {
/// The message received. /// The message received.
pub message: Message<T, P>, pub message: Message<C, D>,
/// The sender of the message /// The sender of the message
pub sender: V, pub sender: V,
/// The signature of the message. /// The signature of the message.
@@ -68,6 +68,9 @@ pub struct LocalizedMessage<T, P, V, S> {
} }
/// Context necessary for agreement. /// Context necessary for agreement.
///
/// Provides necessary types for protocol messages, and functions necessary for a
/// participant to evaluate and create those messages.
pub trait Context { pub trait Context {
/// Candidate proposed. /// Candidate proposed.
type Candidate: Debug + Eq + Clone; type Candidate: Debug + Eq + Clone;
@@ -162,7 +165,7 @@ impl<T> Sending<T> {
} }
} }
while self.flushing { if self.flushing {
match sink.poll_complete() { match sink.poll_complete() {
Err(e) => return Err(e), Err(e) => return Err(e),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
@@ -92,13 +92,13 @@ impl SharedContext {
} }
} }
struct Ctx { struct TestContext {
local_id: ValidatorId, local_id: ValidatorId,
proposal: Mutex<usize>, proposal: Mutex<usize>,
shared: Arc<Mutex<SharedContext>>, shared: Arc<Mutex<SharedContext>>,
} }
impl Context for Ctx { impl Context for TestContext {
type Candidate = Candidate; type Candidate = Candidate;
type Digest = Digest; type Digest = Digest;
type ValidatorId = ValidatorId; type ValidatorId = ValidatorId;
@@ -149,7 +149,7 @@ impl Context for Ctx {
} }
} }
type Comm = ContextCommunication<Ctx>; type Comm = ContextCommunication<TestContext>;
struct Network { struct Network {
endpoints: Vec<mpsc::UnboundedSender<Comm>>, endpoints: Vec<mpsc::UnboundedSender<Comm>>,
@@ -239,7 +239,7 @@ fn consensus_completes_with_minimum_good() {
.take(node_count - max_faulty) .take(node_count - max_faulty)
.enumerate() .enumerate()
.map(|(i, (tx, rx))| { .map(|(i, (tx, rx))| {
let ctx = Ctx { let ctx = TestContext {
local_id: ValidatorId(i), local_id: ValidatorId(i),
proposal: Mutex::new(i), proposal: Mutex::new(i),
shared: shared_context.clone(), shared: shared_context.clone(),
@@ -295,7 +295,7 @@ fn consensus_does_not_complete_without_enough_nodes() {
.take(node_count - max_faulty - 1) .take(node_count - max_faulty - 1)
.enumerate() .enumerate()
.map(|(i, (tx, rx))| { .map(|(i, (tx, rx))| {
let ctx = Ctx { let ctx = TestContext {
local_id: ValidatorId(i), local_id: ValidatorId(i),
proposal: Mutex::new(i), proposal: Mutex::new(i),
shared: shared_context.clone(), shared: shared_context.clone(),