From 38b4dc2f2272b2414eeb9bff5929f9ab46f09ea8 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 21 Dec 2017 22:53:17 +0100 Subject: [PATCH] BFT rewrite: vote accumulator with tests --- substrate/candidate-agreement/src/bft.rs | 409 -------------- .../src/bft/accumulator.rs | 528 ++++++++++++++++++ substrate/candidate-agreement/src/bft/mod.rs | 100 ++++ substrate/candidate-agreement/src/table.rs | 86 +-- 4 files changed, 671 insertions(+), 452 deletions(-) delete mode 100644 substrate/candidate-agreement/src/bft.rs create mode 100644 substrate/candidate-agreement/src/bft/accumulator.rs create mode 100644 substrate/candidate-agreement/src/bft/mod.rs diff --git a/substrate/candidate-agreement/src/bft.rs b/substrate/candidate-agreement/src/bft.rs deleted file mode 100644 index ae240d524f..0000000000 --- a/substrate/candidate-agreement/src/bft.rs +++ /dev/null @@ -1,409 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! BFT Agreement based on a proposal. -//! -//! This is based off of PBFT with an assumption that a proposal is already -//! known by each node. The proposals they have may differ, so the agreement -//! may never complete. - -use std::collections::{HashMap, HashSet}; -use std::hash::Hash; - -use futures::{Future, Stream, Sink}; -use futures::future::{ok, loop_fn, Loop}; - -/// Messages over the proposal. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Message

{ - /// Prepare to vote for proposal P. - Prepare(P), -} - -/// A localized message, including the sender. -#[derive(Debug, Clone)] -pub struct LocalizedMessage { - /// The message received. - pub message: Message

, - /// The sender of the message - pub sender: V, - /// The signature of the message. - pub signature: S, -} - -/// The agreed-upon data. -#[derive(Debug, Clone)] -pub struct Agreed { - /// The agreed-upon proposal. - pub proposal: P, - /// The justification for the proposal. - pub justification: Vec>, -} - -/// Check validity and compactness justification set for a proposal. -/// -/// Validity checks whether the set of signed messages is enough to justify -/// the agreement of the proposal by the validators. -/// -/// Compactness enforces that no extraneous messages are included. -/// -/// Provide the proposal, the justification set to check, and a closure for -/// extracting validator IDs from signatures. Should return true only if the -/// signature is valid and the signer was a validator at that time. -pub fn check_justification( - proposal: P, - justification: &[LocalizedMessage], - max_faulty: usize, - check_sig: C, -) -> bool - where - P: Eq, - V: Hash + Eq, - C: Fn(&Message

, &S) -> Option -{ - let mut prepared = HashSet::new(); - - for message in justification { - let signer = match check_sig(&message.message, &message.signature) { - Some(signer) => signer, - None => return false, // compactness. - }; - - if signer != message.sender { return false } - - match message.message { - Message::Prepare(ref p) if p == &proposal => {}, - _ => return false, - }; - - // compactness - if !prepared.insert(signer) { return false } - - if prepared.len() > max_faulty * 2 { return true } - } - - false -} - -/// Reach BFT agreement. Input the local proposal, message input stream, message output stream, -/// and maximum number of faulty participants. -/// -/// Messages should only be yielded from the input stream if the sender is authorized -/// to send messages. -/// -/// The input stream also may never conclude or the agreement code will panic. -/// Duplicate messages are allowed. -/// -/// The output stream assumes that messages will eventually be delivered to all -/// honest participants, either by repropagation, gossip, or some reliable -/// broadcast mechanism. -/// -/// This will collect 2f + 1 "prepare" messages. Since this is all within a single -/// view, the commit phase is not necessary. -// TODO: consider cross-view committing -// TODO: impl future. -pub fn agree<'a, P, V, S, F, I, O>( - local_proposal: P, - local_id: V, - mut sign_local: F, - input: I, - output: O, - max_faulty: usize, -) -> Box, Error=I::Error> + Send + 'a> - where - P: 'a + Send + Hash + Eq + Clone, - V: 'a + Send + Hash + Eq + Clone, - S: 'a + Send + Eq + Clone, - F: 'a + Send + FnMut(&Message

) -> S, - I: 'a + Send + Stream>, - O: 'a + Send + Sink,SinkError=I::Error>, - I::Error: Send -{ - use std::collections::hash_map::Entry; - - let voting_for = HashMap::new(); - let prepared = HashMap::new(); - - let local_prepare = { - let local_prepare = Message::Prepare(local_proposal); - let local_signature = sign_local(&local_prepare); - - LocalizedMessage { - message: local_prepare, - sender: local_id, - signature: local_signature, - } - }; - - // broadcast out our local prepare message and shortcut it into our input - // stream. - let broadcast_message = output.send(local_prepare.clone()); - let input = ::futures::stream::once(Ok(local_prepare)).chain(input); - - let wait_for_prepares = loop_fn((input, voting_for, prepared), move |(input, mut voting_for, mut prepared)| { - input.into_future().and_then(move |(msg, remainder)| { - let msg = msg.expect("input stream never concludes; qed"); - let LocalizedMessage { message: Message::Prepare(p), sender, signature } = msg; - - let is_complete = match voting_for.entry(sender) { - Entry::Occupied(_) => { - // TODO: handle double vote. - false - } - Entry::Vacant(vacant) => { - vacant.insert((p.clone(), signature)); - let n = prepared.entry(p.clone()).or_insert(0); - *n += 1; - *n > max_faulty * 2 - } - }; - - if is_complete { - let justification = voting_for.into_iter().filter_map(|(v, (x, s))| { - if x == p { - Some(LocalizedMessage { - message: Message::Prepare(x), - sender: v, - signature: s, - }) - } else { - None - } - }).collect(); - - ok(Loop::Break(Agreed { - justification, - proposal: p, - })) - } else { - ok(Loop::Continue((remainder, voting_for, prepared))) - } - - }).map_err(|(e, _)| e) - }); - - Box::new(broadcast_message.and_then(move |_| wait_for_prepares)) -} - -#[cfg(test)] -mod tests { - use futures::{Future, Stream, Sink}; - use super::*; - - #[test] - fn broadcasts_message() { - let (i_tx, i_rx) = ::futures::sync::mpsc::channel::>(10); - let (o_tx, o_rx) = ::futures::sync::mpsc::channel(10); - let max_faulty = 3; - - let agreement = agree( - 100_000, - 255, - |_msg| true, - i_rx.map_err(|_| ()), - o_tx.sink_map_err(|_| ()), - max_faulty, - ); - - ::std::thread::spawn(move || { - let _i_tx = i_tx; - let _ = agreement.wait(); - }); - - let sent_message = o_rx.wait() - .next() - .expect("to have a next item") - .expect("not to have an error"); - - let Message::Prepare(p) = sent_message.message; - assert_eq!(p, 100_000); - assert_eq!(sent_message.sender, 255); - } - - #[test] - fn concludes_on_2f_prepares_for_local_proposal() { - let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10); - let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10); - let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel(); - let max_faulty = 3; - - let agreement = agree( - 100_000, - 255, - |msg| (msg.clone(), 255), - i_rx.map_err(|_| ()), - o_tx.sink_map_err(|_| ()), - max_faulty, - ); - - let iter = (0..(max_faulty * 2)).map(|i| { - LocalizedMessage { - message: Message::Prepare(100_000), - sender: i, - signature: (Message::Prepare(100_000), i), - } - }); - - let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap(); - - ::std::thread::spawn(move || { - ::std::thread::sleep(::std::time::Duration::from_secs(5)); - timeout_tx.send(None).unwrap(); - }); - - let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ())) - .wait() - .map(|(r, _)| r) - .map_err(|(e, _)| e) - .expect("not to have an error") - .expect("not to fail to agree"); - - assert_eq!(agreed_value.proposal, 100_000); - assert!(check_justification( - agreed_value.proposal, - &agreed_value.justification, - max_faulty, - |msg, sig| if msg == &sig.0 { Some(sig.1) } else { None } - )); - } - - #[test] - fn concludes_on_2f_plus_one_prepares_for_alternate_proposal() { - let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10); - let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10); - let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel(); - let max_faulty = 3; - - let agreement = agree( - 100_000, - 255, - |msg| (msg.clone(), 255), - i_rx.map_err(|_| ()), - o_tx.sink_map_err(|_| ()), - max_faulty, - ); - - let iter = (0..(max_faulty * 2 + 1)).map(|i| { - LocalizedMessage { - message: Message::Prepare(100_001), - sender: i, - signature: (Message::Prepare(100_001), i), - } - }); - - let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap(); - - ::std::thread::spawn(move || { - ::std::thread::sleep(::std::time::Duration::from_secs(5)); - timeout_tx.send(None).unwrap(); - }); - - let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ())) - .wait() - .map(|(r, _)| r) - .map_err(|(e, _)| e) - .expect("not to have an error") - .expect("not to fail to agree"); - - assert_eq!(agreed_value.proposal, 100_001); - assert!(check_justification( - agreed_value.proposal, - &agreed_value.justification, - max_faulty, - |msg, sig| if msg == &sig.0 { Some(sig.1) } else { None } - )); - } - - #[test] - fn never_concludes_on_less_than_2f_prepares_for_local() { - let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10); - let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10); - let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel(); - let max_faulty = 3; - - let agreement = agree( - 100_000, - 255, - |_msg| true, - i_rx.map_err(|_| ()), - o_tx.sink_map_err(|_| ()), - max_faulty, - ); - - let iter = (1..(max_faulty * 2)).map(|i| { - LocalizedMessage { - message: Message::Prepare(100_000), - sender: i, - signature: true, - } - }); - - let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap(); - - ::std::thread::spawn(move || { - ::std::thread::sleep(::std::time::Duration::from_millis(250)); - timeout_tx.send(None).unwrap(); - }); - - let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ())) - .wait() - .map(|(r, _)| r) - .map_err(|(e, _)| e) - .expect("not to have an error"); - - assert!(agreed_value.is_none()); - } - - #[test] - fn never_concludes_on_less_than_2f_plus_one_prepares_for_alternate() { - let (i_tx, i_rx) = ::futures::sync::mpsc::channel(10); - let (o_tx, _o_rx) = ::futures::sync::mpsc::channel(10); - let (timeout_tx, timeout_rx) = ::futures::sync::oneshot::channel(); - let max_faulty = 3; - - let agreement = agree( - 100_000, - 255, - |_msg| true, - i_rx.map_err(|_| ()), - o_tx.sink_map_err(|_| ()), - max_faulty, - ); - - let iter = (1..(max_faulty * 2 + 1)).map(|i| { - LocalizedMessage { - message: Message::Prepare(100_001), - sender: i, - signature: true, - } - }); - - let (_i_tx, _) = i_tx.send_all(::futures::stream::iter_ok(iter)).wait().unwrap(); - - ::std::thread::spawn(move || { - ::std::thread::sleep(::std::time::Duration::from_millis(250)); - timeout_tx.send(None).unwrap(); - }); - - let agreed_value = agreement.map(Some).select(timeout_rx.map_err(|_| ())) - .wait() - .map(|(r, _)| r) - .map_err(|(e, _)| e) - .expect("not to have an error"); - - assert!(agreed_value.is_none()); - } -} diff --git a/substrate/candidate-agreement/src/bft/accumulator.rs b/substrate/candidate-agreement/src/bft/accumulator.rs new file mode 100644 index 0000000000..69d6329f9b --- /dev/null +++ b/substrate/candidate-agreement/src/bft/accumulator.rs @@ -0,0 +1,528 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Message accumulator for each round of BFT consensus. + +use std::collections::{HashMap, HashSet}; +use std::collections::hash_map::Entry; +use std::hash::Hash; + +use super::{Message, LocalizedMessage}; + +/// Context necessary to execute a round of BFT. +pub trait Context { + /// A full candidate. + type Candidate: Clone; + /// Unique digest of a proposed candidate (think hash). + type Digest: Hash + Eq + Clone; + /// Validator ID. + type ValidatorId: Hash + Eq; + /// A signature. + type Signature: Eq + Clone; +} + +/// Justification at a given round. +#[derive(PartialEq, Eq, Debug)] +pub struct Justification { + /// The round. + pub round_number: usize, + /// The digest prepared for. + pub digest: D, + /// Signatures for the prepare messages. + pub signatures: Vec, +} + +impl Justification { + /// Fails if there are duplicate signatures or invalid. + /// + /// Provide a closure for checking whether the signature is valid on a + /// digest. + /// + /// The closure should return true iff the round number, digest, and signature + /// represent a valid prepare message and the signer was authorized to issue + /// it. + pub fn check(&self, max_faulty: usize, check_message: F) -> bool + where + F: Fn(usize, &D, &S) -> Option, + V: Hash + Eq, + { + let mut prepared = HashSet::new(); + + let mut good = false; + for signature in &self.signatures { + match check_message(self.round_number, &self.digest, signature) { + None => return false, + Some(v) => { + if !prepared.insert(v) { + return false; + } else if prepared.len() > max_faulty * 2 { + // don't return just yet since later signatures may be invalid. + good = true; + } + } + } + } + + good + } +} + +/// Type alias to represent a justification specifically for a prepare. +pub type PrepareJustification = Justification; + +/// The round's state, based on imported messages. +#[derive(PartialEq, Eq, Debug)] +pub enum State { + /// No proposal yet. + Begin, + /// Proposal received. + Proposed(C), + /// Seen 2f + 1 prepares for this digest. + Prepared(PrepareJustification), + /// Seen 2f + 1 commits for a digest. + Concluded(Justification), + /// Seen 2f + 1 round-advancement messages. + Advanced(Option>), +} + +/// Accumulates messages for a given round of BFT consensus. +pub struct Accumulator { + round_number: usize, + max_faulty: usize, + round_proposer: V, + proposal: Option, + prepares: HashMap, + commits: HashMap, + vote_counts: HashMap, + advance_round: HashSet, + state: State, +} + +impl Accumulator + where + C: Eq + Clone, + D: Hash + Clone + Eq, + V: Hash + Eq, + S: Eq + Clone, +{ + /// Create a new state accumulator. + pub fn new(round_number: usize, max_faulty: usize, round_proposer: V) -> Self { + Accumulator { + round_number, + max_faulty, + round_proposer, + proposal: None, + prepares: HashMap::new(), + commits: HashMap::new(), + vote_counts: HashMap::new(), + advance_round: HashSet::new(), + state: State::Begin, + } + } + + /// How advance votes we have seen. + pub fn advance_votes(&self) -> usize { + self.advance_round.len() + } + + /// Inspect the current consensus state. + pub fn state(&self) -> &State { + &self.state + } + + /// Import a message. Importing duplicates is fine, but the signature + /// and authorization should have already been checked. + pub fn import_message( + &mut self, + message: LocalizedMessage, + ) + { + // old message. + if message.message.round_number() != self.round_number { + return; + } + + let (sender, signature) = (message.sender, message.signature); + + match message.message { + Message::Propose(_, p) => self.import_proposal(p, sender, signature), + Message::Prepare(_, d) => self.import_prepare(d, sender, signature), + Message::Commit(_, d) => self.import_commit(d, sender, signature), + Message::AdvanceRound(_) => self.import_advance_round(sender), + } + } + + fn import_proposal( + &mut self, + proposal: C, + sender: V, + signature: S, + ) { + if sender != self.round_proposer || self.proposal.is_some() { return } + + self.proposal = Some(proposal.clone()); + self.state = State::Proposed(proposal); + } + + fn import_prepare( + &mut self, + candidate: D, + sender: V, + signature: S, + ) { + // ignore any subsequent prepares by the same sender. + // TODO: if digest is different, that's misbehavior. + let prepared_for = if let Entry::Vacant(vacant) = self.prepares.entry(sender) { + vacant.insert((candidate.clone(), signature)); + let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0)); + count.0 += 1; + + if count.0 == self.max_faulty * 2 + 1 { + Some(candidate) + } else { + None + } + } else { + None + }; + + // only allow transition to prepare from begin or proposed state. + let valid_transition = match self.state { + State::Begin | State::Proposed(_) => true, + _ => false, + }; + + if let (true, Some(prepared_for)) = (valid_transition, prepared_for) { + let signatures = self.prepares + .values() + .filter(|&&(ref d, _)| d == &prepared_for) + .map(|&(_, ref s)| s.clone()) + .collect(); + + self.state = State::Prepared(PrepareJustification { + round_number: self.round_number, + digest: prepared_for, + signatures: signatures, + }); + } + } + + fn import_commit( + &mut self, + candidate: D, + sender: V, + signature: S, + ) { + // ignore any subsequent commits by the same sender. + // TODO: if digest is different, that's misbehavior. + let committed_for = if let Entry::Vacant(vacant) = self.commits.entry(sender) { + vacant.insert((candidate.clone(), signature)); + let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0)); + count.1 += 1; + + if count.1 == self.max_faulty * 2 + 1 { + Some(candidate) + } else { + None + } + } else { + None + }; + + // transition to concluded state always valid. + // only weird case is if the prior state was "advanced", + // but technically it's the same behavior as if the order of receiving + // the last "advance round" and "commit" messages were reversed. + if let Some(committed_for) = committed_for { + let signatures = self.commits + .values() + .filter(|&&(ref d, _)| d == &committed_for) + .map(|&(_, ref s)| s.clone()) + .collect(); + + self.state = State::Concluded(Justification { + round_number: self.round_number, + digest: committed_for, + signatures: signatures, + }); + } + } + + fn import_advance_round( + &mut self, + sender: V, + ) { + self.advance_round.insert(sender); + + if self.advance_round.len() != self.max_faulty * 2 + 1 { return } + + // allow transition to new round only if we haven't produced a justification + // yet. + self.state = match ::std::mem::replace(&mut self.state, State::Begin) { + State::Concluded(j) => State::Concluded(j), + State::Prepared(j) => State::Advanced(Some(j)), + State::Advanced(j) => State::Advanced(j), + State::Begin | State::Proposed(_) => State::Advanced(None), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Clone, PartialEq, Eq, Debug)] + pub struct Candidate(usize); + + #[derive(Hash, PartialEq, Eq, Clone, Debug)] + pub struct Digest(usize); + + #[derive(Hash, PartialEq, Eq, Debug)] + pub struct ValidatorId(usize); + + #[derive(PartialEq, Eq, Clone, Debug)] + pub struct Signature(usize, usize); + + #[test] + fn justification_checks_out() { + let mut justification = Justification { + round_number: 2, + digest: Digest(600), + signatures: (0..10).map(|i| Signature(600, i)).collect(), + }; + + let check_message = |r, d: &Digest, s: &Signature| { + if r == 2 && d.0 == 600 && s.0 == 600 { + Some(ValidatorId(s.1)) + } else { + None + } + }; + + assert!(justification.check(3, &check_message)); + assert!(!justification.check(5, &check_message)); + + { + // one bad signature is enough to spoil it. + justification.signatures.push(Signature(1001, 255)); + assert!(!justification.check(3, &check_message)); + + justification.signatures.pop(); + } + // duplicates not allowed. + justification.signatures.extend((0..10).map(|i| Signature(600, i))); + assert!(!justification.check(3, &check_message)); + } + + #[test] + fn accepts_proposal_from_proposer_only() { + let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 3, ValidatorId(8)); + assert_eq!(accumulator.state(), &State::Begin); + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(5), + signature: Signature(999, 5), + message: Message::Propose(1, Candidate(999)), + }); + + assert_eq!(accumulator.state(), &State::Begin); + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(8), + signature: Signature(999, 8), + message: Message::Propose(1, Candidate(999)), + }); + + assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); + } + + #[test] + fn reaches_prepare_phase() { + let mut accumulator = Accumulator::new(1, 3, ValidatorId(8)); + assert_eq!(accumulator.state(), &State::Begin); + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(8), + signature: Signature(999, 8), + message: Message::Propose(1, Candidate(999)), + }); + + assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); + + for i in 0..6 { + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(i), + signature: Signature(999, i), + message: Message::Prepare(1, Digest(999)), + }); + + assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); + } + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(7), + signature: Signature(999, 7), + message: Message::Prepare(1, Digest(999)), + }); + + match accumulator.state() { + &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + } + + #[test] + fn prepare_to_commit() { + let mut accumulator = Accumulator::new(1, 3, ValidatorId(8)); + assert_eq!(accumulator.state(), &State::Begin); + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(8), + signature: Signature(999, 8), + message: Message::Propose(1, Candidate(999)), + }); + + assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); + + for i in 0..6 { + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(i), + signature: Signature(999, i), + message: Message::Prepare(1, Digest(999)), + }); + + assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); + } + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(7), + signature: Signature(999, 7), + message: Message::Prepare(1, Digest(999)), + }); + + match accumulator.state() { + &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + + for i in 0..6 { + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(i), + signature: Signature(999, i), + message: Message::Commit(1, Digest(999)), + }); + + match accumulator.state() { + &State::Prepared(_) => {}, + s => panic!("wrong state: {:?}", s), + } + } + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(7), + signature: Signature(999, 7), + message: Message::Commit(1, Digest(999)), + }); + + match accumulator.state() { + &State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + } + + #[test] + fn prepare_to_advance() { + let mut accumulator = Accumulator::new(1, 3, ValidatorId(8)); + assert_eq!(accumulator.state(), &State::Begin); + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(8), + signature: Signature(999, 8), + message: Message::Propose(1, Candidate(999)), + }); + + assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); + + for i in 0..7 { + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(i), + signature: Signature(999, i), + message: Message::Prepare(1, Digest(999)), + }); + } + + match accumulator.state() { + &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + + for i in 0..6 { + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(i), + signature: Signature(999, i), + message: Message::AdvanceRound(1), + }); + + match accumulator.state() { + &State::Prepared(_) => {}, + s => panic!("wrong state: {:?}", s), + } + } + + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(7), + signature: Signature(999, 7), + message: Message::AdvanceRound(1), + }); + + match accumulator.state() { + &State::Advanced(Some(_)) => {}, + s => panic!("wrong state: {:?}", s), + } + } + + #[test] + fn conclude_different_than_proposed() { + let mut accumulator = Accumulator::::new(1, 3, ValidatorId(8)); + assert_eq!(accumulator.state(), &State::Begin); + + for i in 0..7 { + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(i), + signature: Signature(999, i), + message: Message::Prepare(1, Digest(999)), + }); + } + + match accumulator.state() { + &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + + for i in 0..7 { + accumulator.import_message(LocalizedMessage { + sender: ValidatorId(i), + signature: Signature(999, i), + message: Message::Commit(1, Digest(999)), + }); + } + + match accumulator.state() { + &State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + } +} diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs new file mode 100644 index 0000000000..da4b6fe91d --- /dev/null +++ b/substrate/candidate-agreement/src/bft/mod.rs @@ -0,0 +1,100 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! BFT Agreement based on a rotating proposer in different rounds. + +use std::collections::{HashMap, HashSet}; +use std::hash::Hash; + +use futures::{IntoFuture, Future, Stream, Sink}; +use futures::future::{ok, loop_fn, Loop}; + +mod accumulator; + +/// Messages over the proposal. +/// Each message carries an associated round number. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Message { + /// Send a full proposal. + Propose(usize, P), + /// Prepare to vote for proposal with digest D. + Prepare(usize, D), + /// Commit to proposal with digest D.. + Commit(usize, D), + /// Propose advancement to a new round. + AdvanceRound(usize), +} + +impl Message { + fn round_number(&self) -> usize { + match *self { + Message::Propose(round, _) => round, + Message::Prepare(round, _) => round, + Message::Commit(round, _) => round, + Message::AdvanceRound(round) => round, + } + } +} + +/// A localized message, including the sender. +#[derive(Debug, Clone)] +pub struct LocalizedMessage { + /// The message received. + pub message: Message, + /// The sender of the message + pub sender: V, + /// The signature of the message. + pub signature: S, +} + +/// The agreed-upon data. +#[derive(Debug, Clone)] +pub struct Agreed { + /// The agreed-upon proposal. + pub proposal: P, + /// The justification for the proposal. + pub justification: Vec>, +} + +/// Parameters to agreement. +pub struct Params< + Validator, + SignLocal, + Timeout, + CanInclude, + MessagesIn, + MessagesOut, +> { + /// The ID of the current view's primary. + pub primary: Validator, + /// The local ID. + pub local_id: Validator, + /// A closure for signing local messages. + pub sign_local: SignLocal, + /// A timeout that fires when the view change should begin. + pub begin_view_change: Timeout, + /// A function for checking if a proposal can be voted for. + pub can_include: CanInclude, + /// The input stream. Should never conclude, and should yield only messages + /// sent by validators and which have been authenticated properly. + pub input: MessagesIn, + /// The output message sink. This assumes that messages will eventually + /// be delivered to all honest participants, either by repropagation, gossip, + /// or some reliable broadcast mechanism. + pub output: MessagesOut, + /// The maximum number of faulty nodes. + pub max_faulty: usize, +} diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs index c072be3ee0..68025c3708 100644 --- a/substrate/candidate-agreement/src/table.rs +++ b/substrate/candidate-agreement/src/table.rs @@ -32,6 +32,49 @@ use std::collections::hash_map::{HashMap, Entry}; use std::hash::Hash; use std::fmt::Debug; +/// Context for the statement table. +pub trait Context { + /// A validator ID + type ValidatorId: Hash + Eq + Clone + Debug; + /// The digest (hash or other unique attribute) of a candidate. + type Digest: Hash + Eq + Clone + Debug; + /// Candidate type. + type Candidate: Ord + Eq + Clone + Debug; + /// The group ID type + type GroupId: Hash + Ord + Eq + Clone + Debug; + /// A signature type. + type Signature: Eq + Clone + Debug; + + /// get the digest of a candidate. + fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest; + + /// get the group of a candidate. + fn candidate_group(&self, candidate: &Self::Candidate) -> Self::GroupId; + + /// Whether a validator is a member of a group. + /// Members are meant to submit candidates and vote on validity. + fn is_member_of(&self, validator: &Self::ValidatorId, group: &Self::GroupId) -> bool; + + /// Whether a validator is an availability guarantor of a group. + /// Guarantors are meant to vote on availability for candidates submitted + /// in a group. + fn is_availability_guarantor_of( + &self, + validator: &Self::ValidatorId, + group: &Self::GroupId, + ) -> bool; + + // recover signer of statement and ensure the signature corresponds to the + // statement. + fn statement_signer( + &self, + statement: &SignedStatement, + ) -> Option; + + // requisite number of votes for validity and availability respectively from a group. + fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize); +} + /// Statements circulated among peers. #[derive(PartialEq, Eq, Debug)] pub enum Statement { @@ -80,49 +123,6 @@ enum StatementTrace { Available(V, D), } -/// Context for the statement table. -pub trait Context { - /// A validator ID - type ValidatorId: Hash + Eq + Clone + Debug; - /// The digest (hash or other unique attribute) of a candidate. - type Digest: Hash + Eq + Clone + Debug; - /// Candidate type. - type Candidate: Ord + Eq + Clone + Debug; - /// The group ID type - type GroupId: Hash + Ord + Eq + Clone + Debug; - /// A signature type. - type Signature: Eq + Clone + Debug; - - /// get the digest of a candidate. - fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest; - - /// get the group of a candidate. - fn candidate_group(&self, candidate: &Self::Candidate) -> Self::GroupId; - - /// Whether a validator is a member of a group. - /// Members are meant to submit candidates and vote on validity. - fn is_member_of(&self, validator: &Self::ValidatorId, group: &Self::GroupId) -> bool; - - /// Whether a validator is an availability guarantor of a group. - /// Guarantors are meant to vote on availability for candidates submitted - /// in a group. - fn is_availability_guarantor_of( - &self, - validator: &Self::ValidatorId, - group: &Self::GroupId, - ) -> bool; - - // recover signer of statement and ensure the signature corresponds to the - // statement. - fn statement_signer( - &self, - statement: &SignedStatement, - ) -> Option; - - // requisite number of votes for validity and availability respectively from a group. - fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize); -} - /// Misbehavior: voting more than one way on candidate validity. /// /// Since there are three possible ways to vote, a double vote is possible in