From 224efd656b22ac1024fbfd693e2e2b9cdcef4991 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 18 Dec 2017 11:47:56 +0100 Subject: [PATCH] tests for BFT agreement --- substrate/candidate-agreement/src/bft.rs | 119 ++++++++++++++++++++++- 1 file changed, 114 insertions(+), 5 deletions(-) diff --git a/substrate/candidate-agreement/src/bft.rs b/substrate/candidate-agreement/src/bft.rs index 03c67fcb0f..f41bfd4606 100644 --- a/substrate/candidate-agreement/src/bft.rs +++ b/substrate/candidate-agreement/src/bft.rs @@ -58,12 +58,13 @@ pub struct LocalizedMessage { // TODO: consider cross-view committing? // TODO: impl future. pub fn agree<'a, P, I, O, V>(local_proposal: P, input: I, output: O, max_faulty: usize) - -> Box + 'a> + -> Box + Send + 'a> where - P: 'a + Eq + Clone, - V: 'a + Hash + Eq, - I: 'a + Stream>, - O: 'a + Sink,SinkError=I::Error>, + P: 'a + Send + Eq + Clone, + V: 'a + Send + Hash + Eq, + I: 'a + Send + Stream>, + O: 'a + Send + Sink,SinkError=I::Error>, + I::Error: Send { let prepared = HashSet::new(); @@ -90,3 +91,111 @@ pub fn agree<'a, P, I, O, V>(local_proposal: P, input: I, output: O, max_faulty: Box::new(broadcast_message.and_then(move |_| wait_for_prepares)) } + +#[cfg(test)] +mod tests { + use futures::{Future, Stream, Sink}; + use super::*; + + #[test] + fn broadcasts_message() { + let (i_tx, i_rx) = ::futures::sync::mpsc::channel::>(10); + let (o_tx, o_rx) = ::futures::sync::mpsc::channel(10); + let max_faulty = 3; + + let agreement = agree( + 100_000, + i_rx.map_err(|_| ()), + o_tx.sink_map_err(|_| ()), + max_faulty, + ); + + ::std::thread::spawn(move || { + let _i_tx = i_tx; + let _ = agreement.wait(); + }); + + let sent_message = o_rx.wait() + .next() + .expect("to have a next item") + .expect("not to have an error"); + let Message::Prepare(p) = sent_message; + + assert_eq!(p, 100_000); + } + + #[test] + fn concludes_on_2f_prepares() { + let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10); + let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10); + let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel(); + let max_faulty = 3; + + let agreement = agree( + 100_000, + i_rx.map_err(|_| ()), + o_tx.sink_map_err(|_| ()), + max_faulty, + ); + + let iter = (0..(max_faulty * 2)).map(|i| { + LocalizedMessage { + message: Message::Prepare(100_000), + sender: i, + } + }); + + let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap(); + + ::std::thread::spawn(move || { + ::std::thread::sleep(::std::time::Duration::from_secs(5)); + timeout_tx.send(None).unwrap(); + }); + + let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ())) + .wait() + .map(|(r, _)| r) + .map_err(|(e, _)| e) + .expect("not to have an error") + .expect("not to fail to agree"); + + assert_eq!(agreed_value, 100_000); + } + + #[test] + fn never_concludes_on_less_than_2f_prepares() { + let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10); + let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10); + let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel(); + let max_faulty = 3; + + let agreement = agree( + 100_000, + i_rx.map_err(|_| ()), + o_tx.sink_map_err(|_| ()), + max_faulty, + ); + + let iter = (1..(max_faulty * 2)).map(|i| { + LocalizedMessage { + message: Message::Prepare(100_000), + sender: i, + } + }); + + let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap(); + + ::std::thread::spawn(move || { + ::std::thread::sleep(::std::time::Duration::from_millis(250)); + timeout_tx.send(None).unwrap(); + }); + + let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ())) + .wait() + .map(|(r, _)| r) + .map_err(|(e, _)| e) + .expect("not to have an error"); + + assert!(agreed_value.is_none()); + } +}