import and broadcast lock proofs

This commit is contained in:
Robert Habermeier
2017-12-27 18:27:18 +01:00
parent cb7d407880
commit a16c06da5b
+77 -21
View File
@@ -89,7 +89,7 @@ pub trait Context {
/// Sign a message using the local validator ID. /// Sign a message using the local validator ID.
fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>) fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>)
-> ContextLocalizedMessage<Self>; -> LocalizedMessage<Self::Candidate, Self::Digest, Self::ValidatorId, Self::Signature>;
/// Get the proposer for a given round of consensus. /// Get the proposer for a given round of consensus.
fn round_proposer(&self, round: usize) -> Self::ValidatorId; fn round_proposer(&self, round: usize) -> Self::ValidatorId;
@@ -103,16 +103,26 @@ pub trait Context {
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout; fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout;
} }
/// Communication that can occur between participants in consensus.
#[derive(Debug, Clone)]
pub enum Communication<C, D, V, S> {
/// A consensus message (proposal or vote)
Message(LocalizedMessage<C, D, V, S>),
/// A proof-of-lock.
Locked(PrepareJustification<D, S>),
}
/// Type alias for a localized message using only type parameters from `Context`. /// Type alias for a localized message using only type parameters from `Context`.
// TODO: actual type alias when it's no longer a warning. // TODO: actual type alias when it's no longer a warning.
#[derive(Debug)] pub struct ContextCommunication<C: Context + ?Sized>(pub Communication<C::Candidate, C::Digest, C::ValidatorId, C::Signature>);
pub struct ContextLocalizedMessage<C: Context + ?Sized>(pub LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>);
impl<C: Context + ?Sized> Clone for ContextLocalizedMessage<C> impl<C: Context + ?Sized> Clone for ContextCommunication<C>
where LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>: Clone where
LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>: Clone,
PrepareJustification<C::Digest, C::Signature>: Clone,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
ContextLocalizedMessage(self.0.clone()) ContextCommunication(self.0.clone())
} }
} }
@@ -275,8 +285,10 @@ impl<C: Context> Strategy<C> {
} }
} }
fn import_message(&mut self, msg: ContextLocalizedMessage<C>) { fn import_message(
let msg = msg.0; &mut self,
msg: LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>
) {
let round_number = msg.message.round_number(); let round_number = msg.message.round_number();
if round_number == self.current_accumulator.round_number() { if round_number == self.current_accumulator.round_number() {
@@ -286,11 +298,31 @@ impl<C: Context> Strategy<C> {
} }
} }
fn import_lock_proof(
&mut self,
context: &C,
justification: PrepareJustification<C::Digest, C::Signature>,
) {
// TODO: find a way to avoid processing of the signatures if the sender is
// not the primary or the round number is low.
let current_round_number = self.current_accumulator.round_number();
if justification.round_number < current_round_number {
return
} else if justification.round_number == current_round_number {
self.locked = Some(Locked { justification });
} else {
// jump ahead to the prior round as this is an indication of a supermajority
// good nodes being at least on that round.
self.advance_to_round(context, justification.round_number);
self.locked = Some(Locked { justification });
}
}
// poll the strategy: this will queue messages to be sent and advance // poll the strategy: this will queue messages to be sent and advance
// rounds if necessary. // rounds if necessary.
// //
// only call within the context of a `Task`. // only call within the context of a `Task`.
fn poll<E>(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>) fn poll<E>(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E> -> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
where where
C::RoundTimeout: Future<Error=E>, C::RoundTimeout: Future<Error=E>,
@@ -312,7 +344,20 @@ impl<C: Context> Strategy<C> {
Some(round_number + 1) Some(round_number + 1)
} }
&State::Committed(ref just) => { &State::Committed(ref just) => {
let candidate = self.notable_candidates.get(&just.digest).cloned(); // fetch the agreed-upon candidate:
// - we may not have received the proposal in the first place
// - there is no guarantee that the proposal we got was agreed upon
// (can happen if faulty primary)
// - look in the candidates of prior rounds just in case.
let candidate = self.current_accumulator
.proposal()
.and_then(|c| if context.candidate_digest(c) == just.digest {
Some(c.clone())
} else {
None
})
.or_else(|| self.notable_candidates.get(&just.digest).cloned());
let committed = Committed { let committed = Committed {
candidate, candidate,
justification: just.clone() justification: just.clone()
@@ -330,7 +375,7 @@ impl<C: Context> Strategy<C> {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
fn propose(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>) fn propose(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
-> Result<(), <C::Proposal as Future>::Error> -> Result<(), <C::Proposal as Future>::Error>
{ {
if let LocalState::Start = self.local_state { if let LocalState::Start = self.local_state {
@@ -375,15 +420,22 @@ impl<C: Context> Strategy<C> {
); );
self.import_and_send_message(message, context, sending); self.import_and_send_message(message, context, sending);
// broadcast the justification along with the proposal if we are locked.
if let Some(ref locked) = self.locked {
sending.push(
ContextCommunication(Communication::Locked(locked.justification.clone()))
);
}
self.local_state = LocalState::Proposed; self.local_state = LocalState::Proposed;
} }
} }
Ok(()) Ok(())
} }
fn prepare(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>) { fn prepare(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>) {
// prepare only upon start or having proposed. // prepare only upon start or having proposed.
match self.local_state { match self.local_state {
LocalState::Start | LocalState::Proposed => {}, LocalState::Start | LocalState::Proposed => {},
@@ -419,7 +471,7 @@ impl<C: Context> Strategy<C> {
} }
} }
fn commit(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>) { fn commit(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>) {
// commit only if we haven't voted to advance or committed already // commit only if we haven't voted to advance or committed already
match self.local_state { match self.local_state {
LocalState::Committed | LocalState::VoteAdvance => return, LocalState::Committed | LocalState::VoteAdvance => return,
@@ -446,7 +498,7 @@ impl<C: Context> Strategy<C> {
} }
} }
fn vote_advance(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>) fn vote_advance(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
-> Result<(), <C::RoundTimeout as Future>::Error> -> Result<(), <C::RoundTimeout as Future>::Error>
{ {
// we can vote for advancement under all circumstances unless we have already. // we can vote for advancement under all circumstances unless we have already.
@@ -526,11 +578,11 @@ impl<C: Context> Strategy<C> {
&mut self, &mut self,
message: Message<C::Candidate, C::Digest>, message: Message<C::Candidate, C::Digest>,
context: &C, context: &C,
sending: &mut Sending<ContextLocalizedMessage<C>> sending: &mut Sending<ContextCommunication<C>>
) { ) {
let signed_message = context.sign_local(message); let signed_message = context.sign_local(message);
self.import_message(signed_message.clone()); self.import_message(signed_message.clone());
sending.push(signed_message); sending.push(ContextCommunication(Communication::Message(signed_message)));
} }
} }
@@ -541,7 +593,7 @@ pub struct Agreement<C: Context, I, O> {
input: I, input: I,
output: O, output: O,
concluded: Option<Committed<C::Candidate, C::Digest, C::Signature>>, concluded: Option<Committed<C::Candidate, C::Digest, C::Signature>>,
sending: Sending<ContextLocalizedMessage<C>>, sending: Sending<ContextCommunication<C>>,
strategy: Strategy<C>, strategy: Strategy<C>,
} }
@@ -550,8 +602,8 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
C: Context, C: Context,
C::RoundTimeout: Future<Error=E>, C::RoundTimeout: Future<Error=E>,
C::Proposal: Future<Error=E>, C::Proposal: Future<Error=E>,
I: Stream<Item=ContextLocalizedMessage<C>,Error=E>, I: Stream<Item=ContextCommunication<C>,Error=E>,
O: Sink<SinkItem=ContextLocalizedMessage<C>,SinkError=E>, O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
E: From<InputStreamConcluded>, E: From<InputStreamConcluded>,
{ {
type Item = Committed<C::Candidate, C::Digest, C::Signature>; type Item = Committed<C::Candidate, C::Digest, C::Signature>;
@@ -580,7 +632,11 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
} }
let message = try_ready!(self.input.poll()).ok_or(InputStreamConcluded)?; let message = try_ready!(self.input.poll()).ok_or(InputStreamConcluded)?;
self.strategy.import_message(message);
match message.0 {
Communication::Message(message) => self.strategy.import_message(message),
Communication::Locked(proof) => self.strategy.import_lock_proof(&self.context, proof),
}
self.poll() self.poll()
} }