diff --git a/substrate/candidate-agreement/src/bft.rs b/substrate/candidate-agreement/src/bft.rs
deleted file mode 100644
index ae240d524f..0000000000
--- a/substrate/candidate-agreement/src/bft.rs
+++ /dev/null
@@ -1,409 +0,0 @@
-// Copyright 2017 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-//! BFT Agreement based on a proposal.
-//!
-//! This is based off of PBFT with an assumption that a proposal is already
-//! known by each node. The proposals they have may differ, so the agreement
-//! may never complete.
-
-use std::collections::{HashMap, HashSet};
-use std::hash::Hash;
-
-use futures::{Future, Stream, Sink};
-use futures::future::{ok, loop_fn, Loop};
-
-/// Messages over the proposal.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum Message
{
- /// Prepare to vote for proposal P.
- Prepare(P),
-}
-
-/// A localized message, including the sender.
-#[derive(Debug, Clone)]
-pub struct LocalizedMessage
{
- /// The message received.
- pub message: Message
,
- /// The sender of the message
- pub sender: V,
- /// The signature of the message.
- pub signature: S,
-}
-
-/// The agreed-upon data.
-#[derive(Debug, Clone)]
-pub struct Agreed
{
- /// The agreed-upon proposal.
- pub proposal: P,
- /// The justification for the proposal.
- pub justification: Vec>,
-}
-
-/// Check validity and compactness justification set for a proposal.
-///
-/// Validity checks whether the set of signed messages is enough to justify
-/// the agreement of the proposal by the validators.
-///
-/// Compactness enforces that no extraneous messages are included.
-///
-/// Provide the proposal, the justification set to check, and a closure for
-/// extracting validator IDs from signatures. Should return true only if the
-/// signature is valid and the signer was a validator at that time.
-pub fn check_justification(
- proposal: P,
- justification: &[LocalizedMessage
],
- max_faulty: usize,
- check_sig: C,
-) -> bool
- where
- P: Eq,
- V: Hash + Eq,
- C: Fn(&Message
, &S) -> Option
-{
- let mut prepared = HashSet::new();
-
- for message in justification {
- let signer = match check_sig(&message.message, &message.signature) {
- Some(signer) => signer,
- None => return false, // compactness.
- };
-
- if signer != message.sender { return false }
-
- match message.message {
- Message::Prepare(ref p) if p == &proposal => {},
- _ => return false,
- };
-
- // compactness
- if !prepared.insert(signer) { return false }
-
- if prepared.len() > max_faulty * 2 { return true }
- }
-
- false
-}
-
-/// Reach BFT agreement. Input the local proposal, message input stream, message output stream,
-/// and maximum number of faulty participants.
-///
-/// Messages should only be yielded from the input stream if the sender is authorized
-/// to send messages.
-///
-/// The input stream also may never conclude or the agreement code will panic.
-/// Duplicate messages are allowed.
-///
-/// The output stream assumes that messages will eventually be delivered to all
-/// honest participants, either by repropagation, gossip, or some reliable
-/// broadcast mechanism.
-///
-/// This will collect 2f + 1 "prepare" messages. Since this is all within a single
-/// view, the commit phase is not necessary.
-// TODO: consider cross-view committing
-// TODO: impl future.
-pub fn agree<'a, P, V, S, F, I, O>(
- local_proposal: P,
- local_id: V,
- mut sign_local: F,
- input: I,
- output: O,
- max_faulty: usize,
-) -> Box, Error=I::Error> + Send + 'a>
- where
- P: 'a + Send + Hash + Eq + Clone,
- V: 'a + Send + Hash + Eq + Clone,
- S: 'a + Send + Eq + Clone,
- F: 'a + Send + FnMut(&Message) -> S,
- I: 'a + Send + Stream- >,
- O: 'a + Send + Sink,SinkError=I::Error>,
- I::Error: Send
-{
- use std::collections::hash_map::Entry;
-
- let voting_for = HashMap::new();
- let prepared = HashMap::new();
-
- let local_prepare = {
- let local_prepare = Message::Prepare(local_proposal);
- let local_signature = sign_local(&local_prepare);
-
- LocalizedMessage {
- message: local_prepare,
- sender: local_id,
- signature: local_signature,
- }
- };
-
- // broadcast out our local prepare message and shortcut it into our input
- // stream.
- let broadcast_message = output.send(local_prepare.clone());
- let input = ::futures::stream::once(Ok(local_prepare)).chain(input);
-
- let wait_for_prepares = loop_fn((input, voting_for, prepared), move |(input, mut voting_for, mut prepared)| {
- input.into_future().and_then(move |(msg, remainder)| {
- let msg = msg.expect("input stream never concludes; qed");
- let LocalizedMessage { message: Message::Prepare(p), sender, signature } = msg;
-
- let is_complete = match voting_for.entry(sender) {
- Entry::Occupied(_) => {
- // TODO: handle double vote.
- false
- }
- Entry::Vacant(vacant) => {
- vacant.insert((p.clone(), signature));
- let n = prepared.entry(p.clone()).or_insert(0);
- *n += 1;
- *n > max_faulty * 2
- }
- };
-
- if is_complete {
- let justification = voting_for.into_iter().filter_map(|(v, (x, s))| {
- if x == p {
- Some(LocalizedMessage {
- message: Message::Prepare(x),
- sender: v,
- signature: s,
- })
- } else {
- None
- }
- }).collect();
-
- ok(Loop::Break(Agreed {
- justification,
- proposal: p,
- }))
- } else {
- ok(Loop::Continue((remainder, voting_for, prepared)))
- }
-
- }).map_err(|(e, _)| e)
- });
-
- Box::new(broadcast_message.and_then(move |_| wait_for_prepares))
-}
-
-#[cfg(test)]
-mod tests {
- use futures::{Future, Stream, Sink};
- use super::*;
-
- #[test]
- fn broadcasts_message() {
- let (i_tx, i_rx) = ::futures::sync::mpsc::channel::>(10);
- let (o_tx, o_rx) = ::futures::sync::mpsc::channel(10);
- let max_faulty = 3;
-
- let agreement = agree(
- 100_000,
- 255,
- |_msg| true,
- i_rx.map_err(|_| ()),
- o_tx.sink_map_err(|_| ()),
- max_faulty,
- );
-
- ::std::thread::spawn(move || {
- let _i_tx = i_tx;
- let _ = agreement.wait();
- });
-
- let sent_message = o_rx.wait()
- .next()
- .expect("to have a next item")
- .expect("not to have an error");
-
- let Message::Prepare(p) = sent_message.message;
- assert_eq!(p, 100_000);
- assert_eq!(sent_message.sender, 255);
- }
-
- #[test]
- fn concludes_on_2f_prepares_for_local_proposal() {
- let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
- let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
- let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
- let max_faulty = 3;
-
- let agreement = agree(
- 100_000,
- 255,
- |msg| (msg.clone(), 255),
- i_rx.map_err(|_| ()),
- o_tx.sink_map_err(|_| ()),
- max_faulty,
- );
-
- let iter = (0..(max_faulty * 2)).map(|i| {
- LocalizedMessage {
- message: Message::Prepare(100_000),
- sender: i,
- signature: (Message::Prepare(100_000), i),
- }
- });
-
- let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
-
- ::std::thread::spawn(move || {
- ::std::thread::sleep(::std::time::Duration::from_secs(5));
- timeout_tx.send(None).unwrap();
- });
-
- let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
- .wait()
- .map(|(r, _)| r)
- .map_err(|(e, _)| e)
- .expect("not to have an error")
- .expect("not to fail to agree");
-
- assert_eq!(agreed_value.proposal, 100_000);
- assert!(check_justification(
- agreed_value.proposal,
- &agreed_value.justification,
- max_faulty,
- |msg, sig| if msg == &sig.0 { Some(sig.1) } else { None }
- ));
- }
-
- #[test]
- fn concludes_on_2f_plus_one_prepares_for_alternate_proposal() {
- let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
- let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
- let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
- let max_faulty = 3;
-
- let agreement = agree(
- 100_000,
- 255,
- |msg| (msg.clone(), 255),
- i_rx.map_err(|_| ()),
- o_tx.sink_map_err(|_| ()),
- max_faulty,
- );
-
- let iter = (0..(max_faulty * 2 + 1)).map(|i| {
- LocalizedMessage {
- message: Message::Prepare(100_001),
- sender: i,
- signature: (Message::Prepare(100_001), i),
- }
- });
-
- let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
-
- ::std::thread::spawn(move || {
- ::std::thread::sleep(::std::time::Duration::from_secs(5));
- timeout_tx.send(None).unwrap();
- });
-
- let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
- .wait()
- .map(|(r, _)| r)
- .map_err(|(e, _)| e)
- .expect("not to have an error")
- .expect("not to fail to agree");
-
- assert_eq!(agreed_value.proposal, 100_001);
- assert!(check_justification(
- agreed_value.proposal,
- &agreed_value.justification,
- max_faulty,
- |msg, sig| if msg == &sig.0 { Some(sig.1) } else { None }
- ));
- }
-
- #[test]
- fn never_concludes_on_less_than_2f_prepares_for_local() {
- let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
- let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
- let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
- let max_faulty = 3;
-
- let agreement = agree(
- 100_000,
- 255,
- |_msg| true,
- i_rx.map_err(|_| ()),
- o_tx.sink_map_err(|_| ()),
- max_faulty,
- );
-
- let iter = (1..(max_faulty * 2)).map(|i| {
- LocalizedMessage {
- message: Message::Prepare(100_000),
- sender: i,
- signature: true,
- }
- });
-
- let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
-
- ::std::thread::spawn(move || {
- ::std::thread::sleep(::std::time::Duration::from_millis(250));
- timeout_tx.send(None).unwrap();
- });
-
- let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
- .wait()
- .map(|(r, _)| r)
- .map_err(|(e, _)| e)
- .expect("not to have an error");
-
- assert!(agreed_value.is_none());
- }
-
- #[test]
- fn never_concludes_on_less_than_2f_plus_one_prepares_for_alternate() {
- let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10);
- let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10);
- let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel();
- let max_faulty = 3;
-
- let agreement = agree(
- 100_000,
- 255,
- |_msg| true,
- i_rx.map_err(|_| ()),
- o_tx.sink_map_err(|_| ()),
- max_faulty,
- );
-
- let iter = (1..(max_faulty * 2 + 1)).map(|i| {
- LocalizedMessage {
- message: Message::Prepare(100_001),
- sender: i,
- signature: true,
- }
- });
-
- let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap();
-
- ::std::thread::spawn(move || {
- ::std::thread::sleep(::std::time::Duration::from_millis(250));
- timeout_tx.send(None).unwrap();
- });
-
- let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ()))
- .wait()
- .map(|(r, _)| r)
- .map_err(|(e, _)| e)
- .expect("not to have an error");
-
- assert!(agreed_value.is_none());
- }
-}
diff --git a/substrate/candidate-agreement/src/bft/accumulator.rs b/substrate/candidate-agreement/src/bft/accumulator.rs
new file mode 100644
index 0000000000..69d6329f9b
--- /dev/null
+++ b/substrate/candidate-agreement/src/bft/accumulator.rs
@@ -0,0 +1,528 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! Message accumulator for each round of BFT consensus.
+
+use std::collections::{HashMap, HashSet};
+use std::collections::hash_map::Entry;
+use std::hash::Hash;
+
+use super::{Message, LocalizedMessage};
+
+/// Context necessary to execute a round of BFT.
+pub trait Context {
+ /// A full candidate.
+ type Candidate: Clone;
+ /// Unique digest of a proposed candidate (think hash).
+ type Digest: Hash + Eq + Clone;
+ /// Validator ID.
+ type ValidatorId: Hash + Eq;
+ /// A signature.
+ type Signature: Eq + Clone;
+}
+
+/// Justification at a given round.
+#[derive(PartialEq, Eq, Debug)]
+pub struct Justification {
+ /// The round.
+ pub round_number: usize,
+ /// The digest prepared for.
+ pub digest: D,
+ /// Signatures for the prepare messages.
+ pub signatures: Vec
,
+}
+
+impl Justification {
+ /// Fails if there are duplicate signatures or invalid.
+ ///
+ /// Provide a closure for checking whether the signature is valid on a
+ /// digest.
+ ///
+ /// The closure should return true iff the round number, digest, and signature
+ /// represent a valid prepare message and the signer was authorized to issue
+ /// it.
+ pub fn check(&self, max_faulty: usize, check_message: F) -> bool
+ where
+ F: Fn(usize, &D, &S) -> Option,
+ V: Hash + Eq,
+ {
+ let mut prepared = HashSet::new();
+
+ let mut good = false;
+ for signature in &self.signatures {
+ match check_message(self.round_number, &self.digest, signature) {
+ None => return false,
+ Some(v) => {
+ if !prepared.insert(v) {
+ return false;
+ } else if prepared.len() > max_faulty * 2 {
+ // don't return just yet since later signatures may be invalid.
+ good = true;
+ }
+ }
+ }
+ }
+
+ good
+ }
+}
+
+/// Type alias to represent a justification specifically for a prepare.
+pub type PrepareJustification = Justification;
+
+/// The round's state, based on imported messages.
+#[derive(PartialEq, Eq, Debug)]
+pub enum State {
+ /// No proposal yet.
+ Begin,
+ /// Proposal received.
+ Proposed(C),
+ /// Seen 2f + 1 prepares for this digest.
+ Prepared(PrepareJustification),
+ /// Seen 2f + 1 commits for a digest.
+ Concluded(Justification),
+ /// Seen 2f + 1 round-advancement messages.
+ Advanced(Option>),
+}
+
+/// Accumulates messages for a given round of BFT consensus.
+pub struct Accumulator {
+ round_number: usize,
+ max_faulty: usize,
+ round_proposer: V,
+ proposal: Option,
+ prepares: HashMap,
+ commits: HashMap,
+ vote_counts: HashMap,
+ advance_round: HashSet,
+ state: State,
+}
+
+impl Accumulator
+ where
+ C: Eq + Clone,
+ D: Hash + Clone + Eq,
+ V: Hash + Eq,
+ S: Eq + Clone,
+{
+ /// Create a new state accumulator.
+ pub fn new(round_number: usize, max_faulty: usize, round_proposer: V) -> Self {
+ Accumulator {
+ round_number,
+ max_faulty,
+ round_proposer,
+ proposal: None,
+ prepares: HashMap::new(),
+ commits: HashMap::new(),
+ vote_counts: HashMap::new(),
+ advance_round: HashSet::new(),
+ state: State::Begin,
+ }
+ }
+
+ /// How advance votes we have seen.
+ pub fn advance_votes(&self) -> usize {
+ self.advance_round.len()
+ }
+
+ /// Inspect the current consensus state.
+ pub fn state(&self) -> &State {
+ &self.state
+ }
+
+ /// Import a message. Importing duplicates is fine, but the signature
+ /// and authorization should have already been checked.
+ pub fn import_message(
+ &mut self,
+ message: LocalizedMessage,
+ )
+ {
+ // old message.
+ if message.message.round_number() != self.round_number {
+ return;
+ }
+
+ let (sender, signature) = (message.sender, message.signature);
+
+ match message.message {
+ Message::Propose(_, p) => self.import_proposal(p, sender, signature),
+ Message::Prepare(_, d) => self.import_prepare(d, sender, signature),
+ Message::Commit(_, d) => self.import_commit(d, sender, signature),
+ Message::AdvanceRound(_) => self.import_advance_round(sender),
+ }
+ }
+
+ fn import_proposal(
+ &mut self,
+ proposal: C,
+ sender: V,
+ signature: S,
+ ) {
+ if sender != self.round_proposer || self.proposal.is_some() { return }
+
+ self.proposal = Some(proposal.clone());
+ self.state = State::Proposed(proposal);
+ }
+
+ fn import_prepare(
+ &mut self,
+ candidate: D,
+ sender: V,
+ signature: S,
+ ) {
+ // ignore any subsequent prepares by the same sender.
+ // TODO: if digest is different, that's misbehavior.
+ let prepared_for = if let Entry::Vacant(vacant) = self.prepares.entry(sender) {
+ vacant.insert((candidate.clone(), signature));
+ let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0));
+ count.0 += 1;
+
+ if count.0 == self.max_faulty * 2 + 1 {
+ Some(candidate)
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ // only allow transition to prepare from begin or proposed state.
+ let valid_transition = match self.state {
+ State::Begin | State::Proposed(_) => true,
+ _ => false,
+ };
+
+ if let (true, Some(prepared_for)) = (valid_transition, prepared_for) {
+ let signatures = self.prepares
+ .values()
+ .filter(|&&(ref d, _)| d == &prepared_for)
+ .map(|&(_, ref s)| s.clone())
+ .collect();
+
+ self.state = State::Prepared(PrepareJustification {
+ round_number: self.round_number,
+ digest: prepared_for,
+ signatures: signatures,
+ });
+ }
+ }
+
+ fn import_commit(
+ &mut self,
+ candidate: D,
+ sender: V,
+ signature: S,
+ ) {
+ // ignore any subsequent commits by the same sender.
+ // TODO: if digest is different, that's misbehavior.
+ let committed_for = if let Entry::Vacant(vacant) = self.commits.entry(sender) {
+ vacant.insert((candidate.clone(), signature));
+ let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0));
+ count.1 += 1;
+
+ if count.1 == self.max_faulty * 2 + 1 {
+ Some(candidate)
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ // transition to concluded state always valid.
+ // only weird case is if the prior state was "advanced",
+ // but technically it's the same behavior as if the order of receiving
+ // the last "advance round" and "commit" messages were reversed.
+ if let Some(committed_for) = committed_for {
+ let signatures = self.commits
+ .values()
+ .filter(|&&(ref d, _)| d == &committed_for)
+ .map(|&(_, ref s)| s.clone())
+ .collect();
+
+ self.state = State::Concluded(Justification {
+ round_number: self.round_number,
+ digest: committed_for,
+ signatures: signatures,
+ });
+ }
+ }
+
+ fn import_advance_round(
+ &mut self,
+ sender: V,
+ ) {
+ self.advance_round.insert(sender);
+
+ if self.advance_round.len() != self.max_faulty * 2 + 1 { return }
+
+ // allow transition to new round only if we haven't produced a justification
+ // yet.
+ self.state = match ::std::mem::replace(&mut self.state, State::Begin) {
+ State::Concluded(j) => State::Concluded(j),
+ State::Prepared(j) => State::Advanced(Some(j)),
+ State::Advanced(j) => State::Advanced(j),
+ State::Begin | State::Proposed(_) => State::Advanced(None),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[derive(Clone, PartialEq, Eq, Debug)]
+ pub struct Candidate(usize);
+
+ #[derive(Hash, PartialEq, Eq, Clone, Debug)]
+ pub struct Digest(usize);
+
+ #[derive(Hash, PartialEq, Eq, Debug)]
+ pub struct ValidatorId(usize);
+
+ #[derive(PartialEq, Eq, Clone, Debug)]
+ pub struct Signature(usize, usize);
+
+ #[test]
+ fn justification_checks_out() {
+ let mut justification = Justification {
+ round_number: 2,
+ digest: Digest(600),
+ signatures: (0..10).map(|i| Signature(600, i)).collect(),
+ };
+
+ let check_message = |r, d: &Digest, s: &Signature| {
+ if r == 2 && d.0 == 600 && s.0 == 600 {
+ Some(ValidatorId(s.1))
+ } else {
+ None
+ }
+ };
+
+ assert!(justification.check(3, &check_message));
+ assert!(!justification.check(5, &check_message));
+
+ {
+ // one bad signature is enough to spoil it.
+ justification.signatures.push(Signature(1001, 255));
+ assert!(!justification.check(3, &check_message));
+
+ justification.signatures.pop();
+ }
+ // duplicates not allowed.
+ justification.signatures.extend((0..10).map(|i| Signature(600, i)));
+ assert!(!justification.check(3, &check_message));
+ }
+
+ #[test]
+ fn accepts_proposal_from_proposer_only() {
+ let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 3, ValidatorId(8));
+ assert_eq!(accumulator.state(), &State::Begin);
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(5),
+ signature: Signature(999, 5),
+ message: Message::Propose(1, Candidate(999)),
+ });
+
+ assert_eq!(accumulator.state(), &State::Begin);
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(8),
+ signature: Signature(999, 8),
+ message: Message::Propose(1, Candidate(999)),
+ });
+
+ assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
+ }
+
+ #[test]
+ fn reaches_prepare_phase() {
+ let mut accumulator = Accumulator::new(1, 3, ValidatorId(8));
+ assert_eq!(accumulator.state(), &State::Begin);
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(8),
+ signature: Signature(999, 8),
+ message: Message::Propose(1, Candidate(999)),
+ });
+
+ assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
+
+ for i in 0..6 {
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(i),
+ signature: Signature(999, i),
+ message: Message::Prepare(1, Digest(999)),
+ });
+
+ assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
+ }
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(7),
+ signature: Signature(999, 7),
+ message: Message::Prepare(1, Digest(999)),
+ });
+
+ match accumulator.state() {
+ &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
+ s => panic!("wrong state: {:?}", s),
+ }
+ }
+
+ #[test]
+ fn prepare_to_commit() {
+ let mut accumulator = Accumulator::new(1, 3, ValidatorId(8));
+ assert_eq!(accumulator.state(), &State::Begin);
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(8),
+ signature: Signature(999, 8),
+ message: Message::Propose(1, Candidate(999)),
+ });
+
+ assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
+
+ for i in 0..6 {
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(i),
+ signature: Signature(999, i),
+ message: Message::Prepare(1, Digest(999)),
+ });
+
+ assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
+ }
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(7),
+ signature: Signature(999, 7),
+ message: Message::Prepare(1, Digest(999)),
+ });
+
+ match accumulator.state() {
+ &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
+ s => panic!("wrong state: {:?}", s),
+ }
+
+ for i in 0..6 {
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(i),
+ signature: Signature(999, i),
+ message: Message::Commit(1, Digest(999)),
+ });
+
+ match accumulator.state() {
+ &State::Prepared(_) => {},
+ s => panic!("wrong state: {:?}", s),
+ }
+ }
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(7),
+ signature: Signature(999, 7),
+ message: Message::Commit(1, Digest(999)),
+ });
+
+ match accumulator.state() {
+ &State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)),
+ s => panic!("wrong state: {:?}", s),
+ }
+ }
+
+ #[test]
+ fn prepare_to_advance() {
+ let mut accumulator = Accumulator::new(1, 3, ValidatorId(8));
+ assert_eq!(accumulator.state(), &State::Begin);
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(8),
+ signature: Signature(999, 8),
+ message: Message::Propose(1, Candidate(999)),
+ });
+
+ assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
+
+ for i in 0..7 {
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(i),
+ signature: Signature(999, i),
+ message: Message::Prepare(1, Digest(999)),
+ });
+ }
+
+ match accumulator.state() {
+ &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
+ s => panic!("wrong state: {:?}", s),
+ }
+
+ for i in 0..6 {
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(i),
+ signature: Signature(999, i),
+ message: Message::AdvanceRound(1),
+ });
+
+ match accumulator.state() {
+ &State::Prepared(_) => {},
+ s => panic!("wrong state: {:?}", s),
+ }
+ }
+
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(7),
+ signature: Signature(999, 7),
+ message: Message::AdvanceRound(1),
+ });
+
+ match accumulator.state() {
+ &State::Advanced(Some(_)) => {},
+ s => panic!("wrong state: {:?}", s),
+ }
+ }
+
+ #[test]
+ fn conclude_different_than_proposed() {
+ let mut accumulator = Accumulator::::new(1, 3, ValidatorId(8));
+ assert_eq!(accumulator.state(), &State::Begin);
+
+ for i in 0..7 {
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(i),
+ signature: Signature(999, i),
+ message: Message::Prepare(1, Digest(999)),
+ });
+ }
+
+ match accumulator.state() {
+ &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
+ s => panic!("wrong state: {:?}", s),
+ }
+
+ for i in 0..7 {
+ accumulator.import_message(LocalizedMessage {
+ sender: ValidatorId(i),
+ signature: Signature(999, i),
+ message: Message::Commit(1, Digest(999)),
+ });
+ }
+
+ match accumulator.state() {
+ &State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)),
+ s => panic!("wrong state: {:?}", s),
+ }
+ }
+}
diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs
new file mode 100644
index 0000000000..da4b6fe91d
--- /dev/null
+++ b/substrate/candidate-agreement/src/bft/mod.rs
@@ -0,0 +1,100 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! BFT Agreement based on a rotating proposer in different rounds.
+
+use std::collections::{HashMap, HashSet};
+use std::hash::Hash;
+
+use futures::{IntoFuture, Future, Stream, Sink};
+use futures::future::{ok, loop_fn, Loop};
+
+mod accumulator;
+
+/// Messages over the proposal.
+/// Each message carries an associated round number.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Message {
+ /// Send a full proposal.
+ Propose(usize, P),
+ /// Prepare to vote for proposal with digest D.
+ Prepare(usize, D),
+ /// Commit to proposal with digest D..
+ Commit(usize, D),
+ /// Propose advancement to a new round.
+ AdvanceRound(usize),
+}
+
+impl
Message
{
+ fn round_number(&self) -> usize {
+ match *self {
+ Message::Propose(round, _) => round,
+ Message::Prepare(round, _) => round,
+ Message::Commit(round, _) => round,
+ Message::AdvanceRound(round) => round,
+ }
+ }
+}
+
+/// A localized message, including the sender.
+#[derive(Debug, Clone)]
+pub struct LocalizedMessage {
+ /// The message received.
+ pub message: Message,
+ /// The sender of the message
+ pub sender: V,
+ /// The signature of the message.
+ pub signature: S,
+}
+
+/// The agreed-upon data.
+#[derive(Debug, Clone)]
+pub struct Agreed {
+ /// The agreed-upon proposal.
+ pub proposal: P,
+ /// The justification for the proposal.
+ pub justification: Vec>,
+}
+
+/// Parameters to agreement.
+pub struct Params<
+ Validator,
+ SignLocal,
+ Timeout,
+ CanInclude,
+ MessagesIn,
+ MessagesOut,
+> {
+ /// The ID of the current view's primary.
+ pub primary: Validator,
+ /// The local ID.
+ pub local_id: Validator,
+ /// A closure for signing local messages.
+ pub sign_local: SignLocal,
+ /// A timeout that fires when the view change should begin.
+ pub begin_view_change: Timeout,
+ /// A function for checking if a proposal can be voted for.
+ pub can_include: CanInclude,
+ /// The input stream. Should never conclude, and should yield only messages
+ /// sent by validators and which have been authenticated properly.
+ pub input: MessagesIn,
+ /// The output message sink. This assumes that messages will eventually
+ /// be delivered to all honest participants, either by repropagation, gossip,
+ /// or some reliable broadcast mechanism.
+ pub output: MessagesOut,
+ /// The maximum number of faulty nodes.
+ pub max_faulty: usize,
+}
diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs
index c072be3ee0..68025c3708 100644
--- a/substrate/candidate-agreement/src/table.rs
+++ b/substrate/candidate-agreement/src/table.rs
@@ -32,6 +32,49 @@ use std::collections::hash_map::{HashMap, Entry};
use std::hash::Hash;
use std::fmt::Debug;
+/// Context for the statement table.
+pub trait Context {
+ /// A validator ID
+ type ValidatorId: Hash + Eq + Clone + Debug;
+ /// The digest (hash or other unique attribute) of a candidate.
+ type Digest: Hash + Eq + Clone + Debug;
+ /// Candidate type.
+ type Candidate: Ord + Eq + Clone + Debug;
+ /// The group ID type
+ type GroupId: Hash + Ord + Eq + Clone + Debug;
+ /// A signature type.
+ type Signature: Eq + Clone + Debug;
+
+ /// get the digest of a candidate.
+ fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
+
+ /// get the group of a candidate.
+ fn candidate_group(&self, candidate: &Self::Candidate) -> Self::GroupId;
+
+ /// Whether a validator is a member of a group.
+ /// Members are meant to submit candidates and vote on validity.
+ fn is_member_of(&self, validator: &Self::ValidatorId, group: &Self::GroupId) -> bool;
+
+ /// Whether a validator is an availability guarantor of a group.
+ /// Guarantors are meant to vote on availability for candidates submitted
+ /// in a group.
+ fn is_availability_guarantor_of(
+ &self,
+ validator: &Self::ValidatorId,
+ group: &Self::GroupId,
+ ) -> bool;
+
+ // recover signer of statement and ensure the signature corresponds to the
+ // statement.
+ fn statement_signer(
+ &self,
+ statement: &SignedStatement,
+ ) -> Option;
+
+ // requisite number of votes for validity and availability respectively from a group.
+ fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize);
+}
+
/// Statements circulated among peers.
#[derive(PartialEq, Eq, Debug)]
pub enum Statement {
@@ -80,49 +123,6 @@ enum StatementTrace {
Available(V, D),
}
-/// Context for the statement table.
-pub trait Context {
- /// A validator ID
- type ValidatorId: Hash + Eq + Clone + Debug;
- /// The digest (hash or other unique attribute) of a candidate.
- type Digest: Hash + Eq + Clone + Debug;
- /// Candidate type.
- type Candidate: Ord + Eq + Clone + Debug;
- /// The group ID type
- type GroupId: Hash + Ord + Eq + Clone + Debug;
- /// A signature type.
- type Signature: Eq + Clone + Debug;
-
- /// get the digest of a candidate.
- fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
-
- /// get the group of a candidate.
- fn candidate_group(&self, candidate: &Self::Candidate) -> Self::GroupId;
-
- /// Whether a validator is a member of a group.
- /// Members are meant to submit candidates and vote on validity.
- fn is_member_of(&self, validator: &Self::ValidatorId, group: &Self::GroupId) -> bool;
-
- /// Whether a validator is an availability guarantor of a group.
- /// Guarantors are meant to vote on availability for candidates submitted
- /// in a group.
- fn is_availability_guarantor_of(
- &self,
- validator: &Self::ValidatorId,
- group: &Self::GroupId,
- ) -> bool;
-
- // recover signer of statement and ensure the signature corresponds to the
- // statement.
- fn statement_signer(
- &self,
- statement: &SignedStatement,
- ) -> Option;
-
- // requisite number of votes for validity and availability respectively from a group.
- fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize);
-}
-
/// Misbehavior: voting more than one way on candidate validity.
///
/// Since there are three possible ways to vote, a double vote is possible in