From a16c06da5bedf15b5fe2b7df32f38a9ae5ba32f7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 27 Dec 2017 18:27:18 +0100 Subject: [PATCH] import and broadcast lock proofs --- substrate/candidate-agreement/src/bft/mod.rs | 98 +++++++++++++++----- 1 file changed, 77 insertions(+), 21 deletions(-) diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs index 0c78029ecf..e2322cc38b 100644 --- a/substrate/candidate-agreement/src/bft/mod.rs +++ b/substrate/candidate-agreement/src/bft/mod.rs @@ -89,7 +89,7 @@ pub trait Context { /// Sign a message using the local validator ID. fn sign_local(&self, message: Message) - -> ContextLocalizedMessage; + -> LocalizedMessage; /// Get the proposer for a given round of consensus. fn round_proposer(&self, round: usize) -> Self::ValidatorId; @@ -103,16 +103,26 @@ pub trait Context { fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout; } +/// Communication that can occur between participants in consensus. +#[derive(Debug, Clone)] +pub enum Communication { + /// A consensus message (proposal or vote) + Message(LocalizedMessage), + /// A proof-of-lock. + Locked(PrepareJustification), +} + /// Type alias for a localized message using only type parameters from `Context`. // TODO: actual type alias when it's no longer a warning. -#[derive(Debug)] -pub struct ContextLocalizedMessage(pub LocalizedMessage); +pub struct ContextCommunication(pub Communication); -impl Clone for ContextLocalizedMessage - where LocalizedMessage: Clone +impl Clone for ContextCommunication + where + LocalizedMessage: Clone, + PrepareJustification: Clone, { fn clone(&self) -> Self { - ContextLocalizedMessage(self.0.clone()) + ContextCommunication(self.0.clone()) } } @@ -275,8 +285,10 @@ impl Strategy { } } - fn import_message(&mut self, msg: ContextLocalizedMessage) { - let msg = msg.0; + fn import_message( + &mut self, + msg: LocalizedMessage + ) { let round_number = msg.message.round_number(); if round_number == self.current_accumulator.round_number() { @@ -286,11 +298,31 @@ impl Strategy { } } + fn import_lock_proof( + &mut self, + context: &C, + justification: PrepareJustification, + ) { + // TODO: find a way to avoid processing of the signatures if the sender is + // not the primary or the round number is low. + let current_round_number = self.current_accumulator.round_number(); + if justification.round_number < current_round_number { + return + } else if justification.round_number == current_round_number { + self.locked = Some(Locked { justification }); + } else { + // jump ahead to the prior round as this is an indication of a supermajority + // good nodes being at least on that round. + self.advance_to_round(context, justification.round_number); + self.locked = Some(Locked { justification }); + } + } + // poll the strategy: this will queue messages to be sent and advance // rounds if necessary. // // only call within the context of a `Task`. - fn poll(&mut self, context: &C, sending: &mut Sending>) + fn poll(&mut self, context: &C, sending: &mut Sending>) -> Poll, E> where C::RoundTimeout: Future, @@ -312,7 +344,20 @@ impl Strategy { Some(round_number + 1) } &State::Committed(ref just) => { - let candidate = self.notable_candidates.get(&just.digest).cloned(); + // fetch the agreed-upon candidate: + // - we may not have received the proposal in the first place + // - there is no guarantee that the proposal we got was agreed upon + // (can happen if faulty primary) + // - look in the candidates of prior rounds just in case. + let candidate = self.current_accumulator + .proposal() + .and_then(|c| if context.candidate_digest(c) == just.digest { + Some(c.clone()) + } else { + None + }) + .or_else(|| self.notable_candidates.get(&just.digest).cloned()); + let committed = Committed { candidate, justification: just.clone() @@ -330,7 +375,7 @@ impl Strategy { Ok(Async::NotReady) } - fn propose(&mut self, context: &C, sending: &mut Sending>) + fn propose(&mut self, context: &C, sending: &mut Sending>) -> Result<(), ::Error> { if let LocalState::Start = self.local_state { @@ -375,15 +420,22 @@ impl Strategy { ); self.import_and_send_message(message, context, sending); + + // broadcast the justification along with the proposal if we are locked. + if let Some(ref locked) = self.locked { + sending.push( + ContextCommunication(Communication::Locked(locked.justification.clone())) + ); + } + self.local_state = LocalState::Proposed; } - } Ok(()) } - fn prepare(&mut self, context: &C, sending: &mut Sending>) { + fn prepare(&mut self, context: &C, sending: &mut Sending>) { // prepare only upon start or having proposed. match self.local_state { LocalState::Start | LocalState::Proposed => {}, @@ -419,7 +471,7 @@ impl Strategy { } } - fn commit(&mut self, context: &C, sending: &mut Sending>) { + fn commit(&mut self, context: &C, sending: &mut Sending>) { // commit only if we haven't voted to advance or committed already match self.local_state { LocalState::Committed | LocalState::VoteAdvance => return, @@ -446,7 +498,7 @@ impl Strategy { } } - fn vote_advance(&mut self, context: &C, sending: &mut Sending>) + fn vote_advance(&mut self, context: &C, sending: &mut Sending>) -> Result<(), ::Error> { // we can vote for advancement under all circumstances unless we have already. @@ -526,11 +578,11 @@ impl Strategy { &mut self, message: Message, context: &C, - sending: &mut Sending> + sending: &mut Sending> ) { let signed_message = context.sign_local(message); self.import_message(signed_message.clone()); - sending.push(signed_message); + sending.push(ContextCommunication(Communication::Message(signed_message))); } } @@ -541,7 +593,7 @@ pub struct Agreement { input: I, output: O, concluded: Option>, - sending: Sending>, + sending: Sending>, strategy: Strategy, } @@ -550,8 +602,8 @@ impl Future for Agreement C: Context, C::RoundTimeout: Future, C::Proposal: Future, - I: Stream,Error=E>, - O: Sink,SinkError=E>, + I: Stream,Error=E>, + O: Sink,SinkError=E>, E: From, { type Item = Committed; @@ -580,7 +632,11 @@ impl Future for Agreement } let message = try_ready!(self.input.poll()).ok_or(InputStreamConcluded)?; - self.strategy.import_message(message); + + match message.0 { + Communication::Message(message) => self.strategy.import_message(message), + Communication::Locked(proof) => self.strategy.import_lock_proof(&self.context, proof), + } self.poll() }