diff --git a/substrate/substrate/bft/src/generic/mod.rs b/substrate/substrate/bft/src/generic/mod.rs index 0795cbfc62..6d4715e806 100644 --- a/substrate/substrate/bft/src/generic/mod.rs +++ b/substrate/substrate/bft/src/generic/mod.rs @@ -17,7 +17,7 @@ //! BFT Agreement based on a rotating proposer in different rounds. //! Very general implementation. -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, BTreeMap, VecDeque}; use std::collections::hash_map; use std::fmt::Debug; use std::hash::Hash; @@ -321,7 +321,7 @@ struct Strategy { locked: Option>, notable_candidates: HashMap, current_accumulator: Accumulator, - future_accumulator: Accumulator, + future_accumulators: BTreeMap>, local_id: C::AuthorityId, misbehavior: HashMap>, } @@ -337,17 +337,11 @@ impl Strategy { context.round_proposer(0), ); - let future_accumulator = Accumulator::new( - 1, - threshold, - context.round_proposer(1), - ); - Strategy { nodes, max_faulty, current_accumulator, - future_accumulator, + future_accumulators: BTreeMap::new(), fetching_proposal: None, evaluating_proposal: None, local_state: LocalState::Start, @@ -359,17 +353,33 @@ impl Strategy { } } + fn current_round(&self) -> usize { + self.current_accumulator.round_number() + } + fn import_message( &mut self, + context: &C, msg: LocalizedMessage ) { let round_number = msg.round_number(); let sender = msg.sender().clone(); - let misbehavior = if round_number == self.current_accumulator.round_number() { + let current_round = self.current_round(); + let misbehavior = if round_number == current_round { self.current_accumulator.import_message(msg) - } else if round_number == self.future_accumulator.round_number() { - self.future_accumulator.import_message(msg) + } else if round_number > current_round { + let threshold = bft_threshold(self.nodes, self.max_faulty); + + let mut future_acc = self.future_accumulators.entry(round_number).or_insert_with(|| { + Accumulator::new( + round_number, + threshold, + context.round_proposer(round_number), + ) + }); + + future_acc.import_message(msg) } else { Ok(()) }; @@ -386,7 +396,7 @@ impl Strategy { ) { // TODO: find a way to avoid processing of the signatures if the sender is // not the primary or the round number is low. - if justification.round_number > self.current_accumulator.round_number() { + if justification.round_number > self.current_round() { // jump ahead to the prior round as this is an indication of a supermajority // good nodes being at least on that round. self.advance_to_round(context, justification.round_number); @@ -411,10 +421,7 @@ impl Strategy { ) -> Poll, C::Error> { - let mut last_watermark = ( - self.current_accumulator.round_number(), - self.local_state - ); + let mut last_watermark = (self.current_round(), self.local_state); // poll until either completion or state doesn't change. loop { @@ -422,10 +429,7 @@ impl Strategy { match self.poll_once(context, sending)? { Async::Ready(x) => return Ok(Async::Ready(x)), Async::NotReady => { - let new_watermark = ( - self.current_accumulator.round_number(), - self.local_state - ); + let new_watermark = (self.current_round(), self.local_state); if new_watermark == last_watermark { return Ok(Async::NotReady) @@ -458,7 +462,7 @@ impl Strategy { self.locked = Some(Locked { justification: p_just.clone() }); } - let round_number = self.current_accumulator.round_number(); + let round_number = self.current_round(); Some(round_number + 1) } &State::Committed(ref just) => { @@ -503,7 +507,7 @@ impl Strategy { if let LocalState::Start = self.local_state { let mut propose = false; if let &State::Begin = self.current_accumulator.state() { - let round_number = self.current_accumulator.round_number(); + let round_number = self.current_round(); let primary = context.round_proposer(round_number); propose = self.local_id == primary; }; @@ -537,7 +541,7 @@ impl Strategy { self.fetching_proposal = None; let message = Message::Propose( - self.current_accumulator.round_number(), + self.current_round(), proposal ); @@ -605,7 +609,7 @@ impl Strategy { if let Some(digest) = prepare_for { let message = Vote::Prepare( - self.current_accumulator.round_number(), + self.current_round(), digest ).into(); @@ -637,7 +641,7 @@ impl Strategy { if let Some(digest) = commit_for { let message = Vote::Commit( - self.current_accumulator.round_number(), + self.current_round(), digest ).into(); @@ -672,7 +676,7 @@ impl Strategy { if attempt_advance { let message = Vote::AdvanceRound( - self.current_accumulator.round_number(), + self.current_round(), ).into(); self.import_and_send_message(message, context, sending); @@ -683,22 +687,14 @@ impl Strategy { } fn advance_to_round(&mut self, context: &C, round: usize) { - assert!(round > self.current_accumulator.round_number()); + assert!(round > self.current_round()); trace!(target: "bft", "advancing to round {}", round); - let threshold = self.nodes - self.max_faulty; - self.fetching_proposal = None; self.evaluating_proposal = None; self.round_timeout = context.begin_round_timeout(round).fuse(); self.local_state = LocalState::Start; - let new_future = Accumulator::new( - round + 1, - threshold, - context.round_proposer(round + 1), - ); - // when advancing from a round, store away the witnessed proposal. // // if we or other participants end up locked on that candidate, @@ -708,18 +704,20 @@ impl Strategy { self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone()); } - // special case when advancing by a single round. - if self.future_accumulator.round_number() == round { - self.current_accumulator - = ::std::mem::replace(&mut self.future_accumulator, new_future); - } else { - self.future_accumulator = new_future; - self.current_accumulator = Accumulator::new( - round, - threshold, - context.round_proposer(round), - ); + // if we jump ahead more than one round, get rid of the ones in between. + for irrelevant in (self.current_round() + 1)..round { + self.future_accumulators.remove(&irrelevant); } + + // use stored future accumulator for given round or create if it doesn't exist. + self.current_accumulator = match self.future_accumulators.remove(&round) { + Some(x) => x, + None => Accumulator::new( + round, + bft_threshold(self.nodes, self.max_faulty), + context.round_proposer(round), + ), + }; } fn import_and_send_message( @@ -729,7 +727,7 @@ impl Strategy { sending: &mut Sending<::Communication> ) { let signed_message = context.sign_local(message); - self.import_message(signed_message.clone()); + self.import_message(context, signed_message.clone()); sending.push(Communication::Consensus(signed_message)); } } @@ -767,34 +765,32 @@ impl Future for Agreement }) } - loop { - let message = match self.input.poll()? { - Async::Ready(msg) => msg.ok_or(InputStreamConcluded)?, - Async::NotReady => break, + // drive state machine as long as there are new messages. + let mut driving = true; + while driving { + driving = match self.input.poll()? { + Async::Ready(msg) => { + match msg.ok_or(InputStreamConcluded)? { + Communication::Consensus(message) => self.strategy.import_message(&self.context, message), + Communication::Auxiliary(lock_proof) + => self.strategy.import_lock_proof(&self.context, lock_proof), + } + + true + } + Async::NotReady => false, }; - match message { - Communication::Consensus(message) => self.strategy.import_message(message), - Communication::Auxiliary(lock_proof) - => self.strategy.import_lock_proof(&self.context, lock_proof), + // drive state machine after handling new input. + if let Async::Ready(just) = self.strategy.poll(&self.context, &mut self.sending)? { + self.concluded = Some(just); + return self.poll(); } } - // try to process timeouts. - let state_machine_res = self.strategy.poll(&self.context, &mut self.sending)?; - // make progress on flushing all pending messages. let _ = self.sending.process_all(&mut self.output)?; - - match state_machine_res { - Async::Ready(just) => { - self.concluded = Some(just); - self.poll() - } - Async::NotReady => { - Ok(Async::NotReady) - } - } + Ok(Async::NotReady) } }