tests for BFT agreement

This commit is contained in:
Robert Habermeier
2017-12-18 11:47:56 +01:00
parent 6da40e8a7a
commit 224efd656b
+114 -5
View File
@@ -58,12 +58,13 @@ pub struct LocalizedMessage<P, V> {
// TODO: consider cross-view committing?
// TODO: impl future.
pub fn agree<'a, P, I, O, V>(local_proposal: P, input: I, output: O, max_faulty: usize)
-> Box<Future<Item=P, Error=I::Error> + 'a>
-> Box<Future<Item=P, Error=I::Error> + Send + 'a>
where
P: 'a + Eq + Clone,
V: 'a + Hash + Eq,
I: 'a + Stream<Item=LocalizedMessage<P, V>>,
O: 'a + Sink<SinkItem=Message<P>,SinkError=I::Error>,
P: 'a + Send + Eq + Clone,
V: 'a + Send + Hash + Eq,
I: 'a + Send + Stream<Item=LocalizedMessage<P, V>>,
O: 'a + Send + Sink<SinkItem=Message<P>,SinkError=I::Error>,
I::Error: Send
{
let prepared = HashSet::new();
@@ -90,3 +91,111 @@ pub fn agree<'a, P, I, O, V>(local_proposal: P, input: I, output: O, max_faulty:
Box::new(broadcast_message.and_then(move |_| wait_for_prepares))
}
#[cfg(test)]
mod tests {
use futures::{Future, Stream, Sink};
use super::*;
#[test]
fn broadcasts_message() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel::<LocalizedMessage<usize, usize>>(10);
let (o_tx, o_rx) = ::futures::sync::mpsc::channel(10);
let max_faulty = 3;
let agreement = agree(
100_000,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
);
::std::thread::spawn(move || {
let _i_tx = i_tx;
let _ = agreement.wait();
});
let sent_message = o_rx.wait()
.next()
.expect("to have a next item")
.expect("not to have an error");
let Message::Prepare(p) = sent_message;
assert_eq!(p, 100_000);
}
#[test]
fn concludes_on_2f_prepares() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
let max_faulty = 3;
let agreement = agree(
100_000,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
);
let iter = (0..(max_faulty * 2)).map(|i| {
LocalizedMessage {
message: Message::Prepare(100_000),
sender: i,
}
});
let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
::std::thread::spawn(move || {
::std::thread::sleep(::std::time::Duration::from_secs(5));
timeout_tx.send(None).unwrap();
});
let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
.wait()
.map(|(r, _)| r)
.map_err(|(e, _)| e)
.expect("not to have an error")
.expect("not to fail to agree");
assert_eq!(agreed_value, 100_000);
}
#[test]
fn never_concludes_on_less_than_2f_prepares() {
let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
let max_faulty = 3;
let agreement = agree(
100_000,
i_rx.map_err(|_| ()),
o_tx.sink_map_err(|_| ()),
max_faulty,
);
let iter = (1..(max_faulty * 2)).map(|i| {
LocalizedMessage {
message: Message::Prepare(100_000),
sender: i,
}
});
let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
::std::thread::spawn(move || {
::std::thread::sleep(::std::time::Duration::from_millis(250));
timeout_tx.send(None).unwrap();
});
let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
.wait()
.map(|(r, _)| r)
.map_err(|(e, _)| e)
.expect("not to have an error");
assert!(agreed_value.is_none());
}
}