diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs index 5c6b126a1e..6beff362e4 100644 --- a/substrate/candidate-agreement/src/bft/mod.rs +++ b/substrate/candidate-agreement/src/bft/mod.rs @@ -308,16 +308,17 @@ impl Strategy { ) { // TODO: find a way to avoid processing of the signatures if the sender is // not the primary or the round number is low. - let current_round_number = self.current_accumulator.round_number(); - if justification.round_number < current_round_number { - return - } else if justification.round_number == current_round_number { - self.locked = Some(Locked { justification }); - } else { + if justification.round_number > self.current_accumulator.round_number() { // jump ahead to the prior round as this is an indication of a supermajority // good nodes being at least on that round. self.advance_to_round(context, justification.round_number); - self.locked = Some(Locked { justification }); + } + + let lock_to_new = self.locked.as_ref() + .map_or(true, |l| l.justification.round_number < justification.round_number); + + if lock_to_new { + self.locked = Some(Locked { justification }) } } diff --git a/substrate/candidate-agreement/src/bft/tests.rs b/substrate/candidate-agreement/src/bft/tests.rs index 165166d76a..86857ba5de 100644 --- a/substrate/candidate-agreement/src/bft/tests.rs +++ b/substrate/candidate-agreement/src/bft/tests.rs @@ -322,3 +322,91 @@ fn consensus_does_not_complete_without_enough_nodes() { assert!(result.is_none(), "not enough online nodes"); } + +#[test] +fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() { + let node_count = 10; + let max_faulty = 3; + + let locked_proposal = Candidate(999_999_999); + let locked_digest = Digest(999_999_999); + let locked_round = 1; + let justification = PrepareJustification { + round_number: locked_round, + digest: locked_digest.clone(), + signatures: (0..7) + .map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), ValidatorId(i))) + .collect() + }; + + let mut shared_context = SharedContext::new(node_count); + shared_context.current_round = locked_round + 1; + let shared_context = Arc::new(Mutex::new(shared_context)); + + let (network, net_send, net_recv) = Network::new(node_count); + network.route_on_thread(); + + let nodes = net_send + .into_iter() + .zip(net_recv) + .enumerate() + .map(|(i, (tx, rx))| { + let ctx = TestContext { + local_id: ValidatorId(i), + proposal: Mutex::new(i), + shared: shared_context.clone(), + }; + + let mut agreement = agree( + ctx, + node_count, + max_faulty, + rx.map_err(|_| Error), + tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), + ); + + agreement.strategy.advance_to_round( + &agreement.context, + locked_round + 1 + ); + + if i <= max_faulty { + agreement.strategy.locked = Some(Locked { + justification: justification.clone(), + }) + } + + if i == max_faulty { + agreement.strategy.notable_candidates.insert( + locked_digest.clone(), + locked_proposal.clone(), + ); + } + + agreement + }) + .collect::>(); + + ::std::thread::spawn(move || { + let mut timeout = ::std::time::Duration::from_millis(50); + loop { + ::std::thread::sleep(timeout.clone()); + shared_context.lock().unwrap().bump_round(); + timeout *= 2; + } + }); + + let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error); + let results = ::futures::future::join_all(nodes) + .map(Some) + .select(timeout.map(|_| None)) + .wait() + .map(|(i, _)| i) + .map_err(|(e, _)| e) + .expect("to complete") + .expect("to not time out"); + + for result in &results { + assert_eq!(&result.justification.digest, &locked_digest); + } +}