// Copyright 2017 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! BFT Agreement based on a rotating proposer in different rounds. mod accumulator; #[cfg(test)] mod tests; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use std::hash::Hash; use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink}; use self::accumulator::State; pub use self::accumulator::{Accumulator, Justification, PrepareJustification, UncheckedJustification}; /// Messages over the proposal. /// Each message carries an associated round number. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Message { /// Send a full proposal. Propose(usize, C), /// Prepare to vote for proposal with digest D. Prepare(usize, D), /// Commit to proposal with digest D.. Commit(usize, D), /// Propose advancement to a new round. AdvanceRound(usize), } impl Message { fn round_number(&self) -> usize { match *self { Message::Propose(round, _) => round, Message::Prepare(round, _) => round, Message::Commit(round, _) => round, Message::AdvanceRound(round) => round, } } } /// A localized message, including the sender. #[derive(Debug, Clone)] pub struct LocalizedMessage { /// The message received. pub message: Message, /// The sender of the message pub sender: V, /// The signature of the message. pub signature: S, } /// Context necessary for agreement. 
///
/// Provides necessary types for protocol messages, and functions necessary for a
/// participant to evaluate and create those messages.
pub trait Context {
    /// Candidate proposed.
    type Candidate: Debug + Eq + Clone;
    /// Candidate digest.
    type Digest: Debug + Hash + Eq + Clone;
    /// Authority ID.
    type AuthorityId: Debug + Hash + Eq + Clone;
    /// Signature.
    type Signature: Debug + Eq + Clone;
    /// A future that resolves when a round timeout is concluded.
    type RoundTimeout: Future<Item=()>;
    /// A future that resolves when a proposal is ready.
    type CreateProposal: Future<Item=Self::Candidate>;

    /// Get the local authority ID.
    fn local_id(&self) -> Self::AuthorityId;

    /// Get the best proposal.
    fn proposal(&self) -> Self::CreateProposal;

    /// Get the digest of a candidate.
    fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;

    /// Sign a message using the local authority ID.
    fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>)
        -> LocalizedMessage<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>;

    /// Get the proposer for a given round of consensus.
    fn round_proposer(&self, round: usize) -> Self::AuthorityId;

    /// Whether the candidate is valid.
    fn candidate_valid(&self, candidate: &Self::Candidate) -> bool;

    /// Create a round timeout. The context will determine the correct timeout
    /// length, and create a future that will resolve when the timeout is
    /// concluded.
    fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout;
}

/// Communication that can occur between participants in consensus.
///
/// The type parameters mirror those of `LocalizedMessage`: candidate `C`,
/// digest `D`, sender identity `V` and signature `S`.
#[derive(Debug, Clone)]
pub enum Communication<C, D, V, S> {
    /// A consensus message (proposal or vote)
    Consensus(LocalizedMessage<C, D, V, S>),
    /// Auxiliary communication (just proof-of-lock for now).
    Auxiliary(PrepareJustification<D, S>),
}

/// Type alias for a localized message using only type parameters from `Context`.
// TODO: actual type alias when it's no longer a warning.
pub struct ContextCommunication(pub Communication); impl Clone for ContextCommunication where LocalizedMessage: Clone, PrepareJustification: Clone, { fn clone(&self) -> Self { ContextCommunication(self.0.clone()) } } #[derive(Debug)] struct Sending { items: VecDeque, flushing: bool, } impl Sending { fn with_capacity(n: usize) -> Self { Sending { items: VecDeque::with_capacity(n), flushing: false, } } fn push(&mut self, item: T) { self.items.push_back(item); self.flushing = false; } // process all the sends into the sink. fn process_all>(&mut self, sink: &mut S) -> Poll<(), S::SinkError> { while let Some(item) = self.items.pop_front() { match sink.start_send(item) { Err(e) => return Err(e), Ok(AsyncSink::NotReady(item)) => { self.items.push_front(item); return Ok(Async::NotReady); } Ok(AsyncSink::Ready) => { self.flushing = true; } } } if self.flushing { match sink.poll_complete() { Err(e) => return Err(e), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(())) => { self.flushing = false; } } } Ok(Async::Ready(())) } } /// Error returned when the input stream concludes. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct InputStreamConcluded; impl ::std::fmt::Display for InputStreamConcluded { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { write!(f, "{}", ::std::error::Error::description(self)) } } impl ::std::error::Error for InputStreamConcluded { fn description(&self) -> &str { "input stream of messages concluded prematurely" } } // get the "full BFT" threshold based on an amount of nodes and // a maximum faulty. if nodes == 3f + 1, then threshold == 2f + 1. fn bft_threshold(nodes: usize, max_faulty: usize) -> usize { nodes - max_faulty } /// Committed successfully. #[derive(Debug, Clone)] pub struct Committed { /// The candidate committed for. This will be unknown if /// we never witnessed the proposal of the last round. pub candidate: Option, /// A justification for the candidate. 
pub justification: Justification, } struct Locked { justification: PrepareJustification, } impl Locked { fn digest(&self) -> &D { &self.justification.digest } } // the state of the local node during the current state of consensus. // // behavior is different when locked on a proposal. #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum LocalState { Start, Proposed, Prepared, Committed, VoteAdvance, } // This structure manages a single "view" of consensus. // // We maintain two message accumulators: one for the round we are currently in, // and one for a future round. // // We advance the round accumulators when one of two conditions is met: // - we witness consensus of advancement in the current round. in this case we // advance by one. // - a higher threshold-prepare is broadcast to us. in this case we can // advance to the round of the threshold-prepare. this is an indication // that we have experienced severe asynchrony/clock drift with the remainder // of the other authorities, and it is unlikely that we can assist in // consensus meaningfully. nevertheless we make an attempt. 
struct Strategy { nodes: usize, max_faulty: usize, fetching_proposal: Option, round_timeout: future::Fuse, local_state: LocalState, locked: Option>, notable_candidates: HashMap, current_accumulator: Accumulator, future_accumulator: Accumulator, local_id: C::AuthorityId, } impl Strategy { fn create(context: &C, nodes: usize, max_faulty: usize) -> Self { let timeout = context.begin_round_timeout(0); let threshold = bft_threshold(nodes, max_faulty); let current_accumulator = Accumulator::new( 0, threshold, context.round_proposer(0), ); let future_accumulator = Accumulator::new( 1, threshold, context.round_proposer(1), ); Strategy { nodes, max_faulty, current_accumulator, future_accumulator, fetching_proposal: None, local_state: LocalState::Start, locked: None, notable_candidates: HashMap::new(), round_timeout: timeout.fuse(), local_id: context.local_id(), } } fn import_message( &mut self, msg: LocalizedMessage ) { let round_number = msg.message.round_number(); if round_number == self.current_accumulator.round_number() { self.current_accumulator.import_message(msg); } else if round_number == self.future_accumulator.round_number() { self.future_accumulator.import_message(msg); } } fn import_lock_proof( &mut self, context: &C, justification: PrepareJustification, ) { // TODO: find a way to avoid processing of the signatures if the sender is // not the primary or the round number is low. if justification.round_number > self.current_accumulator.round_number() { // jump ahead to the prior round as this is an indication of a supermajority // good nodes being at least on that round. self.advance_to_round(context, justification.round_number); } let lock_to_new = self.locked.as_ref() .map_or(true, |l| l.justification.round_number < justification.round_number); if lock_to_new { self.locked = Some(Locked { justification }) } } // poll the strategy: this will queue messages to be sent and advance // rounds if necessary. // // only call within the context of a `Task`. 
fn poll(&mut self, context: &C, sending: &mut Sending>) -> Poll, E> where C::RoundTimeout: Future, C::CreateProposal: Future, { let mut last_watermark = ( self.current_accumulator.round_number(), self.local_state ); // poll until either completion or state doesn't change. loop { match self.poll_once(context, sending)? { Async::Ready(x) => return Ok(Async::Ready(x)), Async::NotReady => { let new_watermark = ( self.current_accumulator.round_number(), self.local_state ); if new_watermark == last_watermark { return Ok(Async::NotReady) } else { last_watermark = new_watermark; } } } } } // perform one round of polling: attempt to broadcast messages and change the state. // if the round or internal round-state changes, this should be called again. fn poll_once(&mut self, context: &C, sending: &mut Sending>) -> Poll, E> where C::RoundTimeout: Future, C::CreateProposal: Future, { self.propose(context, sending)?; self.prepare(context, sending); self.commit(context, sending); self.vote_advance(context, sending)?; let advance = match self.current_accumulator.state() { &State::Advanced(ref p_just) => { // lock to any witnessed prepare justification. if let Some(p_just) = p_just.as_ref() { self.locked = Some(Locked { justification: p_just.clone() }); } let round_number = self.current_accumulator.round_number(); Some(round_number + 1) } &State::Committed(ref just) => { // fetch the agreed-upon candidate: // - we may not have received the proposal in the first place // - there is no guarantee that the proposal we got was agreed upon // (can happen if faulty primary) // - look in the candidates of prior rounds just in case. 
let candidate = self.current_accumulator .proposal() .and_then(|c| if context.candidate_digest(c) == just.digest { Some(c.clone()) } else { None }) .or_else(|| self.notable_candidates.get(&just.digest).cloned()); let committed = Committed { candidate, justification: just.clone() }; return Ok(Async::Ready(committed)) } _ => None, }; if let Some(new_round) = advance { self.advance_to_round(context, new_round); } Ok(Async::NotReady) } fn propose(&mut self, context: &C, sending: &mut Sending>) -> Result<(), ::Error> { if let LocalState::Start = self.local_state { let mut propose = false; if let &State::Begin = self.current_accumulator.state() { let round_number = self.current_accumulator.round_number(); let primary = context.round_proposer(round_number); propose = self.local_id == primary; }; if !propose { return Ok(()) } // obtain the proposal to broadcast. let proposal = match self.locked { Some(ref locked) => { // TODO: it's possible but very unlikely that we don't have the // corresponding proposal for what we are locked to. // // since this is an edge case on an edge case, it is fine // to eat the round timeout for now, but it can be optimized by // broadcasting an advance vote. self.notable_candidates.get(locked.digest()).cloned() } None => { let res = self.fetching_proposal .get_or_insert_with(|| context.proposal()) .poll()?; match res { Async::Ready(p) => Some(p), Async::NotReady => None, } } }; if let Some(proposal) = proposal { self.fetching_proposal = None; let message = Message::Propose( self.current_accumulator.round_number(), proposal ); self.import_and_send_message(message, context, sending); // broadcast the justification along with the proposal if we are locked. 
if let Some(ref locked) = self.locked { sending.push( ContextCommunication(Communication::Auxiliary(locked.justification.clone())) ); } self.local_state = LocalState::Proposed; } } Ok(()) } fn prepare(&mut self, context: &C, sending: &mut Sending>) { // prepare only upon start or having proposed. match self.local_state { LocalState::Start | LocalState::Proposed => {}, _ => return }; let mut prepare_for = None; // we can't prepare until something was proposed. if let &State::Proposed(ref candidate) = self.current_accumulator.state() { let digest = context.candidate_digest(candidate); // vote to prepare only if we believe the candidate to be valid and // we are not locked on some other candidate. match self.locked { Some(ref locked) if locked.digest() != &digest => {} Some(_) => { // don't check validity if we are locked. // this is necessary to preserve the liveness property. prepare_for = Some(digest); } None => if context.candidate_valid(candidate) { prepare_for = Some(digest); } } } if let Some(digest) = prepare_for { let message = Message::Prepare( self.current_accumulator.round_number(), digest ); self.import_and_send_message(message, context, sending); self.local_state = LocalState::Prepared; } } fn commit(&mut self, context: &C, sending: &mut Sending>) { // commit only if we haven't voted to advance or committed already match self.local_state { LocalState::Committed | LocalState::VoteAdvance => return, _ => {} } let mut commit_for = None; if let &State::Prepared(ref p_just) = self.current_accumulator.state() { // we are now locked to this prepare justification. 
let digest = p_just.digest.clone(); self.locked = Some(Locked { justification: p_just.clone() }); commit_for = Some(digest); } if let Some(digest) = commit_for { let message = Message::Commit( self.current_accumulator.round_number(), digest ); self.import_and_send_message(message, context, sending); self.local_state = LocalState::Committed; } } fn vote_advance(&mut self, context: &C, sending: &mut Sending>) -> Result<(), ::Error> { // we can vote for advancement under all circumstances unless we have already. if let LocalState::VoteAdvance = self.local_state { return Ok(()) } // if we got f + 1 advance votes, or the timeout has fired, and we haven't // sent an AdvanceRound message yet, do so. let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty; if let Async::Ready(_) = self.round_timeout.poll()? { attempt_advance = true; } if attempt_advance { let message = Message::AdvanceRound( self.current_accumulator.round_number(), ); self.import_and_send_message(message, context, sending); self.local_state = LocalState::VoteAdvance; } Ok(()) } fn advance_to_round(&mut self, context: &C, round: usize) { assert!(round > self.current_accumulator.round_number()); let threshold = self.nodes - self.max_faulty; self.fetching_proposal = None; self.round_timeout = context.begin_round_timeout(round).fuse(); self.local_state = LocalState::Start; let new_future = Accumulator::new( round + 1, threshold, context.round_proposer(round + 1), ); // when advancing from a round, store away the witnessed proposal. // // if we or other participants end up locked on that candidate, // we will have it. if let Some(proposal) = self.current_accumulator.proposal() { let digest = context.candidate_digest(proposal); self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone()); } // special case when advancing by a single round. 
if self.future_accumulator.round_number() == round { self.current_accumulator = ::std::mem::replace(&mut self.future_accumulator, new_future); } else { self.future_accumulator = new_future; self.current_accumulator = Accumulator::new( round, threshold, context.round_proposer(round), ); } } fn import_and_send_message( &mut self, message: Message, context: &C, sending: &mut Sending> ) { let signed_message = context.sign_local(message); self.import_message(signed_message.clone()); sending.push(ContextCommunication(Communication::Consensus(signed_message))); } } /// Future that resolves upon BFT agreement for a candidate. #[must_use = "futures do nothing unless polled"] pub struct Agreement { context: C, input: I, output: O, concluded: Option>, sending: Sending>, strategy: Strategy, } impl Future for Agreement where C: Context, C::RoundTimeout: Future, C::CreateProposal: Future, I: Stream,Error=E>, O: Sink,SinkError=E>, E: From, { type Item = Committed; type Error = E; fn poll(&mut self) -> Poll { // even if we've observed the conclusion, wait until all // pending outgoing messages are flushed. if let Some(just) = self.concluded.take() { return Ok(match self.sending.process_all(&mut self.output)? { Async::Ready(()) => Async::Ready(just), Async::NotReady => { self.concluded = Some(just); Async::NotReady } }) } loop { let message = match self.input.poll()? { Async::Ready(msg) => msg.ok_or(InputStreamConcluded)?, Async::NotReady => break, }; match message.0 { Communication::Consensus(message) => self.strategy.import_message(message), Communication::Auxiliary(lock_proof) => self.strategy.import_lock_proof(&self.context, lock_proof), } } // try to process timeouts. let state_machine_res = self.strategy.poll(&self.context, &mut self.sending)?; // make progress on flushing all pending messages. 
let _ = self.sending.process_all(&mut self.output)?; match state_machine_res { Async::Ready(just) => { self.concluded = Some(just); self.poll() } Async::NotReady => { Ok(Async::NotReady) } } } } /// Attempt to reach BFT agreement on a candidate. /// /// `nodes` is the number of nodes in the system. /// `max_faulty` is the maximum number of faulty nodes. Should be less than /// 1/3 of `nodes`, otherwise agreement may never be reached. /// /// The input stream should never logically conclude. The logic here assumes /// that messages flushed to the output stream will eventually reach other nodes. /// /// Note that it is possible to witness agreement being reached without ever /// seeing the candidate. Any candidates seen will be checked for validity. /// /// Although technically the agreement will always complete (given the eventual /// delivery of messages), in practice it is possible for this future to /// conclude without having witnessed the conclusion. /// In general, this future should be pre-empted by the import of a justification /// set for this block height. pub fn agree(context: C, nodes: usize, max_faulty: usize, input: I, output: O) -> Agreement where C: Context, C::RoundTimeout: Future, C::CreateProposal: Future, I: Stream,Error=E>, O: Sink,SinkError=E>, E: From, { let strategy = Strategy::create(&context, nodes, max_faulty); Agreement { context, input, output, concluded: None, sending: Sending::with_capacity(4), strategy: strategy, } }