diff --git a/substrate/candidate-agreement/src/bft.rs b/substrate/candidate-agreement/src/bft.rs
index b536dc9f2f..1cc9158127 100644
--- a/substrate/candidate-agreement/src/bft.rs
+++ b/substrate/candidate-agreement/src/bft.rs
@@ -20,24 +20,37 @@
//! known by each node. The proposals they have may differ, so the agreement
//! may never complete.
-use std::collections::HashSet;
+use std::collections::HashMap;
use std::hash::Hash;
use futures::{Future, Stream, Sink};
use futures::future::{ok, loop_fn, Loop};
/// Messages over the proposal.
+#[derive(Debug, Clone)]
pub enum Message
{
/// Prepare to vote for proposal P.
Prepare(P),
}
/// A localized message, including the sender.
-pub struct LocalizedMessage
{
+#[derive(Debug, Clone)]
+pub struct LocalizedMessage
{
/// The message received.
pub message: Message
,
/// The sender of the message
pub sender: V,
+ /// The signature of the message.
+ pub signature: S,
+}
+
+/// The agreed-upon data.
+#[derive(Debug, Clone)]
+pub struct Agreed
{
+ /// The agreed-upon proposal.
+ pub proposal: P,
+ /// The justification for the proposal.
+ pub justification: Vec>,
}
/// Reach BFT agreement. Input the local proposal, message input stream, message output stream,
@@ -57,35 +70,83 @@ pub struct LocalizedMessage {
/// view, the commit phase is not necessary.
// TODO: consider cross-view committing
// TODO: impl future.
-pub fn agree<'a, P, I, O, V>(local_proposal: P, input: I, output: O, max_faulty: usize)
- -> Box + Send + 'a>
+pub fn agree<'a, P, V, S, F, I, O>(
+ local_proposal: P,
+ local_id: V,
+ mut sign_local: F,
+ input: I,
+ output: O,
+ max_faulty: usize,
+) -> Box, Error=I::Error> + Send + 'a>
where
- P: 'a + Send + Eq + Clone,
- V: 'a + Send + Hash + Eq,
- I: 'a + Send + Stream- >,
- O: 'a + Send + Sink,SinkError=I::Error>,
+ P: 'a + Send + Hash + Eq + Clone,
+ V: 'a + Send + Hash + Eq + Clone,
+ S: 'a + Send + Eq + Clone,
+ F: 'a + Send + FnMut(&Message
) -> S,
+ I: 'a + Send + Stream- >,
+ O: 'a + Send + Sink,SinkError=I::Error>,
I::Error: Send
{
- let prepared = HashSet::new();
+ use std::collections::hash_map::Entry;
- let broadcast_message = output.send(Message::Prepare(local_proposal.clone()));
+ let voting_for = HashMap::new();
+ let prepared = HashMap::new();
- let wait_for_prepares = loop_fn((input, prepared), move |(input, mut prepared)| {
- let local_proposal = local_proposal.clone();
+ let local_prepare = {
+ let local_prepare = Message::Prepare(local_proposal);
+ let local_signature = sign_local(&local_prepare);
+
+ LocalizedMessage {
+ message: local_prepare,
+ sender: local_id,
+ signature: local_signature,
+ }
+ };
+
+ // broadcast out our local prepare message and shortcut it into our input
+ // stream.
+ let broadcast_message = output.send(local_prepare.clone());
+ let input = ::futures::stream::once(Ok(local_prepare)).chain(input);
+
+ let wait_for_prepares = loop_fn((input, voting_for, prepared), move |(input, mut voting_for, mut prepared)| {
input.into_future().and_then(move |(msg, remainder)| {
let msg = msg.expect("input stream never concludes; qed");
- let LocalizedMessage { message: Message::Prepare(p), sender } = msg;
+ let LocalizedMessage { message: Message::Prepare(p), sender, signature } = msg;
- if p == local_proposal {
- prepared.insert(sender);
-
- // the threshold is 2f + 1, but this node makes up the one.
- if prepared.len() >= max_faulty * 2 {
- return ok(Loop::Break(p))
+ let is_complete = match voting_for.entry(sender) {
+ Entry::Occupied(_) => {
+ // TODO: handle double vote.
+ false
}
+ Entry::Vacant(vacant) => {
+ vacant.insert((p.clone(), signature));
+ let n = prepared.entry(p.clone()).or_insert(0);
+ *n += 1;
+ *n > max_faulty * 2
+ }
+ };
+
+ if is_complete {
+ let justification = voting_for.into_iter().filter_map(|(v, (x, s))| {
+ if x == p {
+ Some(LocalizedMessage {
+ message: Message::Prepare(x),
+ sender: v,
+ signature: s,
+ })
+ } else {
+ None
+ }
+ }).collect();
+
+ ok(Loop::Break(Agreed {
+ justification,
+ proposal: p,
+ }))
+ } else {
+ ok(Loop::Continue((remainder, voting_for, prepared)))
}
- ok(Loop::Continue((remainder, prepared)))
}).map_err(|(e, _)| e)
});
@@ -99,12 +160,14 @@ mod tests {
#[test]
fn broadcasts_message() {
- let (i_tx, i_rx) = ::futures::sync::mpsc::channel::>(10);
+ let (i_tx, i_rx) = ::futures::sync::mpsc::channel::>(10);
let (o_tx, o_rx) = ::futures::sync::mpsc::channel(10);
let max_faulty = 3;
let agreement = agree(
100_000,
+ 255,
+ |_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
@@ -119,13 +182,14 @@ mod tests {
.next()
.expect("to have a next item")
.expect("not to have an error");
- let Message::Prepare(p) = sent_message;
+ let Message::Prepare(p) = sent_message.message;
assert_eq!(p, 100_000);
+ assert_eq!(sent_message.sender, 255);
}
#[test]
- fn concludes_on_2f_prepares() {
+ fn concludes_on_2f_prepares_for_local_proposal() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
@@ -133,6 +197,8 @@ mod tests {
let agreement = agree(
100_000,
+ 255,
+ |_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
@@ -142,6 +208,7 @@ mod tests {
LocalizedMessage {
message: Message::Prepare(100_000),
sender: i,
+ signature: true,
}
});
@@ -159,11 +226,11 @@ mod tests {
.expect("not to have an error")
.expect("not to fail to agree");
- assert_eq!(agreed_value, 100_000);
+ assert_eq!(agreed_value.proposal, 100_000);
}
#[test]
- fn never_concludes_on_less_than_2f_prepares() {
+ fn concludes_on_2f_plus_one_prepares_for_alternate_proposal() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
@@ -171,6 +238,49 @@ mod tests {
let agreement = agree(
100_000,
+ 255,
+ |_msg| true,
+ i_rx.map_err(|_| ()),
+ o_tx.sink_map_err(|_| ()),
+ max_faulty,
+ );
+
+ let iter = (0..(max_faulty * 2 + 1)).map(|i| {
+ LocalizedMessage {
+ message: Message::Prepare(100_001),
+ sender: i,
+ signature: true,
+ }
+ });
+
+ let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
+
+ ::std::thread::spawn(move || {
+ ::std::thread::sleep(::std::time::Duration::from_secs(5));
+ timeout_tx.send(None).unwrap();
+ });
+
+ let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
+ .wait()
+ .map(|(r, _)| r)
+ .map_err(|(e, _)| e)
+ .expect("not to have an error")
+ .expect("not to fail to agree");
+
+ assert_eq!(agreed_value.proposal, 100_001);
+ }
+
+ #[test]
+ fn never_concludes_on_less_than_2f_prepares_for_local() {
+ let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
+ let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
+ let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
+ let max_faulty = 3;
+
+ let agreement = agree(
+ 100_000,
+ 255,
+ |_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
@@ -180,6 +290,7 @@ mod tests {
LocalizedMessage {
message: Message::Prepare(100_000),
sender: i,
+ signature: true,
}
});
@@ -200,7 +311,7 @@ mod tests {
}
#[test]
- fn never_concludes_on_2f_prepares_for_different_proposal() {
+ fn never_concludes_on_less_than_2f_plus_one_prepares_for_alternate() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
@@ -208,15 +319,18 @@ mod tests {
let agreement = agree(
100_000,
+ 255,
+ |_msg| true,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
);
- let iter = (0..(max_faulty * 2)).map(|i| {
+ let iter = (1..(max_faulty * 2 + 1)).map(|i| {
LocalizedMessage {
message: Message::Prepare(100_001),
sender: i,
+ signature: true,
}
});