accept bft agreement on proposal not locally submitted

This commit is contained in:
Robert Habermeier
2017-12-18 15:49:35 +01:00
parent dd7061e0d7
commit dcce39c441
+141 -27
View File
@@ -20,24 +20,37 @@
//! known by each node. The proposals they have may differ, so the agreement
//! may never complete.
use std::collections::HashSet;
use std::collections::HashMap;
use std::hash::Hash;
use futures::{Future, Stream, Sink};
use futures::future::{ok, loop_fn, Loop};
/// Messages over the proposal.
#[derive(Debug, Clone)]
pub enum Message<P> {
/// Prepare to vote for proposal P.
Prepare(P),
}
/// A localized message, including the sender.
pub struct LocalizedMessage<P, V> {
#[derive(Debug, Clone)]
pub struct LocalizedMessage<P, V, S> {
/// The message received.
pub message: Message<P>,
/// The sender of the message
pub sender: V,
/// The signature of the message.
pub signature: S,
}
/// The agreed-upon data.
#[derive(Debug, Clone)]
pub struct Agreed<P, V, S> {
/// The agreed-upon proposal.
pub proposal: P,
/// The justification for the proposal.
pub justification: Vec<LocalizedMessage<P, V, S>>,
}
/// Reach BFT agreement. Input the local proposal, message input stream, message output stream,
@@ -57,35 +70,83 @@ pub struct LocalizedMessage<P, V> {
/// view, the commit phase is not necessary.
// TODO: consider cross-view committing
// TODO: impl future.
pub fn agree<'a, P, I, O, V>(local_proposal: P, input: I, output: O, max_faulty: usize)
-> Box<Future<Item=P, Error=I::Error> + Send + 'a>
pub fn agree<'a, P, V, S, F, I, O>(
local_proposal: P,
local_id: V,
mut sign_local: F,
input: I,
output: O,
max_faulty: usize,
) -> Box<Future<Item=Agreed<P, V, S>, Error=I::Error> + Send + 'a>
where
P: 'a + Send + Eq + Clone,
V: 'a + Send + Hash + Eq,
I: 'a + Send + Stream<Item=LocalizedMessage<P, V>>,
O: 'a + Send + Sink<SinkItem=Message<P>,SinkError=I::Error>,
P: 'a + Send + Hash + Eq + Clone,
V: 'a + Send + Hash + Eq + Clone,
S: 'a + Send + Eq + Clone,
F: 'a + Send + FnMut(&Message<P>) -> S,
I: 'a + Send + Stream<Item=LocalizedMessage<P, V, S>>,
O: 'a + Send + Sink<SinkItem=LocalizedMessage<P, V, S>,SinkError=I::Error>,
I::Error: Send
{
let prepared = HashSet::new();
use std::collections::hash_map::Entry;
let broadcast_message = output.send(Message::Prepare(local_proposal.clone()));
let voting_for = HashMap::new();
let prepared = HashMap::new();
let wait_for_prepares = loop_fn((input, prepared), move |(input, mut prepared)| {
let local_proposal = local_proposal.clone();
let local_prepare = {
let local_prepare = Message::Prepare(local_proposal);
let local_signature = sign_local(&local_prepare);
LocalizedMessage {
message: local_prepare,
sender: local_id,
signature: local_signature,
}
};
// broadcast out our local prepare message and shortcut it into our input
// stream.
let broadcast_message = output.send(local_prepare.clone());
let input = ::futures::stream::once(Ok(local_prepare)).chain(input);
let wait_for_prepares = loop_fn((input, voting_for, prepared), move |(input, mut voting_for, mut prepared)| {
input.into_future().and_then(move |(msg, remainder)| {
let msg = msg.expect("input stream never concludes; qed");
let LocalizedMessage { message: Message::Prepare(p), sender } = msg;
let LocalizedMessage { message: Message::Prepare(p), sender, signature } = msg;
if p == local_proposal {
prepared.insert(sender);
// the threshold is 2f + 1, but this node makes up the one.
if prepared.len() >= max_faulty * 2 {
return ok(Loop::Break(p))
let is_complete = match voting_for.entry(sender) {
Entry::Occupied(_) => {
// TODO: handle double vote.
false
}
Entry::Vacant(vacant) => {
vacant.insert((p.clone(), signature));
let n = prepared.entry(p.clone()).or_insert(0);
*n += 1;
*n > max_faulty * 2
}
};
if is_complete {
let justification = voting_for.into_iter().filter_map(|(v, (x, s))| {
if x == p {
Some(LocalizedMessage {
message: Message::Prepare(x),
sender: v,
signature: s,
})
} else {
None
}
}).collect();
ok(Loop::Break(Agreed {
justification,
proposal: p,
}))
} else {
ok(Loop::Continue((remainder, voting_for, prepared)))
}
ok(Loop::Continue((remainder, prepared)))
}).map_err(|(e, _)| e)
});
@@ -99,12 +160,14 @@ mod tests {
#[test]
fn broadcasts_message() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel::<LocalizedMessage<usize, usize>>(10);
let (i_tx, i_rx) = ::futures::sync::mpsc::channel::<LocalizedMessage<usize, usize, bool>>(10);
let (o_tx, o_rx) = ::futures::sync::mpsc::channel(10);
let max_faulty = 3;
let agreement = agree(
100_000,
255,
|_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
@@ -119,13 +182,14 @@ mod tests {
.next()
.expect("to have a next item")
.expect("not to have an error");
let Message::Prepare(p) = sent_message;
let Message::Prepare(p) = sent_message.message;
assert_eq!(p, 100_000);
assert_eq!(sent_message.sender, 255);
}
#[test]
fn concludes_on_2f_prepares() {
fn concludes_on_2f_prepares_for_local_proposal() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
@@ -133,6 +197,8 @@ mod tests {
let agreement = agree(
100_000,
255,
|_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
@@ -142,6 +208,7 @@ mod tests {
LocalizedMessage {
message: Message::Prepare(100_000),
sender: i,
signature: true,
}
});
@@ -159,11 +226,11 @@ mod tests {
.expect("not to have an error")
.expect("not to fail to agree");
assert_eq!(agreed_value, 100_000);
assert_eq!(agreed_value.proposal, 100_000);
}
#[test]
fn never_concludes_on_less_than_2f_prepares() {
fn concludes_on_2f_plus_one_prepares_for_alternate_proposal() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
@@ -171,6 +238,49 @@ mod tests {
let agreement = agree(
100_000,
255,
|_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
);
let iter = (0..(max_faulty * 2 + 1)).map(|i| {
LocalizedMessage {
message: Message::Prepare(100_001),
sender: i,
signature: true,
}
});
let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
::std::thread::spawn(move || {
::std::thread::sleep(::std::time::Duration::from_secs(5));
timeout_tx.send(None).unwrap();
});
let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
.wait()
.map(|(r, _)| r)
.map_err(|(e, _)| e)
.expect("not to have an error")
.expect("not to fail to agree");
assert_eq!(agreed_value.proposal, 100_001);
}
#[test]
fn never_concludes_on_less_than_2f_prepares_for_local() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
let max_faulty = 3;
let agreement = agree(
100_000,
255,
|_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
@@ -180,6 +290,7 @@ mod tests {
LocalizedMessage {
message: Message::Prepare(100_000),
sender: i,
signature: true,
}
});
@@ -200,7 +311,7 @@ mod tests {
}
#[test]
fn never_concludes_on_2f_prepares_for_different_proposal() {
fn never_concludes_on_less_than_2f_plus_one_prepares_for_alternate() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
@@ -208,15 +319,18 @@ mod tests {
let agreement = agree(
100_000,
255,
|_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
);
let iter = (0..(max_faulty * 2)).map(|i| {
let iter = (1..(max_faulty * 2 + 1)).map(|i| {
LocalizedMessage {
message: Message::Prepare(100_001),
sender: i,
signature: true,
}
});