From 43b68b52bf5ed6a8a882133d0ff03e03f655ef6c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 29 Dec 2017 14:26:41 +0100 Subject: [PATCH] address some review grumbles --- .../src/bft/accumulator.rs | 118 +++++++++--------- substrate/candidate-agreement/src/bft/mod.rs | 15 ++- .../candidate-agreement/src/bft/tests.rs | 10 +- 3 files changed, 75 insertions(+), 68 deletions(-) diff --git a/substrate/candidate-agreement/src/bft/accumulator.rs b/substrate/candidate-agreement/src/bft/accumulator.rs index 3bd5147e98..42e5459392 100644 --- a/substrate/candidate-agreement/src/bft/accumulator.rs +++ b/substrate/candidate-agreement/src/bft/accumulator.rs @@ -22,7 +22,7 @@ use std::hash::Hash; use super::{Message, LocalizedMessage}; -/// Justification at a given round. +/// Justification for some state at a given round. #[derive(PartialEq, Eq, Debug, Clone)] pub struct Justification { /// The round. @@ -40,8 +40,10 @@ impl Justification { /// digest. /// /// The closure should return true iff the round number, digest, and signature - /// represent a valid prepare message and the signer was authorized to issue + /// represent a valid message and the signer was authorized to issue /// it. + /// + /// The `check_message` closure may vary based on context. pub fn check(&self, threshold: usize, check_message: F) -> bool where F: Fn(usize, &D, &S) -> Option, @@ -49,22 +51,18 @@ impl Justification { { let mut voted = HashSet::new(); - let mut good = false; for signature in &self.signatures { match check_message(self.round_number, &self.digest, signature) { None => return false, Some(v) => { if !voted.insert(v) { return false; - } else if voted.len() >= threshold { - // don't return just yet since later signatures may be invalid. - good = true; } } } } - good + voted.len() >= threshold } } @@ -73,48 +71,54 @@ pub type PrepareJustification = Justification; /// The round's state, based on imported messages. #[derive(PartialEq, Eq, Debug)] -pub enum State { +pub enum State { /// No proposal yet. Begin, /// Proposal received. - Proposed(C), - /// Seen 2f + 1 prepares for this digest. - Prepared(PrepareJustification), - /// Seen 2f + 1 commits for a digest. - Committed(Justification), - /// Seen 2f + 1 round-advancement messages. - Advanced(Option>), + Proposed(Candidate), + /// Seen n - f prepares for this digest. + Prepared(PrepareJustification), + /// Seen n - f commits for a digest. + Committed(Justification), + /// Seen n - f round-advancement messages. + Advanced(Option>), +} + +#[derive(Debug, Default)] +struct VoteCounts { + prepared: usize, + committed: usize, } /// Accumulates messages for a given round of BFT consensus. #[derive(Debug)] -pub struct Accumulator +pub struct Accumulator where - C: Eq + Clone, - D: Hash + Eq + Clone, - V: Hash + Eq, - S: Eq + Clone, + Candidate: Eq + Clone, + Digest: Hash + Eq + Clone, + ValidatorId: Hash + Eq, + Signature: Eq + Clone, { round_number: usize, threshold: usize, - round_proposer: V, - proposal: Option, - prepares: HashMap, - commits: HashMap, - vote_counts: HashMap, - advance_round: HashSet, - state: State, + round_proposer: ValidatorId, + proposal: Option, + prepares: HashMap, + commits: HashMap, + vote_counts: HashMap, + advance_round: HashSet, + state: State, } -impl Accumulator +impl Accumulator where - C: Eq + Clone, - D: Hash + Eq + Clone, - V: Hash + Eq, - S: Eq + Clone, + Candidate: Eq + Clone, + Digest: Hash + Eq + Clone, + ValidatorId: Hash + Eq, + Signature: Eq + Clone, { /// Create a new state accumulator. - pub fn new(round_number: usize, threshold: usize, round_proposer: V) -> Self { + pub fn new(round_number: usize, threshold: usize, round_proposer: ValidatorId) -> Self { Accumulator { round_number, threshold, @@ -139,16 +143,16 @@ impl Accumulator } /// Get the round proposer. - pub fn round_proposer(&self) -> &V { + pub fn round_proposer(&self) -> &ValidatorId { &self.round_proposer } - pub fn proposal(&self) -> Option<&C> { + pub fn proposal(&self) -> Option<&Candidate> { self.proposal.as_ref() } /// Inspect the current consensus state. - pub fn state(&self) -> &State { + pub fn state(&self) -> &State { &self.state } @@ -156,10 +160,10 @@ impl Accumulator /// and authorization should have already been checked. pub fn import_message( &mut self, - message: LocalizedMessage, + message: LocalizedMessage, ) { - // old message. + // message from different round. if message.message.round_number() != self.round_number { return; } @@ -176,8 +180,8 @@ impl Accumulator fn import_proposal( &mut self, - proposal: C, - sender: V, + proposal: Candidate, + sender: ValidatorId, ) { if sender != self.round_proposer || self.proposal.is_some() { return } @@ -187,19 +191,19 @@ impl Accumulator fn import_prepare( &mut self, - candidate: D, - sender: V, - signature: S, + digest: Digest, + sender: ValidatorId, + signature: Signature, ) { // ignore any subsequent prepares by the same sender. // TODO: if digest is different, that's misbehavior. let prepared_for = if let Entry::Vacant(vacant) = self.prepares.entry(sender) { - vacant.insert((candidate.clone(), signature)); - let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0)); - count.0 += 1; + vacant.insert((digest.clone(), signature)); + let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default); + count.prepared += 1; - if count.0 == self.threshold { - Some(candidate) + if count.prepared == self.threshold { + Some(digest) } else { None } @@ -230,19 +234,19 @@ impl Accumulator fn import_commit( &mut self, - candidate: D, - sender: V, - signature: S, + digest: Digest, + sender: ValidatorId, + signature: Signature, ) { // ignore any subsequent commits by the same sender. // TODO: if digest is different, that's misbehavior. let committed_for = if let Entry::Vacant(vacant) = self.commits.entry(sender) { - vacant.insert((candidate.clone(), signature)); - let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0)); - count.1 += 1; + vacant.insert((digest.clone(), signature)); + let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default); + count.committed += 1; - if count.1 == self.threshold { - Some(candidate) + if count.committed == self.threshold { + Some(digest) } else { None } @@ -271,7 +275,7 @@ impl Accumulator fn import_advance_round( &mut self, - sender: V, + sender: ValidatorId, ) { self.advance_round.insert(sender); diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs index bad684820e..5c6b126a1e 100644 --- a/substrate/candidate-agreement/src/bft/mod.rs +++ b/substrate/candidate-agreement/src/bft/mod.rs @@ -34,9 +34,9 @@ pub use self::accumulator::{Accumulator, Justification, PrepareJustification}; /// Messages over the proposal. /// Each message carries an associated round number. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum Message { +pub enum Message { /// Send a full proposal. - Propose(usize, P), + Propose(usize, C), /// Prepare to vote for proposal with digest D. Prepare(usize, D), /// Commit to proposal with digest D.. @@ -45,7 +45,7 @@ pub enum Message { AdvanceRound(usize), } -impl Message { +impl Message { fn round_number(&self) -> usize { match *self { Message::Propose(round, _) => round, @@ -58,9 +58,9 @@ impl Message { /// A localized message, including the sender. #[derive(Debug, Clone)] -pub struct LocalizedMessage { +pub struct LocalizedMessage { /// The message received. - pub message: Message, + pub message: Message, /// The sender of the message pub sender: V, /// The signature of the message. @@ -68,6 +68,9 @@ pub struct LocalizedMessage { } /// Context necessary for agreement. +/// +/// Provides necessary types for protocol messages, and functions necessary for a +/// participant to evaluate and create those messages. pub trait Context { /// Candidate proposed. type Candidate: Debug + Eq + Clone; @@ -162,7 +165,7 @@ impl Sending { } } - while self.flushing { + if self.flushing { match sink.poll_complete() { Err(e) => return Err(e), Ok(Async::NotReady) => return Ok(Async::NotReady), diff --git a/substrate/candidate-agreement/src/bft/tests.rs b/substrate/candidate-agreement/src/bft/tests.rs index 2228ab2604..165166d76a 100644 --- a/substrate/candidate-agreement/src/bft/tests.rs +++ b/substrate/candidate-agreement/src/bft/tests.rs @@ -92,13 +92,13 @@ impl SharedContext { } } -struct Ctx { +struct TestContext { local_id: ValidatorId, proposal: Mutex, shared: Arc>, } -impl Context for Ctx { +impl Context for TestContext { type Candidate = Candidate; type Digest = Digest; type ValidatorId = ValidatorId; @@ -149,7 +149,7 @@ impl Context for Ctx { } } -type Comm = ContextCommunication; +type Comm = ContextCommunication; struct Network { endpoints: Vec>, @@ -239,7 +239,7 @@ fn consensus_completes_with_minimum_good() { .take(node_count - max_faulty) .enumerate() .map(|(i, (tx, rx))| { - let ctx = Ctx { + let ctx = TestContext { local_id: ValidatorId(i), proposal: Mutex::new(i), shared: shared_context.clone(), @@ -295,7 +295,7 @@ fn consensus_does_not_complete_without_enough_nodes() { .take(node_count - max_faulty - 1) .enumerate() .map(|(i, (tx, rx))| { - let ctx = Ctx { + let ctx = TestContext { local_id: ValidatorId(i), proposal: Mutex::new(i), shared: shared_context.clone(),