diff --git a/substrate/candidate-agreement/src/bft.rs b/substrate/candidate-agreement/src/bft.rs
index 03c67fcb0f..f41bfd4606 100644
--- a/substrate/candidate-agreement/src/bft.rs
+++ b/substrate/candidate-agreement/src/bft.rs
@@ -58,12 +58,13 @@ pub struct LocalizedMessage
{
// TODO: consider cross-view committing?
// TODO: impl future.
pub fn agree<'a, P, I, O, V>(local_proposal: P, input: I, output: O, max_faulty: usize)
- -> Box + 'a>
+ -> Box + Send + 'a>
where
- P: 'a + Eq + Clone,
- V: 'a + Hash + Eq,
- I: 'a + Stream- >,
- O: 'a + Sink,SinkError=I::Error>,
+ P: 'a + Send + Eq + Clone,
+ V: 'a + Send + Hash + Eq,
+ I: 'a + Send + Stream
- >,
+ O: 'a + Send + Sink,SinkError=I::Error>,
+ I::Error: Send
{
let prepared = HashSet::new();
@@ -90,3 +91,111 @@ pub fn agree<'a, P, I, O, V>(local_proposal: P, input: I, output: O, max_faulty:
Box::new(broadcast_message.and_then(move |_| wait_for_prepares))
}
+
+#[cfg(test)]
+mod tests {
+ use futures::{Future, Stream, Sink};
+ use super::*;
+
+ #[test]
+ fn broadcasts_message() {
+ let (i_tx, i_rx) = ::futures::sync::mpsc::channel::>(10);
+ let (o_tx, o_rx) = ::futures::sync::mpsc::channel(10);
+ let max_faulty = 3;
+
+ let agreement = agree(
+ 100_000,
+ i_rx.map_err(|_| ()),
+ o_tx.sink_map_err(|_| ()),
+ max_faulty,
+ );
+
+ ::std::thread::spawn(move || {
+ let _i_tx = i_tx;
+ let _ = agreement.wait();
+ });
+
+ let sent_message = o_rx.wait()
+ .next()
+ .expect("to have a next item")
+ .expect("not to have an error");
+ let Message::Prepare(p) = sent_message;
+
+ assert_eq!(p, 100_000);
+ }
+
+ #[test]
+ fn concludes_on_2f_prepares() {
+ let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
+ let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
+ let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
+ let max_faulty = 3;
+
+ let agreement = agree(
+ 100_000,
+ i_rx.map_err(|_| ()),
+ o_tx.sink_map_err(|_| ()),
+ max_faulty,
+ );
+
+ let iter = (0..(max_faulty * 2)).map(|i| {
+ LocalizedMessage {
+ message: Message::Prepare(100_000),
+ sender: i,
+ }
+ });
+
+ let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
+
+ ::std::thread::spawn(move || {
+ ::std::thread::sleep(::std::time::Duration::from_secs(5));
+ timeout_tx.send(None).unwrap();
+ });
+
+ let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
+ .wait()
+ .map(|(r, _)| r)
+ .map_err(|(e, _)| e)
+ .expect("not to have an error")
+ .expect("not to fail to agree");
+
+ assert_eq!(agreed_value, 100_000);
+ }
+
+ #[test]
+ fn never_concludes_on_less_than_2f_prepares() {
+ let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
+ let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
+ let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
+ let max_faulty = 3;
+
+ let agreement = agree(
+ 100_000,
+ i_rx.map_err(|_| ()),
+ o_tx.sink_map_err(|_| ()),
+ max_faulty,
+ );
+
+ let iter = (1..(max_faulty * 2)).map(|i| {
+ LocalizedMessage {
+ message: Message::Prepare(100_000),
+ sender: i,
+ }
+ });
+
+ let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
+
+ ::std::thread::spawn(move || {
+ ::std::thread::sleep(::std::time::Duration::from_millis(250));
+ timeout_tx.send(None).unwrap();
+ });
+
+ let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
+ .wait()
+ .map(|(r, _)| r)
+ .map_err(|(e, _)| e)
+ .expect("not to have an error");
+
+ assert!(agreed_value.is_none());
+ }
+}