store all future messages in BFT (#228)

This commit is contained in:
Robert Habermeier
2018-06-22 17:52:23 +02:00
committed by Gav Wood
parent 6a56c65a5e
commit 4082d660ba
+65 -69
View File
@@ -17,7 +17,7 @@
//! BFT Agreement based on a rotating proposer in different rounds.
//! Very general implementation.
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, BTreeMap, VecDeque};
use std::collections::hash_map;
use std::fmt::Debug;
use std::hash::Hash;
@@ -321,7 +321,7 @@ struct Strategy<C: Context> {
locked: Option<Locked<C::Digest, C::Signature>>,
notable_candidates: HashMap<C::Digest, C::Candidate>,
current_accumulator: Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
future_accumulator: Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
future_accumulators: BTreeMap<usize, Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>>,
local_id: C::AuthorityId,
misbehavior: HashMap<C::AuthorityId, Misbehavior<C::Digest, C::Signature>>,
}
@@ -337,17 +337,11 @@ impl<C: Context> Strategy<C> {
context.round_proposer(0),
);
let future_accumulator = Accumulator::new(
1,
threshold,
context.round_proposer(1),
);
Strategy {
nodes,
max_faulty,
current_accumulator,
future_accumulator,
future_accumulators: BTreeMap::new(),
fetching_proposal: None,
evaluating_proposal: None,
local_state: LocalState::Start,
@@ -359,17 +353,33 @@ impl<C: Context> Strategy<C> {
}
}
fn current_round(&self) -> usize {
self.current_accumulator.round_number()
}
fn import_message(
&mut self,
context: &C,
msg: LocalizedMessage<C::Candidate, C::Digest, C::AuthorityId, C::Signature>
) {
let round_number = msg.round_number();
let sender = msg.sender().clone();
let misbehavior = if round_number == self.current_accumulator.round_number() {
let current_round = self.current_round();
let misbehavior = if round_number == current_round {
self.current_accumulator.import_message(msg)
} else if round_number == self.future_accumulator.round_number() {
self.future_accumulator.import_message(msg)
} else if round_number > current_round {
let threshold = bft_threshold(self.nodes, self.max_faulty);
let mut future_acc = self.future_accumulators.entry(round_number).or_insert_with(|| {
Accumulator::new(
round_number,
threshold,
context.round_proposer(round_number),
)
});
future_acc.import_message(msg)
} else {
Ok(())
};
@@ -386,7 +396,7 @@ impl<C: Context> Strategy<C> {
) {
// TODO: find a way to avoid processing of the signatures if the sender is
// not the primary or the round number is low.
if justification.round_number > self.current_accumulator.round_number() {
if justification.round_number > self.current_round() {
// jump ahead to the prior round as this is an indication of a supermajority
// good nodes being at least on that round.
self.advance_to_round(context, justification.round_number);
@@ -411,10 +421,7 @@ impl<C: Context> Strategy<C> {
)
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, C::Error>
{
let mut last_watermark = (
self.current_accumulator.round_number(),
self.local_state
);
let mut last_watermark = (self.current_round(), self.local_state);
// poll until either completion or state doesn't change.
loop {
@@ -422,10 +429,7 @@ impl<C: Context> Strategy<C> {
match self.poll_once(context, sending)? {
Async::Ready(x) => return Ok(Async::Ready(x)),
Async::NotReady => {
let new_watermark = (
self.current_accumulator.round_number(),
self.local_state
);
let new_watermark = (self.current_round(), self.local_state);
if new_watermark == last_watermark {
return Ok(Async::NotReady)
@@ -458,7 +462,7 @@ impl<C: Context> Strategy<C> {
self.locked = Some(Locked { justification: p_just.clone() });
}
let round_number = self.current_accumulator.round_number();
let round_number = self.current_round();
Some(round_number + 1)
}
&State::Committed(ref just) => {
@@ -503,7 +507,7 @@ impl<C: Context> Strategy<C> {
if let LocalState::Start = self.local_state {
let mut propose = false;
if let &State::Begin = self.current_accumulator.state() {
let round_number = self.current_accumulator.round_number();
let round_number = self.current_round();
let primary = context.round_proposer(round_number);
propose = self.local_id == primary;
};
@@ -537,7 +541,7 @@ impl<C: Context> Strategy<C> {
self.fetching_proposal = None;
let message = Message::Propose(
self.current_accumulator.round_number(),
self.current_round(),
proposal
);
@@ -605,7 +609,7 @@ impl<C: Context> Strategy<C> {
if let Some(digest) = prepare_for {
let message = Vote::Prepare(
self.current_accumulator.round_number(),
self.current_round(),
digest
).into();
@@ -637,7 +641,7 @@ impl<C: Context> Strategy<C> {
if let Some(digest) = commit_for {
let message = Vote::Commit(
self.current_accumulator.round_number(),
self.current_round(),
digest
).into();
@@ -672,7 +676,7 @@ impl<C: Context> Strategy<C> {
if attempt_advance {
let message = Vote::AdvanceRound(
self.current_accumulator.round_number(),
self.current_round(),
).into();
self.import_and_send_message(message, context, sending);
@@ -683,22 +687,14 @@ impl<C: Context> Strategy<C> {
}
fn advance_to_round(&mut self, context: &C, round: usize) {
assert!(round > self.current_accumulator.round_number());
assert!(round > self.current_round());
trace!(target: "bft", "advancing to round {}", round);
let threshold = self.nodes - self.max_faulty;
self.fetching_proposal = None;
self.evaluating_proposal = None;
self.round_timeout = context.begin_round_timeout(round).fuse();
self.local_state = LocalState::Start;
let new_future = Accumulator::new(
round + 1,
threshold,
context.round_proposer(round + 1),
);
// when advancing from a round, store away the witnessed proposal.
//
// if we or other participants end up locked on that candidate,
@@ -708,18 +704,20 @@ impl<C: Context> Strategy<C> {
self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone());
}
// special case when advancing by a single round.
if self.future_accumulator.round_number() == round {
self.current_accumulator
= ::std::mem::replace(&mut self.future_accumulator, new_future);
} else {
self.future_accumulator = new_future;
self.current_accumulator = Accumulator::new(
round,
threshold,
context.round_proposer(round),
);
// if we jump ahead more than one round, get rid of the ones in between.
for irrelevant in (self.current_round() + 1)..round {
self.future_accumulators.remove(&irrelevant);
}
// use stored future accumulator for given round or create if it doesn't exist.
self.current_accumulator = match self.future_accumulators.remove(&round) {
Some(x) => x,
None => Accumulator::new(
round,
bft_threshold(self.nodes, self.max_faulty),
context.round_proposer(round),
),
};
}
fn import_and_send_message(
@@ -729,7 +727,7 @@ impl<C: Context> Strategy<C> {
sending: &mut Sending<<C as TypeResolve>::Communication>
) {
let signed_message = context.sign_local(message);
self.import_message(signed_message.clone());
self.import_message(context, signed_message.clone());
sending.push(Communication::Consensus(signed_message));
}
}
@@ -767,34 +765,32 @@ impl<C, I, O> Future for Agreement<C, I, O>
})
}
loop {
let message = match self.input.poll()? {
Async::Ready(msg) => msg.ok_or(InputStreamConcluded)?,
Async::NotReady => break,
// drive state machine as long as there are new messages.
let mut driving = true;
while driving {
driving = match self.input.poll()? {
Async::Ready(msg) => {
match msg.ok_or(InputStreamConcluded)? {
Communication::Consensus(message) => self.strategy.import_message(&self.context, message),
Communication::Auxiliary(lock_proof)
=> self.strategy.import_lock_proof(&self.context, lock_proof),
}
true
}
Async::NotReady => false,
};
match message {
Communication::Consensus(message) => self.strategy.import_message(message),
Communication::Auxiliary(lock_proof)
=> self.strategy.import_lock_proof(&self.context, lock_proof),
// drive state machine after handling new input.
if let Async::Ready(just) = self.strategy.poll(&self.context, &mut self.sending)? {
self.concluded = Some(just);
return self.poll();
}
}
// try to process timeouts.
let state_machine_res = self.strategy.poll(&self.context, &mut self.sending)?;
// make progress on flushing all pending messages.
let _ = self.sending.process_all(&mut self.output)?;
match state_machine_res {
Async::Ready(just) => {
self.concluded = Some(just);
self.poll()
}
Async::NotReady => {
Ok(Async::NotReady)
}
}
Ok(Async::NotReady)
}
}