ensure proposal evaluation occurs at most once per round (#125)

* ensure proposal evaluation occurs at most once per round

* add test

* remove println
This commit is contained in:
Robert Habermeier
2018-04-15 13:23:59 +02:00
committed by Gav Wood
parent 2a53d414a3
commit 40c7820c31
2 changed files with 79 additions and 4 deletions
+10 -2
View File
@@ -293,7 +293,7 @@ impl<D, S> Locked<D, S> {
enum LocalState {
Start,
Proposed,
Prepared,
Prepared(bool), // whether we thought it valid.
Committed,
VoteAdvance,
}
@@ -582,6 +582,7 @@ impl<C: Context> Strategy<C> {
Some(_) => {
// don't check validity if we are locked.
// this is necessary to preserve the liveness property.
self.local_state = LocalState::Prepared(true);
prepare_for = Some(digest);
}
None => {
@@ -591,6 +592,8 @@ impl<C: Context> Strategy<C> {
if let Async::Ready(valid) = res {
self.evaluating_proposal = None;
self.local_state = LocalState::Prepared(valid);
if valid {
prepare_for = Some(digest);
}
@@ -606,7 +609,6 @@ impl<C: Context> Strategy<C> {
).into();
self.import_and_send_message(message, context, sending);
self.local_state = LocalState::Prepared;
}
Ok(())
@@ -657,6 +659,12 @@ impl<C: Context> Strategy<C> {
// sent an AdvanceRound message yet, do so.
let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty;
// if we evaluated the proposal and it was bad, vote to advance round.
if let LocalState::Prepared(false) = self.local_state {
attempt_advance = true;
}
// if the timeout has fired, vote to advance round.
if let Async::Ready(_) = self.round_timeout.poll()? {
attempt_advance = true;
}
+69 -2
View File
@@ -18,6 +18,7 @@
use super::*;
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
@@ -117,6 +118,7 @@ struct TestContext {
node_count: usize,
current_round: Arc<AtomicUsize>,
timer: Timer,
evaluated: Mutex<BTreeSet<usize>>,
}
impl Context for TestContext {
@@ -137,7 +139,7 @@ impl Context for TestContext {
let proposal = {
let mut p = self.proposal.lock().unwrap();
let x = *p;
*p = (*p * 2) + 1;
*p += self.node_count;
x
};
@@ -175,6 +177,10 @@ impl Context for TestContext {
}
fn proposal_valid(&self, proposal: &Candidate) -> FutureResult<bool, Error> {
if !self.evaluated.lock().unwrap().insert(proposal.0) {
panic!("Evaluated proposal {:?} twice", proposal.0);
}
Ok(proposal.0 % 3 != 0).into_future()
}
@@ -230,6 +236,64 @@ fn consensus_completes_with_minimum_good() {
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
agree(
ctx,
node_count,
max_faulty,
rx.map_err(|_| Error),
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
)
})
.collect::<Vec<_>>();
let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
let results = ::futures::future::join_all(nodes)
.map(Some)
.select(timeout.map(|_| None))
.wait()
.map(|(i, _)| i)
.map_err(|(e, _)| e)
.expect("to complete")
.expect("to not time out");
for result in &results {
assert_eq!(&result.justification.digest, &results[0].justification.digest);
}
}
#[test]
fn consensus_completes_with_minimum_good_all_initial_proposals_bad() {
let node_count = 10;
let max_faulty = 3;
let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();
let (network, net_send, net_recv) = Network::new(node_count);
network.route_on_thread();
let nodes = net_send
.into_iter()
.zip(net_recv)
.take(node_count - max_faulty)
.enumerate()
.map(|(i, (tx, rx))| {
// the first 5 proposals are going to be bad.
let proposal = if i < 5 {
i * 3 // proposals considered bad in the tests if they are % 3
} else {
(i * 3) + 1
};
let ctx = TestContext {
local_id: AuthorityId(i),
proposal: Mutex::new(proposal),
current_round: Arc::new(AtomicUsize::new(0)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
@@ -279,6 +343,7 @@ fn consensus_does_not_complete_without_enough_nodes() {
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
@@ -335,6 +400,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(locked_round + 1)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
let mut agreement = agree(
@@ -367,7 +433,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
})
.collect::<Vec<_>>();
let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
let timeout = timeout_in(Duration::from_millis(1000)).map_err(|_| Error);
let results = ::futures::future::join_all(nodes)
.map(Some)
.select(timeout.map(|_| None))
@@ -404,6 +470,7 @@ fn consensus_completes_even_when_nodes_start_with_a_delay() {
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};