From 4b3b1249dcc166ebc6d23891b32eac60fb0f1686 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 27 Dec 2017 21:35:19 +0100 Subject: [PATCH] basic tests for the strategy --- substrate/candidate-agreement/src/bft/mod.rs | 54 ++- .../candidate-agreement/src/bft/tests.rs | 324 ++++++++++++++++++ substrate/candidate-agreement/src/lib.rs | 1 - 3 files changed, 359 insertions(+), 20 deletions(-) create mode 100644 substrate/candidate-agreement/src/bft/tests.rs diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs index 819f7a5da3..bad684820e 100644 --- a/substrate/candidate-agreement/src/bft/mod.rs +++ b/substrate/candidate-agreement/src/bft/mod.rs @@ -18,7 +18,11 @@ mod accumulator; +#[cfg(test)] +mod tests; + use std::collections::{HashMap, VecDeque}; +use std::fmt::Debug; use std::hash::Hash; use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink}; @@ -66,13 +70,13 @@ pub struct LocalizedMessage { /// Context necessary for agreement. pub trait Context { /// Candidate proposed. - type Candidate: Eq + Clone; + type Candidate: Debug + Eq + Clone; /// Candidate digest. - type Digest: Hash + Eq + Clone; + type Digest: Debug + Hash + Eq + Clone; /// Validator ID. - type ValidatorId: Hash + Eq + Clone; + type ValidatorId: Debug + Hash + Eq + Clone; /// Signature. - type Signature: Eq + Clone; + type Signature: Debug + Eq + Clone; /// A future that resolves when a round timeout is concluded. type RoundTimeout: Future; /// A future that resolves when a proposal is ready. @@ -193,6 +197,7 @@ fn bft_threshold(nodes: usize, max_faulty: usize) -> usize { } /// Committed successfully. +#[derive(Debug, Clone)] pub struct Committed { /// The candidate committed for. This will be unknown if /// we never witnessed the proposal of the last round. @@ -214,7 +219,7 @@ impl Locked { // the state of the local node during the current state of consensus. // // behavior is different when locked on a proposal. -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum LocalState { Start, Proposed, @@ -486,7 +491,7 @@ impl Strategy { prepare_for = Some(digest) } None => if context.candidate_valid(candidate) { - prepare_for = Some(digest); + prepare_for = Some(digest); }, } } @@ -641,23 +646,34 @@ impl Future for Agreement }) } + loop { + let message = match self.input.poll()? { + Async::Ready(msg) => msg.ok_or(InputStreamConcluded)?, + Async::NotReady => break, + }; + + match message.0 { + Communication::Message(message) => self.strategy.import_message(message), + Communication::Locked(proof) => self.strategy.import_lock_proof(&self.context, proof), + } + } + + // try to process timeouts. + let state_machine_res = self.strategy.poll(&self.context, &mut self.sending)?; + // make progress on flushing all pending messages. let _ = self.sending.process_all(&mut self.output)?; - // try to process timeouts. - if let Async::Ready(just) = self.strategy.poll(&self.context, &mut self.sending)? { - self.concluded = Some(just); - return self.poll(); + match state_machine_res { + Async::Ready(just) => { + self.concluded = Some(just); + self.poll() + } + Async::NotReady => { + + Ok(Async::NotReady) + } } - - let message = try_ready!(self.input.poll()).ok_or(InputStreamConcluded)?; - - match message.0 { - Communication::Message(message) => self.strategy.import_message(message), - Communication::Locked(proof) => self.strategy.import_lock_proof(&self.context, proof), - } - - self.poll() } } diff --git a/substrate/candidate-agreement/src/bft/tests.rs b/substrate/candidate-agreement/src/bft/tests.rs new file mode 100644 index 0000000000..2228ab2604 --- /dev/null +++ b/substrate/candidate-agreement/src/bft/tests.rs @@ -0,0 +1,324 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Tests for the candidate agreement strategy. + +use super::*; + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use futures::prelude::*; +use futures::sync::{oneshot, mpsc}; +use futures::future::FutureResult; + +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +struct Candidate(usize); + +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +struct Digest(usize); + +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +struct ValidatorId(usize); + +#[derive(Debug, PartialEq, Eq, Clone)] +struct Signature(Message, ValidatorId); + +struct SharedContext { + node_count: usize, + current_round: usize, + awaiting_round_timeouts: HashMap>>, +} + +#[derive(Debug)] +struct Error; + +impl From for Error { + fn from(_: InputStreamConcluded) -> Error { + Error + } +} + +impl SharedContext { + fn new(node_count: usize) -> Self { + SharedContext { + node_count, + current_round: 0, + awaiting_round_timeouts: HashMap::new() + } + } + + fn round_timeout(&mut self, round: usize) -> Box> { + let (tx, rx) = oneshot::channel(); + if round < self.current_round { + tx.send(()).unwrap() + } else { + self.awaiting_round_timeouts + .entry(round) + .or_insert_with(Vec::new) + .push(tx); + } + + Box::new(rx.map_err(|_| Error)) + } + + fn bump_round(&mut self) { + let awaiting_timeout = self.awaiting_round_timeouts + .remove(&self.current_round) + .unwrap_or_else(Vec::new); + + for tx in awaiting_timeout { + let _ = tx.send(()); + } + + self.current_round += 1; + } + + fn round_proposer(&self, round: usize) -> ValidatorId { + ValidatorId(round % self.node_count) + } +} + +struct Ctx { + local_id: ValidatorId, + proposal: Mutex, + shared: Arc>, +} + +impl Context for Ctx { + type Candidate = Candidate; + type Digest = Digest; + type ValidatorId = ValidatorId; + type Signature = Signature; + type RoundTimeout = Box>; + type Proposal = FutureResult; + + fn local_id(&self) -> ValidatorId { + self.local_id.clone() + } + + fn proposal(&self) -> Self::Proposal { + let proposal = { + let mut p = self.proposal.lock().unwrap(); + let x = *p; + *p = (*p * 2) + 1; + x + }; + + Ok(Candidate(proposal)).into_future() + } + + fn candidate_digest(&self, candidate: &Candidate) -> Digest { + Digest(candidate.0) + } + + fn sign_local(&self, message: Message) + -> LocalizedMessage + { + let signature = Signature(message.clone(), self.local_id.clone()); + LocalizedMessage { + message, + signature, + sender: self.local_id.clone() + } + } + + fn round_proposer(&self, round: usize) -> ValidatorId { + self.shared.lock().unwrap().round_proposer(round) + } + + fn candidate_valid(&self, candidate: &Candidate) -> bool { + candidate.0 % 3 != 0 + } + + fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { + self.shared.lock().unwrap().round_timeout(round) + } +} + +type Comm = ContextCommunication; + +struct Network { + endpoints: Vec>, + input: mpsc::UnboundedReceiver<(usize, Comm)>, +} + +impl Network { + fn new(nodes: usize) + -> (Network, Vec>, Vec>) + { + let mut inputs = Vec::with_capacity(nodes); + let mut outputs = Vec::with_capacity(nodes); + let mut endpoints = Vec::with_capacity(nodes); + + let (in_tx, in_rx) = mpsc::unbounded(); + for _ in 0..nodes { + let (out_tx, out_rx) = mpsc::unbounded(); + inputs.push(in_tx.clone()); + outputs.push(out_rx); + endpoints.push(out_tx); + } + + let network = Network { + endpoints, + input: in_rx, + }; + + (network, inputs, outputs) + } + + fn route_on_thread(self) { + ::std::thread::spawn(move || { let _ = self.wait(); }); + } +} + +impl Future for Network { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll<(), Error> { + match self.input.poll() { + Err(_) => Err(Error), + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => Ok(Async::Ready(())), + Ok(Async::Ready(Some((sender, item)))) => { + { + let receiving_endpoints = self.endpoints + .iter() + .enumerate() + .filter(|&(i, _)| i != sender) + .map(|(_, x)| x); + + for endpoint in receiving_endpoints { + let _ = endpoint.unbounded_send(item.clone()); + } + } + + self.poll() + } + } + } +} + +fn timeout_in(t: Duration) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel(); + ::std::thread::spawn(move || { + ::std::thread::sleep(t); + let _ = tx.send(()); + }); + + rx +} + +#[test] +fn consensus_completes_with_minimum_good() { + let node_count = 10; + let max_faulty = 3; + + let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count))); + + let (network, net_send, net_recv) = Network::new(node_count); + network.route_on_thread(); + + let nodes = net_send + .into_iter() + .zip(net_recv) + .take(node_count - max_faulty) + .enumerate() + .map(|(i, (tx, rx))| { + let ctx = Ctx { + local_id: ValidatorId(i), + proposal: Mutex::new(i), + shared: shared_context.clone(), + }; + + agree( + ctx, + node_count, + max_faulty, + rx.map_err(|_| Error), + tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), + ) + }) + .collect::>(); + + ::std::thread::spawn(move || { + let mut timeout = ::std::time::Duration::from_millis(50); + loop { + ::std::thread::sleep(timeout.clone()); + shared_context.lock().unwrap().bump_round(); + timeout *= 2; + } + }); + + let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error); + let results = ::futures::future::join_all(nodes) + .map(Some) + .select(timeout.map(|_| None)) + .wait() + .map(|(i, _)| i) + .map_err(|(e, _)| e) + .expect("to complete") + .expect("to not time out"); + + for result in &results { + assert_eq!(&result.justification.digest, &results[0].justification.digest); + } +} + +#[test] +fn consensus_does_not_complete_without_enough_nodes() { + let node_count = 10; + let max_faulty = 3; + + let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count))); + + let (network, net_send, net_recv) = Network::new(node_count); + network.route_on_thread(); + + let nodes = net_send + .into_iter() + .zip(net_recv) + .take(node_count - max_faulty - 1) + .enumerate() + .map(|(i, (tx, rx))| { + let ctx = Ctx { + local_id: ValidatorId(i), + proposal: Mutex::new(i), + shared: shared_context.clone(), + }; + + agree( + ctx, + node_count, + max_faulty, + rx.map_err(|_| Error), + tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), + ) + }) + .collect::>(); + + let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error); + let result = ::futures::future::join_all(nodes) + .map(Some) + .select(timeout.map(|_| None)) + .wait() + .map(|(i, _)| i) + .map_err(|(e, _)| e) + .expect("to complete"); + + assert!(result.is_none(), "not enough online nodes"); +} diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index b23b9c606c..09dd56f5f0 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -29,7 +29,6 @@ //! //! Groups themselves may be compromised by malicious validators. -#[macro_use] extern crate futures; extern crate polkadot_primitives as primitives;