diff --git a/polkadot/candidate-agreement/Cargo.toml b/polkadot/candidate-agreement/Cargo.toml deleted file mode 100644 index 8aa2d0001b..0000000000 --- a/polkadot/candidate-agreement/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "polkadot-candidate-agreement" -version = "0.1.0" -authors = ["Parity Technologies "] - -[dependencies] -futures = "0.1.17" -parking_lot = "0.4" -tokio-timer = "0.1.2" diff --git a/polkadot/candidate-agreement/src/bft/accumulator.rs b/polkadot/candidate-agreement/src/bft/accumulator.rs deleted file mode 100644 index ab035737fb..0000000000 --- a/polkadot/candidate-agreement/src/bft/accumulator.rs +++ /dev/null @@ -1,602 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Message accumulator for each round of BFT consensus. - -use std::collections::{HashMap, HashSet}; -use std::collections::hash_map::Entry; -use std::hash::Hash; - -use super::{Message, LocalizedMessage}; - -/// Justification for some state at a given round. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct UncheckedJustification { - /// The round. - pub round_number: usize, - /// The digest prepared for. - pub digest: D, - /// Signatures for the prepare messages. - pub signatures: Vec, -} - -impl UncheckedJustification { - /// Fails if there are duplicate signatures or invalid. - /// - /// Provide a closure for checking whether the signature is valid on a - /// digest. - /// - /// The closure should returns a checked justification iff the round number, digest, and signature - /// represent a valid message and the signer was authorized to issue - /// it. - /// - /// The `check_message` closure may vary based on context. - pub fn check(self, threshold: usize, mut check_message: F) - -> Result, Self> - where - F: FnMut(usize, &D, &S) -> Option, - V: Hash + Eq, - { - let checks_out = { - let mut checks_out = || { - let mut voted = HashSet::new(); - - for signature in &self.signatures { - match check_message(self.round_number, &self.digest, signature) { - None => return false, - Some(v) => { - if !voted.insert(v) { - return false; - } - } - } - } - - voted.len() >= threshold - }; - - checks_out() - }; - - if checks_out { - Ok(Justification(self)) - } else { - Err(self) - } - } -} - -/// A checked justification. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Justification(UncheckedJustification); - -impl ::std::ops::Deref for Justification { - type Target = UncheckedJustification; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// Type alias to represent a justification specifically for a prepare. -pub type PrepareJustification = Justification; - -/// The round's state, based on imported messages. -#[derive(PartialEq, Eq, Debug)] -pub enum State { - /// No proposal yet. - Begin, - /// Proposal received. - Proposed(Candidate), - /// Seen n - f prepares for this digest. - Prepared(PrepareJustification), - /// Seen n - f commits for a digest. - Committed(Justification), - /// Seen n - f round-advancement messages. - Advanced(Option>), -} - -#[derive(Debug, Default)] -struct VoteCounts { - prepared: usize, - committed: usize, -} - -/// Accumulates messages for a given round of BFT consensus. -/// -/// This isn't tied to the "view" of a single authority. It -/// keeps accurate track of the state of the BFT consensus based -/// on all messages imported. -#[derive(Debug)] -pub struct Accumulator - where - Candidate: Eq + Clone, - Digest: Hash + Eq + Clone, - AuthorityId: Hash + Eq, - Signature: Eq + Clone, -{ - round_number: usize, - threshold: usize, - round_proposer: AuthorityId, - proposal: Option, - prepares: HashMap, - commits: HashMap, - vote_counts: HashMap, - advance_round: HashSet, - state: State, -} - -impl Accumulator - where - Candidate: Eq + Clone, - Digest: Hash + Eq + Clone, - AuthorityId: Hash + Eq, - Signature: Eq + Clone, -{ - /// Create a new state accumulator. - pub fn new(round_number: usize, threshold: usize, round_proposer: AuthorityId) -> Self { - Accumulator { - round_number, - threshold, - round_proposer, - proposal: None, - prepares: HashMap::new(), - commits: HashMap::new(), - vote_counts: HashMap::new(), - advance_round: HashSet::new(), - state: State::Begin, - } - } - - /// How advance votes we have seen. - pub fn advance_votes(&self) -> usize { - self.advance_round.len() - } - - /// Get the round number. - pub fn round_number(&self) -> usize { - self.round_number.clone() - } - - pub fn proposal(&self) -> Option<&Candidate> { - self.proposal.as_ref() - } - - /// Inspect the current consensus state. - pub fn state(&self) -> &State { - &self.state - } - - /// Import a message. Importing duplicates is fine, but the signature - /// and authorization should have already been checked. - pub fn import_message( - &mut self, - message: LocalizedMessage, - ) - { - // message from different round. - if message.message.round_number() != self.round_number { - return; - } - - let (sender, signature) = (message.sender, message.signature); - - match message.message { - Message::Propose(_, p) => self.import_proposal(p, sender), - Message::Prepare(_, d) => self.import_prepare(d, sender, signature), - Message::Commit(_, d) => self.import_commit(d, sender, signature), - Message::AdvanceRound(_) => self.import_advance_round(sender), - } - } - - fn import_proposal( - &mut self, - proposal: Candidate, - sender: AuthorityId, - ) { - if sender != self.round_proposer || self.proposal.is_some() { return } - - self.proposal = Some(proposal.clone()); - self.state = State::Proposed(proposal); - } - - fn import_prepare( - &mut self, - digest: Digest, - sender: AuthorityId, - signature: Signature, - ) { - // ignore any subsequent prepares by the same sender. - // TODO: if digest is different, that's misbehavior. - let threshold_prepared = if let Entry::Vacant(vacant) = self.prepares.entry(sender) { - vacant.insert((digest.clone(), signature)); - let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default); - count.prepared += 1; - - if count.prepared >= self.threshold { - Some(digest) - } else { - None - } - } else { - None - }; - - // only allow transition to prepare from begin or proposed state. - let valid_transition = match self.state { - State::Begin | State::Proposed(_) => true, - _ => false, - }; - - if let (true, Some(threshold_prepared)) = (valid_transition, threshold_prepared) { - let signatures = self.prepares - .values() - .filter(|&&(ref d, _)| d == &threshold_prepared) - .map(|&(_, ref s)| s.clone()) - .collect(); - - self.state = State::Prepared(Justification(UncheckedJustification { - round_number: self.round_number, - digest: threshold_prepared, - signatures: signatures, - })); - } - } - - fn import_commit( - &mut self, - digest: Digest, - sender: AuthorityId, - signature: Signature, - ) { - // ignore any subsequent commits by the same sender. - // TODO: if digest is different, that's misbehavior. - let threshold_committed = if let Entry::Vacant(vacant) = self.commits.entry(sender) { - vacant.insert((digest.clone(), signature)); - let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default); - count.committed += 1; - - if count.committed >= self.threshold { - Some(digest) - } else { - None - } - } else { - None - }; - - // transition to concluded state always valid. - // only weird case is if the prior state was "advanced", - // but technically it's the same behavior as if the order of receiving - // the last "advance round" and "commit" messages were reversed. - if let Some(threshold_committed) = threshold_committed { - let signatures = self.commits - .values() - .filter(|&&(ref d, _)| d == &threshold_committed) - .map(|&(_, ref s)| s.clone()) - .collect(); - - self.state = State::Committed(Justification(UncheckedJustification { - round_number: self.round_number, - digest: threshold_committed, - signatures: signatures, - })); - } - } - - fn import_advance_round( - &mut self, - sender: AuthorityId, - ) { - self.advance_round.insert(sender); - - if self.advance_round.len() < self.threshold { return } - - // allow transition to new round only if we haven't produced a justification - // yet. - self.state = match ::std::mem::replace(&mut self.state, State::Begin) { - State::Committed(j) => State::Committed(j), - State::Prepared(j) => State::Advanced(Some(j)), - State::Advanced(j) => State::Advanced(j), - State::Begin | State::Proposed(_) => State::Advanced(None), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[derive(Clone, PartialEq, Eq, Debug)] - pub struct Candidate(usize); - - #[derive(Hash, PartialEq, Eq, Clone, Debug)] - pub struct Digest(usize); - - #[derive(Hash, PartialEq, Eq, Debug)] - pub struct AuthorityId(usize); - - #[derive(PartialEq, Eq, Clone, Debug)] - pub struct Signature(usize, usize); - - #[test] - fn justification_checks_out() { - let mut justification = UncheckedJustification { - round_number: 2, - digest: Digest(600), - signatures: (0..10).map(|i| Signature(600, i)).collect(), - }; - - let check_message = |r, d: &Digest, s: &Signature| { - if r == 2 && d.0 == 600 && s.0 == 600 { - Some(AuthorityId(s.1)) - } else { - None - } - }; - - assert!(justification.clone().check(7, &check_message).is_ok()); - assert!(justification.clone().check(11, &check_message).is_err()); - - { - // one bad signature is enough to spoil it. - justification.signatures.push(Signature(1001, 255)); - assert!(justification.clone().check(7, &check_message).is_err()); - - justification.signatures.pop(); - } - // duplicates not allowed. - justification.signatures.extend((0..10).map(|i| Signature(600, i))); - assert!(justification.clone().check(11, &check_message).is_err()); - } - - #[test] - fn accepts_proposal_from_proposer_only() { - let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(5), - signature: Signature(999, 5), - message: Message::Propose(1, Candidate(999)), - }); - - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(8), - signature: Signature(999, 8), - message: Message::Propose(1, Candidate(999)), - }); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - } - - #[test] - fn reaches_prepare_phase() { - let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(8), - signature: Signature(999, 8), - message: Message::Propose(1, Candidate(999)), - }); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - - for i in 0..6 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(999, i), - message: Message::Prepare(1, Digest(999)), - }); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - } - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(7), - signature: Signature(999, 7), - message: Message::Prepare(1, Digest(999)), - }); - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn prepare_to_commit() { - let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(8), - signature: Signature(999, 8), - message: Message::Propose(1, Candidate(999)), - }); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - - for i in 0..6 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(999, i), - message: Message::Prepare(1, Digest(999)), - }); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - } - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(7), - signature: Signature(999, 7), - message: Message::Prepare(1, Digest(999)), - }); - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - - for i in 0..6 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(999, i), - message: Message::Commit(1, Digest(999)), - }); - - match accumulator.state() { - &State::Prepared(_) => {}, - s => panic!("wrong state: {:?}", s), - } - } - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(7), - signature: Signature(999, 7), - message: Message::Commit(1, Digest(999)), - }); - - match accumulator.state() { - &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn prepare_to_advance() { - let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(8), - signature: Signature(999, 8), - message: Message::Propose(1, Candidate(999)), - }); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - - for i in 0..7 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(999, i), - message: Message::Prepare(1, Digest(999)), - }); - } - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - - for i in 0..6 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(999, i), - message: Message::AdvanceRound(1), - }); - - match accumulator.state() { - &State::Prepared(_) => {}, - s => panic!("wrong state: {:?}", s), - } - } - - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(7), - signature: Signature(999, 7), - message: Message::AdvanceRound(1), - }); - - match accumulator.state() { - &State::Advanced(Some(_)) => {}, - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn conclude_different_than_proposed() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(999, i), - message: Message::Prepare(1, Digest(999)), - }); - } - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - - for i in 0..7 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(999, i), - message: Message::Commit(1, Digest(999)), - }); - } - - match accumulator.state() { - &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn begin_to_advance() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(1, i), - message: Message::AdvanceRound(1), - }); - } - - match accumulator.state() { - &State::Advanced(ref j) => assert!(j.is_none()), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn conclude_without_prepare() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedMessage { - sender: AuthorityId(i), - signature: Signature(999, i), - message: Message::Commit(1, Digest(999)), - }); - } - - match accumulator.state() { - &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } -} diff --git a/polkadot/candidate-agreement/src/bft/mod.rs b/polkadot/candidate-agreement/src/bft/mod.rs deleted file mode 100644 index f131e44e1f..0000000000 --- a/polkadot/candidate-agreement/src/bft/mod.rs +++ /dev/null @@ -1,721 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! BFT Agreement based on a rotating proposer in different rounds. - -mod accumulator; - -#[cfg(test)] -mod tests; - -use std::collections::{HashMap, VecDeque}; -use std::fmt::Debug; -use std::hash::Hash; - -use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink}; - -use self::accumulator::State; - -pub use self::accumulator::{Accumulator, Justification, PrepareJustification, UncheckedJustification}; - -/// Messages over the proposal. -/// Each message carries an associated round number. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Message { - /// Send a full proposal. - Propose(usize, C), - /// Prepare to vote for proposal with digest D. - Prepare(usize, D), - /// Commit to proposal with digest D.. - Commit(usize, D), - /// Propose advancement to a new round. - AdvanceRound(usize), -} - -impl Message { - fn round_number(&self) -> usize { - match *self { - Message::Propose(round, _) => round, - Message::Prepare(round, _) => round, - Message::Commit(round, _) => round, - Message::AdvanceRound(round) => round, - } - } -} - -/// A localized message, including the sender. -#[derive(Debug, Clone)] -pub struct LocalizedMessage { - /// The message received. - pub message: Message, - /// The sender of the message - pub sender: V, - /// The signature of the message. - pub signature: S, -} - -/// Context necessary for agreement. -/// -/// Provides necessary types for protocol messages, and functions necessary for a -/// participant to evaluate and create those messages. -pub trait Context { - /// Candidate proposed. - type Candidate: Debug + Eq + Clone; - /// Candidate digest. - type Digest: Debug + Hash + Eq + Clone; - /// Authority ID. - type AuthorityId: Debug + Hash + Eq + Clone; - /// Signature. - type Signature: Debug + Eq + Clone; - /// A future that resolves when a round timeout is concluded. - type RoundTimeout: Future; - /// A future that resolves when a proposal is ready. - type CreateProposal: Future; - - /// Get the local authority ID. - fn local_id(&self) -> Self::AuthorityId; - - /// Get the best proposal. - fn proposal(&self) -> Self::CreateProposal; - - /// Get the digest of a candidate. - fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest; - - /// Sign a message using the local authority ID. - fn sign_local(&self, message: Message) - -> LocalizedMessage; - - /// Get the proposer for a given round of consensus. - fn round_proposer(&self, round: usize) -> Self::AuthorityId; - - /// Whether the candidate is valid. - fn candidate_valid(&self, candidate: &Self::Candidate) -> bool; - - /// Create a round timeout. The context will determine the correct timeout - /// length, and create a future that will resolve when the timeout is - /// concluded. - fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout; -} - -/// Communication that can occur between participants in consensus. -#[derive(Debug, Clone)] -pub enum Communication { - /// A consensus message (proposal or vote) - Consensus(LocalizedMessage), - /// Auxiliary communication (just proof-of-lock for now). - Auxiliary(PrepareJustification), -} - -/// Type alias for a localized message using only type parameters from `Context`. -// TODO: actual type alias when it's no longer a warning. -pub struct ContextCommunication(pub Communication); - -impl Clone for ContextCommunication - where - LocalizedMessage: Clone, - PrepareJustification: Clone, -{ - fn clone(&self) -> Self { - ContextCommunication(self.0.clone()) - } -} - -#[derive(Debug)] -struct Sending { - items: VecDeque, - flushing: bool, -} - -impl Sending { - fn with_capacity(n: usize) -> Self { - Sending { - items: VecDeque::with_capacity(n), - flushing: false, - } - } - - fn push(&mut self, item: T) { - self.items.push_back(item); - self.flushing = false; - } - - // process all the sends into the sink. - fn process_all>(&mut self, sink: &mut S) -> Poll<(), S::SinkError> { - while let Some(item) = self.items.pop_front() { - match sink.start_send(item) { - Err(e) => return Err(e), - Ok(AsyncSink::NotReady(item)) => { - self.items.push_front(item); - return Ok(Async::NotReady); - } - Ok(AsyncSink::Ready) => { self.flushing = true; } - } - } - - if self.flushing { - match sink.poll_complete() { - Err(e) => return Err(e), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(())) => { self.flushing = false; } - } - } - - Ok(Async::Ready(())) - } -} - -/// Error returned when the input stream concludes. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct InputStreamConcluded; - -impl ::std::fmt::Display for InputStreamConcluded { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(f, "{}", ::std::error::Error::description(self)) - } -} - -impl ::std::error::Error for InputStreamConcluded { - fn description(&self) -> &str { - "input stream of messages concluded prematurely" - } -} - -// get the "full BFT" threshold based on an amount of nodes and -// a maximum faulty. if nodes == 3f + 1, then threshold == 2f + 1. -fn bft_threshold(nodes: usize, max_faulty: usize) -> usize { - nodes - max_faulty -} - -/// Committed successfully. -#[derive(Debug, Clone)] -pub struct Committed { - /// The candidate committed for. This will be unknown if - /// we never witnessed the proposal of the last round. - pub candidate: Option, - /// A justification for the candidate. - pub justification: Justification, -} - -struct Locked { - justification: PrepareJustification, -} - -impl Locked { - fn digest(&self) -> &D { - &self.justification.digest - } -} - -// the state of the local node during the current state of consensus. -// -// behavior is different when locked on a proposal. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum LocalState { - Start, - Proposed, - Prepared, - Committed, - VoteAdvance, -} - -// This structure manages a single "view" of consensus. -// -// We maintain two message accumulators: one for the round we are currently in, -// and one for a future round. -// -// We advance the round accumulators when one of two conditions is met: -// - we witness consensus of advancement in the current round. in this case we -// advance by one. -// - a higher threshold-prepare is broadcast to us. in this case we can -// advance to the round of the threshold-prepare. this is an indication -// that we have experienced severe asynchrony/clock drift with the remainder -// of the other authorities, and it is unlikely that we can assist in -// consensus meaningfully. nevertheless we make an attempt. -struct Strategy { - nodes: usize, - max_faulty: usize, - fetching_proposal: Option, - round_timeout: future::Fuse, - local_state: LocalState, - locked: Option>, - notable_candidates: HashMap, - current_accumulator: Accumulator, - future_accumulator: Accumulator, - local_id: C::AuthorityId, -} - -impl Strategy { - fn create(context: &C, nodes: usize, max_faulty: usize) -> Self { - let timeout = context.begin_round_timeout(0); - let threshold = bft_threshold(nodes, max_faulty); - - let current_accumulator = Accumulator::new( - 0, - threshold, - context.round_proposer(0), - ); - - let future_accumulator = Accumulator::new( - 1, - threshold, - context.round_proposer(1), - ); - - Strategy { - nodes, - max_faulty, - current_accumulator, - future_accumulator, - fetching_proposal: None, - local_state: LocalState::Start, - locked: None, - notable_candidates: HashMap::new(), - round_timeout: timeout.fuse(), - local_id: context.local_id(), - } - } - - fn import_message( - &mut self, - msg: LocalizedMessage - ) { - let round_number = msg.message.round_number(); - - if round_number == self.current_accumulator.round_number() { - self.current_accumulator.import_message(msg); - } else if round_number == self.future_accumulator.round_number() { - self.future_accumulator.import_message(msg); - } - } - - fn import_lock_proof( - &mut self, - context: &C, - justification: PrepareJustification, - ) { - // TODO: find a way to avoid processing of the signatures if the sender is - // not the primary or the round number is low. - if justification.round_number > self.current_accumulator.round_number() { - // jump ahead to the prior round as this is an indication of a supermajority - // good nodes being at least on that round. - self.advance_to_round(context, justification.round_number); - } - - let lock_to_new = self.locked.as_ref() - .map_or(true, |l| l.justification.round_number < justification.round_number); - - if lock_to_new { - self.locked = Some(Locked { justification }) - } - } - - // poll the strategy: this will queue messages to be sent and advance - // rounds if necessary. - // - // only call within the context of a `Task`. - fn poll(&mut self, context: &C, sending: &mut Sending>) - -> Poll, E> - where - C::RoundTimeout: Future, - C::CreateProposal: Future, - { - let mut last_watermark = ( - self.current_accumulator.round_number(), - self.local_state - ); - - // poll until either completion or state doesn't change. - loop { - match self.poll_once(context, sending)? { - Async::Ready(x) => return Ok(Async::Ready(x)), - Async::NotReady => { - let new_watermark = ( - self.current_accumulator.round_number(), - self.local_state - ); - - if new_watermark == last_watermark { - return Ok(Async::NotReady) - } else { - last_watermark = new_watermark; - } - } - } - } - } - - // perform one round of polling: attempt to broadcast messages and change the state. - // if the round or internal round-state changes, this should be called again. - fn poll_once(&mut self, context: &C, sending: &mut Sending>) - -> Poll, E> - where - C::RoundTimeout: Future, - C::CreateProposal: Future, - { - self.propose(context, sending)?; - self.prepare(context, sending); - self.commit(context, sending); - self.vote_advance(context, sending)?; - - let advance = match self.current_accumulator.state() { - &State::Advanced(ref p_just) => { - // lock to any witnessed prepare justification. - if let Some(p_just) = p_just.as_ref() { - self.locked = Some(Locked { justification: p_just.clone() }); - } - - let round_number = self.current_accumulator.round_number(); - Some(round_number + 1) - } - &State::Committed(ref just) => { - // fetch the agreed-upon candidate: - // - we may not have received the proposal in the first place - // - there is no guarantee that the proposal we got was agreed upon - // (can happen if faulty primary) - // - look in the candidates of prior rounds just in case. - let candidate = self.current_accumulator - .proposal() - .and_then(|c| if context.candidate_digest(c) == just.digest { - Some(c.clone()) - } else { - None - }) - .or_else(|| self.notable_candidates.get(&just.digest).cloned()); - - let committed = Committed { - candidate, - justification: just.clone() - }; - - return Ok(Async::Ready(committed)) - } - _ => None, - }; - - if let Some(new_round) = advance { - self.advance_to_round(context, new_round); - } - - Ok(Async::NotReady) - } - - fn propose(&mut self, context: &C, sending: &mut Sending>) - -> Result<(), ::Error> - { - if let LocalState::Start = self.local_state { - let mut propose = false; - if let &State::Begin = self.current_accumulator.state() { - let round_number = self.current_accumulator.round_number(); - let primary = context.round_proposer(round_number); - propose = self.local_id == primary; - }; - - if !propose { return Ok(()) } - - // obtain the proposal to broadcast. - let proposal = match self.locked { - Some(ref locked) => { - // TODO: it's possible but very unlikely that we don't have the - // corresponding proposal for what we are locked to. - // - // since this is an edge case on an edge case, it is fine - // to eat the round timeout for now, but it can be optimized by - // broadcasting an advance vote. - self.notable_candidates.get(locked.digest()).cloned() - } - None => { - let res = self.fetching_proposal - .get_or_insert_with(|| context.proposal()) - .poll()?; - - match res { - Async::Ready(p) => Some(p), - Async::NotReady => None, - } - } - }; - - if let Some(proposal) = proposal { - self.fetching_proposal = None; - - let message = Message::Propose( - self.current_accumulator.round_number(), - proposal - ); - - self.import_and_send_message(message, context, sending); - - // broadcast the justification along with the proposal if we are locked. - if let Some(ref locked) = self.locked { - sending.push( - ContextCommunication(Communication::Auxiliary(locked.justification.clone())) - ); - } - - self.local_state = LocalState::Proposed; - } - } - - Ok(()) - } - - fn prepare(&mut self, context: &C, sending: &mut Sending>) { - // prepare only upon start or having proposed. - match self.local_state { - LocalState::Start | LocalState::Proposed => {}, - _ => return - }; - - let mut prepare_for = None; - - // we can't prepare until something was proposed. - if let &State::Proposed(ref candidate) = self.current_accumulator.state() { - let digest = context.candidate_digest(candidate); - - // vote to prepare only if we believe the candidate to be valid and - // we are not locked on some other candidate. - match self.locked { - Some(ref locked) if locked.digest() != &digest => {} - Some(_) => { - // don't check validity if we are locked. - // this is necessary to preserve the liveness property. - prepare_for = Some(digest); - } - None => if context.candidate_valid(candidate) { - prepare_for = Some(digest); - } - } - } - - if let Some(digest) = prepare_for { - let message = Message::Prepare( - self.current_accumulator.round_number(), - digest - ); - - self.import_and_send_message(message, context, sending); - self.local_state = LocalState::Prepared; - } - } - - fn commit(&mut self, context: &C, sending: &mut Sending>) { - // commit only if we haven't voted to advance or committed already - match self.local_state { - LocalState::Committed | LocalState::VoteAdvance => return, - _ => {} - } - - let mut commit_for = None; - - if let &State::Prepared(ref p_just) = self.current_accumulator.state() { - // we are now locked to this prepare justification. - let digest = p_just.digest.clone(); - self.locked = Some(Locked { justification: p_just.clone() }); - commit_for = Some(digest); - } - - if let Some(digest) = commit_for { - let message = Message::Commit( - self.current_accumulator.round_number(), - digest - ); - - self.import_and_send_message(message, context, sending); - self.local_state = LocalState::Committed; - } - } - - fn vote_advance(&mut self, context: &C, sending: &mut Sending>) - -> Result<(), ::Error> - { - // we can vote for advancement under all circumstances unless we have already. - if let LocalState::VoteAdvance = self.local_state { return Ok(()) } - - // if we got f + 1 advance votes, or the timeout has fired, and we haven't - // sent an AdvanceRound message yet, do so. - let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty; - - if let Async::Ready(_) = self.round_timeout.poll()? { - attempt_advance = true; - } - - if attempt_advance { - let message = Message::AdvanceRound( - self.current_accumulator.round_number(), - ); - - self.import_and_send_message(message, context, sending); - self.local_state = LocalState::VoteAdvance; - } - - Ok(()) - } - - fn advance_to_round(&mut self, context: &C, round: usize) { - assert!(round > self.current_accumulator.round_number()); - - let threshold = self.nodes - self.max_faulty; - - self.fetching_proposal = None; - self.round_timeout = context.begin_round_timeout(round).fuse(); - self.local_state = LocalState::Start; - - let new_future = Accumulator::new( - round + 1, - threshold, - context.round_proposer(round + 1), - ); - - // when advancing from a round, store away the witnessed proposal. - // - // if we or other participants end up locked on that candidate, - // we will have it. - if let Some(proposal) = self.current_accumulator.proposal() { - let digest = context.candidate_digest(proposal); - self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone()); - } - - // special case when advancing by a single round. - if self.future_accumulator.round_number() == round { - self.current_accumulator - = ::std::mem::replace(&mut self.future_accumulator, new_future); - } else { - self.future_accumulator = new_future; - self.current_accumulator = Accumulator::new( - round, - threshold, - context.round_proposer(round), - ); - } - } - - fn import_and_send_message( - &mut self, - message: Message, - context: &C, - sending: &mut Sending> - ) { - let signed_message = context.sign_local(message); - self.import_message(signed_message.clone()); - sending.push(ContextCommunication(Communication::Consensus(signed_message))); - } -} - -/// Future that resolves upon BFT agreement for a candidate. -#[must_use = "futures do nothing unless polled"] -pub struct Agreement { - context: C, - input: I, - output: O, - concluded: Option>, - sending: Sending>, - strategy: Strategy, -} - -impl Future for Agreement - where - C: Context, - C::RoundTimeout: Future, - C::CreateProposal: Future, - I: Stream,Error=E>, - O: Sink,SinkError=E>, - E: From, -{ - type Item = Committed; - type Error = E; - - fn poll(&mut self) -> Poll { - // even if we've observed the conclusion, wait until all - // pending outgoing messages are flushed. - if let Some(just) = self.concluded.take() { - return Ok(match self.sending.process_all(&mut self.output)? { - Async::Ready(()) => Async::Ready(just), - Async::NotReady => { - self.concluded = Some(just); - Async::NotReady - } - }) - } - - loop { - let message = match self.input.poll()? { - Async::Ready(msg) => msg.ok_or(InputStreamConcluded)?, - Async::NotReady => break, - }; - - match message.0 { - Communication::Consensus(message) => self.strategy.import_message(message), - Communication::Auxiliary(lock_proof) - => self.strategy.import_lock_proof(&self.context, lock_proof), - } - } - - // try to process timeouts. - let state_machine_res = self.strategy.poll(&self.context, &mut self.sending)?; - - // make progress on flushing all pending messages. - let _ = self.sending.process_all(&mut self.output)?; - - match state_machine_res { - Async::Ready(just) => { - self.concluded = Some(just); - self.poll() - } - Async::NotReady => { - - Ok(Async::NotReady) - } - } - } -} - -/// Attempt to reach BFT agreement on a candidate. -/// -/// `nodes` is the number of nodes in the system. -/// `max_faulty` is the maximum number of faulty nodes. Should be less than -/// 1/3 of `nodes`, otherwise agreement may never be reached. -/// -/// The input stream should never logically conclude. The logic here assumes -/// that messages flushed to the output stream will eventually reach other nodes. -/// -/// Note that it is possible to witness agreement being reached without ever -/// seeing the candidate. Any candidates seen will be checked for validity. -/// -/// Although technically the agreement will always complete (given the eventual -/// delivery of messages), in practice it is possible for this future to -/// conclude without having witnessed the conclusion. -/// In general, this future should be pre-empted by the import of a justification -/// set for this block height. -pub fn agree(context: C, nodes: usize, max_faulty: usize, input: I, output: O) - -> Agreement - where - C: Context, - C::RoundTimeout: Future, - C::CreateProposal: Future, - I: Stream,Error=E>, - O: Sink,SinkError=E>, - E: From, -{ - let strategy = Strategy::create(&context, nodes, max_faulty); - Agreement { - context, - input, - output, - concluded: None, - sending: Sending::with_capacity(4), - strategy: strategy, - } -} diff --git a/polkadot/candidate-agreement/src/bft/tests.rs b/polkadot/candidate-agreement/src/bft/tests.rs deleted file mode 100644 index 10ef932124..0000000000 --- a/polkadot/candidate-agreement/src/bft/tests.rs +++ /dev/null @@ -1,350 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Tests for the candidate agreement strategy. - -use super::*; - -use tests::Network; - -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -use futures::prelude::*; -use futures::sync::oneshot; -use futures::future::FutureResult; - -#[derive(Debug, PartialEq, Eq, Clone, Hash)] -struct Candidate(usize); - -#[derive(Debug, PartialEq, Eq, Clone, Hash)] -struct Digest(usize); - -#[derive(Debug, PartialEq, Eq, Clone, Hash)] -struct AuthorityId(usize); - -#[derive(Debug, PartialEq, Eq, Clone)] -struct Signature(Message, AuthorityId); - -struct SharedContext { - node_count: usize, - current_round: usize, - awaiting_round_timeouts: HashMap>>, -} - -#[derive(Debug)] -struct Error; - -impl From for Error { - fn from(_: InputStreamConcluded) -> Error { - Error - } -} - -impl SharedContext { - fn new(node_count: usize) -> Self { - SharedContext { - node_count, - current_round: 0, - awaiting_round_timeouts: HashMap::new() - } - } - - fn round_timeout(&mut self, round: usize) -> Box> { - let (tx, rx) = oneshot::channel(); - if round < self.current_round { - tx.send(()).unwrap(); - } else { - self.awaiting_round_timeouts - .entry(round) - .or_insert_with(Vec::new) - .push(tx); - } - - Box::new(rx.map_err(|_| Error)) - } - - fn bump_round(&mut self) { - let awaiting_timeout = self.awaiting_round_timeouts - .remove(&self.current_round) - .unwrap_or_else(Vec::new); - - for tx in awaiting_timeout { - let _ = tx.send(()); - } - - self.current_round += 1; - } - - fn round_proposer(&self, round: usize) -> AuthorityId { - AuthorityId(round % self.node_count) - } -} - -struct TestContext { - local_id: AuthorityId, - proposal: Mutex, - shared: Arc>, -} - -impl Context for TestContext { - type Candidate = Candidate; - type Digest = Digest; - type AuthorityId = AuthorityId; - type Signature = Signature; - type RoundTimeout = Box>; - type CreateProposal = FutureResult; - - fn local_id(&self) -> AuthorityId { - self.local_id.clone() - } - - fn proposal(&self) -> Self::CreateProposal { - let proposal = { - let mut p = self.proposal.lock().unwrap(); - let x = *p; - *p = (*p * 2) + 1; - x - }; - - Ok(Candidate(proposal)).into_future() - } - - fn candidate_digest(&self, candidate: &Candidate) -> Digest { - Digest(candidate.0) - } - - fn sign_local(&self, message: Message) - -> LocalizedMessage - { - let signature = Signature(message.clone(), self.local_id.clone()); - LocalizedMessage { - message, - signature, - sender: self.local_id.clone() - } - } - - fn round_proposer(&self, round: usize) -> AuthorityId { - self.shared.lock().unwrap().round_proposer(round) - } - - fn candidate_valid(&self, candidate: &Candidate) -> bool { - candidate.0 % 3 != 0 - } - - fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { - self.shared.lock().unwrap().round_timeout(round) - } -} - -fn timeout_in(t: Duration) -> oneshot::Receiver<()> { - let (tx, rx) = oneshot::channel(); - ::std::thread::spawn(move || { - ::std::thread::sleep(t); - let _ = tx.send(()); - }); - - rx -} - -#[test] -fn consensus_completes_with_minimum_good() { - let node_count = 10; - let max_faulty = 3; - - let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count))); - - let (network, net_send, net_recv) = Network::new(node_count); - network.route_on_thread(); - - let nodes = net_send - .into_iter() - .zip(net_recv) - .take(node_count - max_faulty) - .enumerate() - .map(|(i, (tx, rx))| { - let ctx = TestContext { - local_id: AuthorityId(i), - proposal: Mutex::new(i), - shared: shared_context.clone(), - }; - - agree( - ctx, - node_count, - max_faulty, - rx.map_err(|_| Error), - tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), - ) - }) - .collect::>(); - - ::std::thread::spawn(move || { - let mut timeout = ::std::time::Duration::from_millis(50); - loop { - ::std::thread::sleep(timeout.clone()); - shared_context.lock().unwrap().bump_round(); - timeout *= 2; - } - }); - - let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error); - let results = ::futures::future::join_all(nodes) - .map(Some) - .select(timeout.map(|_| None)) - .wait() - .map(|(i, _)| i) - .map_err(|(e, _)| e) - .expect("to complete") - .expect("to not time out"); - - for result in &results { - assert_eq!(&result.justification.digest, &results[0].justification.digest); - } -} - -#[test] -fn consensus_does_not_complete_without_enough_nodes() { - let node_count = 10; - let max_faulty = 3; - - let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count))); - - let (network, net_send, net_recv) = Network::new(node_count); - network.route_on_thread(); - - let nodes = net_send - .into_iter() - .zip(net_recv) - .take(node_count - max_faulty - 1) - .enumerate() - .map(|(i, (tx, rx))| { - let ctx = TestContext { - local_id: AuthorityId(i), - proposal: Mutex::new(i), - shared: shared_context.clone(), - }; - - agree( - ctx, - node_count, - max_faulty, - rx.map_err(|_| Error), - tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), - ) - }) - .collect::>(); - - let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error); - let result = ::futures::future::join_all(nodes) - .map(Some) - .select(timeout.map(|_| None)) - .wait() - .map(|(i, _)| i) - .map_err(|(e, _)| e) - .expect("to complete"); - - assert!(result.is_none(), "not enough online nodes"); -} - -#[test] -fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() { - let node_count = 10; - let max_faulty = 3; - - let locked_proposal = Candidate(999_999_999); - let locked_digest = Digest(999_999_999); - let locked_round = 1; - let justification = UncheckedJustification { - round_number: locked_round, - digest: locked_digest.clone(), - signatures: (0..7) - .map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), AuthorityId(i))) - .collect() - }.check(7, |_, _, s| Some(s.1.clone())).unwrap(); - - let mut shared_context = SharedContext::new(node_count); - shared_context.current_round = locked_round + 1; - let shared_context = Arc::new(Mutex::new(shared_context)); - - let (network, net_send, net_recv) = Network::new(node_count); - network.route_on_thread(); - - let nodes = net_send - .into_iter() - .zip(net_recv) - .enumerate() - .map(|(i, (tx, rx))| { - let ctx = TestContext { - local_id: AuthorityId(i), - proposal: Mutex::new(i), - shared: shared_context.clone(), - }; - - let mut agreement = agree( - ctx, - node_count, - max_faulty, - rx.map_err(|_| Error), - tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), - ); - - agreement.strategy.advance_to_round( - &agreement.context, - locked_round + 1 - ); - - if i <= max_faulty { - agreement.strategy.locked = Some(Locked { - justification: justification.clone(), - }) - } - - if i == max_faulty { - agreement.strategy.notable_candidates.insert( - locked_digest.clone(), - locked_proposal.clone(), - ); - } - - agreement - }) - .collect::>(); - - ::std::thread::spawn(move || { - let mut timeout = ::std::time::Duration::from_millis(50); - loop { - ::std::thread::sleep(timeout.clone()); - shared_context.lock().unwrap().bump_round(); - timeout *= 2; - } - }); - - let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error); - let results = ::futures::future::join_all(nodes) - .map(Some) - .select(timeout.map(|_| None)) - .wait() - .map(|(i, _)| i) - .map_err(|(e, _)| e) - .expect("to complete") - .expect("to not time out"); - - for result in &results { - assert_eq!(&result.justification.digest, &locked_digest); - } -} diff --git a/polkadot/candidate-agreement/src/handle_incoming.rs b/polkadot/candidate-agreement/src/handle_incoming.rs deleted file mode 100644 index 625c950784..0000000000 --- a/polkadot/candidate-agreement/src/handle_incoming.rs +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! A stream that handles incoming messages to the BFT agreement module and statement -//! table. It forwards as necessary, and dispatches requests for determining availability -//! and validity of candidates as necessary. - -use std::collections::HashSet; - -use futures::prelude::*; -use futures::stream::{Fuse, FuturesUnordered}; -use futures::sync::mpsc; - -use table::{self, Statement, Context as TableContext}; - -use super::{Context, CheckedMessage, SharedTable, TypeResolve}; - -enum CheckResult { - Available, - Unavailable, - Valid, - Invalid, -} - -enum Checking { - Availability(D, A), - Validity(D, V), -} - -impl Future for Checking - where - D: Clone, - A: Future, - V: Future, -{ - type Item = (D, CheckResult); - type Error = E; - - fn poll(&mut self) -> Poll { - Ok(Async::Ready(match *self { - Checking::Availability(ref digest, ref mut f) => { - match try_ready!(f.poll()) { - true => (digest.clone(), CheckResult::Available), - false => (digest.clone(), CheckResult::Unavailable), - } - } - Checking::Validity(ref digest, ref mut f) => { - match try_ready!(f.poll()) { - true => (digest.clone(), CheckResult::Valid), - false => (digest.clone(), CheckResult::Invalid), - } - } - })) - } -} - -/// Handles incoming messages to the BFT service and statement table. -/// -/// Also triggers requests for determining validity and availability of other -/// parachain candidates. -pub struct HandleIncoming { - table: SharedTable, - messages_in: Fuse, - bft_out: mpsc::UnboundedSender<::BftCommunication>, - local_id: C::AuthorityId, - requesting_about: FuturesUnordered::Future, - ::Future, - >>, - checked_validity: HashSet, - checked_availability: HashSet, -} - -impl HandleIncoming { - fn sign_and_import_statement(&self, digest: C::Digest, result: CheckResult) { - let statement = match result { - CheckResult::Valid => Statement::Valid(digest), - CheckResult::Invalid => Statement::Invalid(digest), - CheckResult::Available => Statement::Available(digest), - CheckResult::Unavailable => return, // no such statement and not provable. - }; - - // TODO: trigger broadcast to peers immediately? - self.table.sign_and_import(statement); - } - - fn import_message(&mut self, origin: C::AuthorityId, message: CheckedMessage) { - match message { - CheckedMessage::Bft(msg) => { let _ = self.bft_out.unbounded_send(msg); } - CheckedMessage::Table(table_messages) => { - // import all table messages and check for any that we - // need to produce statements for. - let msg_iter = table_messages - .into_iter() - .map(|m| (m, Some(origin.clone()))); - let summaries: Vec<_> = self.table.import_statements(msg_iter); - - for summary in summaries { - self.dispatch_on_summary(summary) - } - } - } - } - - // on new candidates in our group, begin checking validity. - // on new candidates in our availability sphere, begin checking availability. - fn dispatch_on_summary(&mut self, summary: table::Summary) { - let is_validity_member = - self.table.context().is_member_of(&self.local_id, &summary.group_id); - - let is_availability_member = - self.table.context().is_availability_guarantor_of(&self.local_id, &summary.group_id); - - let digest = &summary.candidate; - - // TODO: consider a strategy based on the number of candidate votes as well. - let checking_validity = - is_validity_member && - self.checked_validity.insert(digest.clone()) && - self.table.proposed_digest() != Some(digest.clone()); - - let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone()); - - if checking_validity || checking_availability { - let context = &*self.table.context(); - let requesting_about = &mut self.requesting_about; - self.table.with_candidate(digest, |c| match c { - None => {} // TODO: handle table inconsistency somehow? - Some(candidate) => { - if checking_validity { - let future = context.check_validity(candidate).into_future(); - let checking = Checking::Validity(digest.clone(), future); - requesting_about.push(checking); - } - - if checking_availability { - let future = context.check_availability(candidate).into_future(); - let checking = Checking::Availability(digest.clone(), future); - requesting_about.push(checking); - } - } - }) - } - } -} - -impl HandleIncoming - where - C: Context, - I: Stream),Error=E>, - C::CheckAvailability: IntoFuture, - C::CheckCandidate: IntoFuture, -{ - pub fn new( - table: SharedTable, - messages_in: I, - bft_out: mpsc::UnboundedSender<::BftCommunication>, - ) -> Self { - let local_id = table.context().local_id(); - - HandleIncoming { - table, - bft_out, - local_id, - messages_in: messages_in.fuse(), - requesting_about: FuturesUnordered::new(), - checked_validity: HashSet::new(), - checked_availability: HashSet::new(), - } - } -} - -impl Future for HandleIncoming - where - C: Context, - I: Stream),Error=E>, - C::CheckAvailability: IntoFuture, - C::CheckCandidate: IntoFuture, -{ - type Item = (); - type Error = E; - - fn poll(&mut self) -> Poll<(), E> { - loop { - // FuturesUnordered is safe to poll after it has completed. - while let Async::Ready(Some((d, r))) = self.requesting_about.poll()? { - self.sign_and_import_statement(d, r); - } - - match try_ready!(self.messages_in.poll()) { - None => if self.requesting_about.is_empty() { - return Ok(Async::Ready(())) - } else { - return Ok(Async::NotReady) - }, - Some((origin, msg)) => self.import_message(origin, msg), - } - } - } -} diff --git a/polkadot/candidate-agreement/src/lib.rs b/polkadot/candidate-agreement/src/lib.rs deleted file mode 100644 index 2cf4be5c54..0000000000 --- a/polkadot/candidate-agreement/src/lib.rs +++ /dev/null @@ -1,625 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Propagation and agreement of candidates. -//! -//! Authorities are split into groups by parachain, and each authority might come -//! up its own candidate for their parachain. Within groups, authorities pass around -//! their candidates and produce statements of validity. -//! -//! Any candidate that receives majority approval by the authorities in a group -//! may be subject to inclusion, unless any authorities flag that candidate as invalid. -//! -//! Wrongly flagging as invalid should be strongly disincentivized, so that in the -//! equilibrium state it is not expected to happen. Likewise with the submission -//! of invalid blocks. -//! -//! Groups themselves may be compromised by malicious authorities. - -#[macro_use] -extern crate futures; -extern crate parking_lot; -extern crate tokio_timer; - -use std::collections::{HashMap, HashSet}; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; -use std::time::Duration; - -use futures::prelude::*; -use futures::sync::{mpsc, oneshot}; -use parking_lot::Mutex; -use tokio_timer::Timer; - -use table::Table; - -mod bft; -mod handle_incoming; -mod round_robin; -mod table; - -#[cfg(test)] -pub mod tests; - -/// Context necessary for agreement. -pub trait Context: Send + Clone { - /// A authority ID - type AuthorityId: Debug + Hash + Eq + Clone + Ord; - /// The digest (hash or other unique attribute) of a candidate. - type Digest: Debug + Hash + Eq + Clone; - /// The group ID type - type GroupId: Debug + Hash + Ord + Eq + Clone; - /// A signature type. - type Signature: Debug + Eq + Clone; - /// Candidate type. In practice this will be a candidate receipt. - type ParachainCandidate: Debug + Ord + Eq + Clone; - /// The actual block proposal type. This is what is agreed upon, and - /// is composed of multiple candidates. - type Proposal: Debug + Eq + Clone; - - /// A future that resolves when a candidate is checked for validity. - /// - /// In Polkadot, this will involve fetching the corresponding block data, - /// producing the necessary ingress, and running the parachain validity function. - type CheckCandidate: IntoFuture; - - /// A future that resolves when availability of a candidate's external - /// data is checked. - type CheckAvailability: IntoFuture; - - /// The statement batch type. - type StatementBatch: StatementBatch< - Self::AuthorityId, - table::SignedStatement, - >; - - /// Get the digest of a candidate. - fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest; - - /// Get the digest of a proposal. - fn proposal_digest(proposal: &Self::Proposal) -> Self::Digest; - - /// Get the group of a candidate. - fn candidate_group(candidate: &Self::ParachainCandidate) -> Self::GroupId; - - /// Get the primary for a given round. - fn round_proposer(&self, round: usize) -> Self::AuthorityId; - - /// Check a candidate for validity. - fn check_validity(&self, candidate: &Self::ParachainCandidate) -> Self::CheckCandidate; - - /// Check availability of candidate data. - fn check_availability(&self, candidate: &Self::ParachainCandidate) -> Self::CheckAvailability; - - /// Attempt to combine a set of parachain candidates into a proposal. - /// - /// This may arbitrarily return `None`, but the intent is for `Some` - /// to only be returned when candidates from enough groups are known. - /// - /// "enough" may be subjective as well. - fn create_proposal(&self, candidates: Vec<&Self::ParachainCandidate>) - -> Option; - - /// Check validity of a proposal. This should call out to the `check_candidate` - /// function for all parachain candidates contained within it, as well as - /// checking other validity constraints of the proposal. - fn proposal_valid(&self, proposal: &Self::Proposal, check_candidate: F) -> bool - where F: FnMut(&Self::ParachainCandidate) -> bool; - - /// Get the local authority ID. - fn local_id(&self) -> Self::AuthorityId; - - /// Sign a table validity statement with the local key. - fn sign_table_statement( - &self, - statement: &table::Statement - ) -> Self::Signature; - - /// Sign a BFT agreement message. - fn sign_bft_message(&self, &bft::Message) -> Self::Signature; -} - -/// Helper for type resolution for contexts until type aliases apply bounds. -pub trait TypeResolve { - type SignedTableStatement; - type BftCommunication; - type BftCommitted; - type Misbehavior; -} - -impl TypeResolve for C { - type SignedTableStatement = table::SignedStatement; - type BftCommunication = bft::Communication; - type BftCommitted = bft::Committed; - type Misbehavior = table::Misbehavior; -} - -/// Information about a specific group. -#[derive(Debug, Clone)] -pub struct GroupInfo { - /// Authorities meant to check validity of candidates. - pub validity_guarantors: HashSet, - /// Authorities meant to check availability of candidate data. - pub availability_guarantors: HashSet, - /// Number of votes needed for validity. - pub needed_validity: usize, - /// Number of votes needed for availability. - pub needed_availability: usize, -} - -struct TableContext { - context: C, - groups: HashMap>, -} - -impl ::std::ops::Deref for TableContext { - type Target = C; - - fn deref(&self) -> &C { - &self.context - } -} - -impl table::Context for TableContext { - type AuthorityId = C::AuthorityId; - type Digest = C::Digest; - type GroupId = C::GroupId; - type Signature = C::Signature; - type Candidate = C::ParachainCandidate; - - fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest { - C::candidate_digest(candidate) - } - - fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId { - C::candidate_group(candidate) - } - - fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool { - self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority)) - } - - fn is_availability_guarantor_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool { - self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority)) - } - - fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize) { - self.groups.get(group).map_or( - (usize::max_value(), usize::max_value()), - |g| (g.needed_validity, g.needed_availability), - ) - } -} - -// A shared table object. -struct SharedTableInner { - table: Table>, - proposed_digest: Option, - awaiting_proposal: Vec>, -} - -impl SharedTableInner { - fn import_statement( - &mut self, - context: &TableContext, - statement: ::SignedTableStatement, - received_from: Option - ) -> Option> { - self.table.import_statement(context, statement, received_from) - } - - fn update_proposal(&mut self, context: &TableContext) { - if self.awaiting_proposal.is_empty() { return } - let proposal_candidates = self.table.proposed_candidates(context); - if let Some(proposal) = context.context.create_proposal(proposal_candidates) { - for sender in self.awaiting_proposal.drain(..) { - let _ = sender.send(proposal.clone()); - } - } - } - - fn get_proposal(&mut self, context: &TableContext) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - self.awaiting_proposal.push(tx); - self.update_proposal(context); - rx - } - - fn proposal_valid(&mut self, context: &TableContext, proposal: &C::Proposal) -> bool { - context.context.proposal_valid(proposal, |contained_candidate| { - // check that the candidate is valid (has enough votes) - let digest = C::candidate_digest(contained_candidate); - self.table.candidate_includable(&digest, context) - }) - } -} - -/// A shared table object. -pub struct SharedTable { - context: Arc>, - inner: Arc>>, -} - -impl Clone for SharedTable { - fn clone(&self) -> Self { - SharedTable { - context: self.context.clone(), - inner: self.inner.clone() - } - } -} - -impl SharedTable { - /// Create a new shared table. - pub fn new(context: C, groups: HashMap>) -> Self { - SharedTable { - context: Arc::new(TableContext { context, groups }), - inner: Arc::new(Mutex::new(SharedTableInner { - table: Table::default(), - awaiting_proposal: Vec::new(), - proposed_digest: None, - })) - } - } - - /// Import a single statement. - pub fn import_statement( - &self, - statement: ::SignedTableStatement, - received_from: Option, - ) -> Option> { - self.inner.lock().import_statement(&*self.context, statement, received_from) - } - - /// Sign and import a local statement. - pub fn sign_and_import( - &self, - statement: table::Statement, - ) -> Option> { - let proposed_digest = match statement { - table::Statement::Candidate(ref c) => Some(C::candidate_digest(c)), - _ => None, - }; - - let signed_statement = table::SignedStatement { - signature: self.context.sign_table_statement(&statement), - sender: self.context.local_id(), - statement, - }; - - let mut inner = self.inner.lock(); - if proposed_digest.is_some() { - inner.proposed_digest = proposed_digest; - } - - inner.import_statement(&*self.context, signed_statement, None) - } - - /// Import many statements at once. - /// - /// Provide an iterator yielding pairs of (statement, received_from). - pub fn import_statements(&self, iterable: I) -> U - where - I: IntoIterator::SignedTableStatement, Option)>, - U: ::std::iter::FromIterator>, - { - let mut inner = self.inner.lock(); - - iterable.into_iter().filter_map(move |(statement, received_from)| { - inner.import_statement(&*self.context, statement, received_from) - }).collect() - } - - /// Update the proposal sealing. - pub fn update_proposal(&self) { - self.inner.lock().update_proposal(&*self.context) - } - - /// Register interest in receiving a proposal when ready. - /// If one is ready immediately, it will be provided. - pub fn get_proposal(&self) -> oneshot::Receiver { - self.inner.lock().get_proposal(&*self.context) - } - - /// Check if a proposal is valid. - pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool { - self.inner.lock().proposal_valid(&*self.context, proposal) - } - - /// Execute a closure using a specific candidate. - /// - /// Deadlocks if called recursively. - pub fn with_candidate(&self, digest: &C::Digest, f: F) -> U - where F: FnOnce(Option<&C::ParachainCandidate>) -> U - { - let inner = self.inner.lock(); - f(inner.table.get_candidate(digest)) - } - - /// Get all witnessed misbehavior. - pub fn get_misbehavior(&self) -> HashMap::Misbehavior> { - self.inner.lock().table.get_misbehavior().clone() - } - - /// Fill a statement batch. - pub fn fill_batch(&self, batch: &mut C::StatementBatch) { - self.inner.lock().table.fill_batch(batch); - } - - /// Get the local proposed candidate digest. - pub fn proposed_digest(&self) -> Option { - self.inner.lock().proposed_digest.clone() - } - - // Get a handle to the table context. - fn context(&self) -> &TableContext { - &*self.context - } -} - -/// Errors that can occur during agreement. -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum Error { - IoTerminated, - FaultyTimer, - CannotPropose, -} - -impl From for Error { - fn from(_: bft::InputStreamConcluded) -> Error { - Error::IoTerminated - } -} - -/// Context owned by the BFT future necessary to execute the logic. -pub struct BftContext { - context: C, - table: SharedTable, - timer: Timer, - round_timeout_multiplier: u64, -} - -impl bft::Context for BftContext - where C::Proposal: 'static, -{ - type AuthorityId = C::AuthorityId; - type Digest = C::Digest; - type Signature = C::Signature; - type Candidate = C::Proposal; - type RoundTimeout = Box>; - type CreateProposal = Box>; - - fn local_id(&self) -> Self::AuthorityId { - self.context.local_id() - } - - fn proposal(&self) -> Self::CreateProposal { - Box::new(self.table.get_proposal().map_err(|_| Error::CannotPropose)) - } - - fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest { - C::proposal_digest(candidate) - } - - fn sign_local(&self, message: bft::Message) - -> bft::LocalizedMessage - { - let sender = self.local_id(); - let signature = self.context.sign_bft_message(&message); - bft::LocalizedMessage { - message, - sender, - signature, - } - } - - fn round_proposer(&self, round: usize) -> Self::AuthorityId { - self.context.round_proposer(round) - } - - fn candidate_valid(&self, proposal: &Self::Candidate) -> bool { - self.table.proposal_valid(proposal) - } - - fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { - let round = ::std::cmp::min(63, round) as u32; - let timeout = 1u64.checked_shl(round) - .unwrap_or_else(u64::max_value) - .saturating_mul(self.round_timeout_multiplier); - - Box::new(self.timer.sleep(Duration::from_secs(timeout)) - .map_err(|_| Error::FaultyTimer)) - } -} - - -/// Parameters necessary for agreement. -pub struct AgreementParams { - /// The context itself. - pub context: C, - /// For scheduling timeouts. - pub timer: Timer, - /// The statement table. - pub table: SharedTable, - /// The number of nodes. - pub nodes: usize, - /// The maximum number of faulty nodes. - pub max_faulty: usize, - /// The round timeout multiplier: 2^round_number is multiplied by this. - pub round_timeout_multiplier: u64, - /// The maximum amount of messages to queue. - pub message_buffer_size: usize, - /// Interval to attempt forming proposals over. - pub form_proposal_interval: Duration, -} - -/// Recovery for messages -pub trait MessageRecovery { - /// The unchecked message type. This implies that work hasn't been done - /// to decode the payload and check and authenticate a signature. - type UncheckedMessage; - - /// Attempt to transform a checked message into an unchecked. - fn check_message(&self, Self::UncheckedMessage) -> Option>; -} - -/// A batch of statements to send out. -pub trait StatementBatch { - /// Get the target authorities of these statements. - fn targets(&self) -> &[V]; - - /// If the batch is empty. - fn is_empty(&self) -> bool; - - /// Push a statement onto the batch. Returns false when the batch is full. - /// - /// This is meant to do work like incrementally serializing the statements - /// into a vector of bytes while making sure the length is below a certain - /// amount. - fn push(&mut self, statement: T) -> bool; -} - -/// Recovered and fully checked messages. -pub enum CheckedMessage { - /// Messages meant for the BFT agreement logic. - Bft(::BftCommunication), - /// Statements circulating about the table. - Table(Vec<::SignedTableStatement>), -} - -/// Outgoing messages to the network. -#[derive(Debug, Clone)] -pub enum OutgoingMessage { - /// Messages meant for BFT agreement peers. - Bft(::BftCommunication), - /// Batches of table statements. - Table(C::StatementBatch), -} - -/// Create an agreement future, and I/O streams. -// TODO: kill 'static bounds and use impl Future. -pub fn agree< - Context, - NetIn, - NetOut, - Recovery, - PropagateStatements, - LocalCandidate, - Err, ->( - params: AgreementParams, - net_in: NetIn, - net_out: NetOut, - recovery: Recovery, - propagate_statements: PropagateStatements, - local_candidate: LocalCandidate, -) - -> Box::BftCommitted,Error=Error>> - where - Context: ::Context + 'static, - Context::CheckCandidate: IntoFuture, - Context::CheckAvailability: IntoFuture, - NetIn: Stream),Error=Err> + 'static, - NetOut: Sink> + 'static, - Recovery: MessageRecovery + 'static, - PropagateStatements: Stream + 'static, - LocalCandidate: IntoFuture + 'static -{ - let (bft_in_in, bft_in_out) = mpsc::unbounded(); - let (bft_out_in, bft_out_out) = mpsc::unbounded(); - - let agreement = { - let bft_context = BftContext { - context: params.context, - table: params.table.clone(), - timer: params.timer.clone(), - round_timeout_multiplier: params.round_timeout_multiplier, - }; - - bft::agree( - bft_context, - params.nodes, - params.max_faulty, - bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated), - bft_out_in.sink_map_err(|_| Error::IoTerminated), - ) - }; - - let route_messages_in = { - let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size); - - let round_robin_recovered = round_robin - .filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x))); - - handle_incoming::HandleIncoming::new( - params.table.clone(), - round_robin_recovered, - bft_in_in, - ).map_err(|_| Error::IoTerminated) - }; - - let route_messages_out = { - let table = params.table.clone(); - let periodic_table_statements = propagate_statements - .or_else(|_| ::futures::future::empty()) // halt the stream instead of error. - .map(move |mut batch| { table.fill_batch(&mut batch); batch }) - .filter(|b| !b.is_empty()) - .map(OutgoingMessage::Table); - - let complete_out_stream = bft_out_out - .map_err(|_| Error::IoTerminated) - .map(|bft::ContextCommunication(x)| x) - .map(OutgoingMessage::Bft) - .select(periodic_table_statements); - - net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream) - }; - - let import_local_candidate = { - let table = params.table.clone(); - local_candidate - .into_future() - .map(table::Statement::Candidate) - .map(Some) - .or_else(|_| Ok(None)) - .map(move |s| if let Some(s) = s { - table.sign_and_import(s); - }) - }; - - let create_proposal_on_interval = { - let table = params.table; - params.timer.interval(params.form_proposal_interval) - .map_err(|_| Error::FaultyTimer) - .for_each(move |_| { table.update_proposal(); Ok(()) }) - }; - - // if these auxiliary futures terminate before the agreement, then - // that is an error. - let auxiliary_futures = route_messages_in.join4( - create_proposal_on_interval, - route_messages_out, - import_local_candidate, - ).and_then(|_| Err(Error::IoTerminated)); - - let future = agreement - .select(auxiliary_futures) - .map(|(committed, _)| committed) - .map_err(|(e, _)| e); - - Box::new(future) -} diff --git a/polkadot/candidate-agreement/src/round_robin.rs b/polkadot/candidate-agreement/src/round_robin.rs deleted file mode 100644 index 3f98507cab..0000000000 --- a/polkadot/candidate-agreement/src/round_robin.rs +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Round-robin buffer for incoming messages. -//! -//! This takes batches of messages associated with a sender as input, -//! and yields messages in a fair order by sender. - -use std::collections::{Bound, BTreeMap, VecDeque}; - -use futures::prelude::*; -use futures::stream::Fuse; - -/// Implementation of the round-robin buffer for incoming messages. -#[derive(Debug)] -pub struct RoundRobinBuffer { - buffer: BTreeMap>, - last_processed_from: Option, - stored_messages: usize, - max_messages: usize, - inner: Fuse, -} - -impl RoundRobinBuffer { - /// Create a new round-robin buffer which holds up to a maximum - /// amount of messages. - pub fn new(stream: S, buffer_size: usize) -> Self { - RoundRobinBuffer { - buffer: BTreeMap::new(), - last_processed_from: None, - stored_messages: 0, - max_messages: buffer_size, - inner: stream.fuse(), - } - } -} - -impl RoundRobinBuffer { - fn next_message(&mut self) -> Option<(V, M)> { - if self.stored_messages == 0 { - return None - } - - // first pick up from the last authority we processed a message from - let mut next = { - let lower_bound = match self.last_processed_from { - None => Bound::Unbounded, - Some(ref x) => Bound::Excluded(x.clone()), - }; - - self.buffer.range_mut((lower_bound, Bound::Unbounded)) - .filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v))) - .next() - }; - - // but wrap around to the beginning again if we got nothing. - if next.is_none() { - next = self.buffer.iter_mut() - .filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v))) - .next(); - } - - if let Some((ref authority, _)) = next { - self.stored_messages -= 1; - self.last_processed_from = Some(authority.clone()); - } - - next - } - - // import messages, discarding when the buffer is full. - fn import_messages(&mut self, sender: V, messages: Vec) { - let space_remaining = self.max_messages - self.stored_messages; - self.stored_messages += ::std::cmp::min(space_remaining, messages.len()); - - let v = self.buffer.entry(sender).or_insert_with(VecDeque::new); - v.extend(messages.into_iter().take(space_remaining)); - } -} - -impl Stream for RoundRobinBuffer - where S: Stream)> -{ - type Item = (V, M); - type Error = S::Error; - - fn poll(&mut self) -> Poll, S::Error> { - loop { - match self.inner.poll()? { - Async::NotReady | Async::Ready(None) => break, - Async::Ready(Some((sender, msgs))) => self.import_messages(sender, msgs), - } - } - - let done = self.inner.is_done(); - Ok(match self.next_message() { - Some(msg) => Async::Ready(Some(msg)), - None => if done { Async::Ready(None) } else { Async::NotReady }, - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures::stream::{self, Stream}; - - #[derive(Debug, PartialEq, Eq)] - struct UncheckedMessage { data: Vec } - - #[test] - fn is_fair_and_wraps_around() { - let stream = stream::iter_ok(vec![ - (1, vec![ - UncheckedMessage { data: vec![1, 3, 5] }, - UncheckedMessage { data: vec![3, 5, 7] }, - UncheckedMessage { data: vec![5, 7, 9] }, - ]), - (2, vec![ - UncheckedMessage { data: vec![2, 4, 6] }, - UncheckedMessage { data: vec![4, 6, 8] }, - UncheckedMessage { data: vec![6, 8, 10] }, - ]), - ]); - - let round_robin = RoundRobinBuffer::new(stream, 100); - let output = round_robin.wait().collect::, ()>>().unwrap(); - - assert_eq!(output, vec![ - (1, UncheckedMessage { data: vec![1, 3, 5] }), - (2, UncheckedMessage { data: vec![2, 4, 6] }), - (1, UncheckedMessage { data: vec![3, 5, 7] }), - - (2, UncheckedMessage { data: vec![4, 6, 8] }), - (1, UncheckedMessage { data: vec![5, 7, 9] }), - (2, UncheckedMessage { data: vec![6, 8, 10] }), - ]); - } - - #[test] - fn discards_when_full() { - let stream = stream::iter_ok(vec![ - (1, (0..200).map(|i| UncheckedMessage { data: vec![i] }).collect()) - ]); - - let round_robin = RoundRobinBuffer::new(stream, 100); - let output = round_robin.wait().collect::, ()>>().unwrap(); - - assert_eq!(output.len(), 100); - } -} diff --git a/polkadot/candidate-agreement/src/tests/mod.rs b/polkadot/candidate-agreement/src/tests/mod.rs deleted file mode 100644 index 1599a94aa6..0000000000 --- a/polkadot/candidate-agreement/src/tests/mod.rs +++ /dev/null @@ -1,385 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Tests and test helpers for the candidate agreement. - -const VALIDITY_CHECK_DELAY_MS: u64 = 100; -const AVAILABILITY_CHECK_DELAY_MS: u64 = 100; -const PROPOSAL_FORMATION_TICK_MS: u64 = 50; -const PROPAGATE_STATEMENTS_TICK_MS: u64 = 200; -const TIMER_TICK_DURATION_MS: u64 = 10; - -use std::collections::HashMap; - -use futures::prelude::*; -use futures::sync::mpsc; -use tokio_timer::Timer; - -use super::*; - -#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)] -struct AuthorityId(usize); - -#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] -struct Digest(Vec); - -#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] -struct GroupId(usize); - -#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] -struct ParachainCandidate { - group: GroupId, - data: usize, -} - -#[derive(PartialEq, Eq, Debug, Clone)] -struct Proposal { - candidates: Vec, -} - -#[derive(PartialEq, Eq, Debug, Clone)] -enum Signature { - Table(AuthorityId, table::Statement), - Bft(AuthorityId, bft::Message), -} - -enum Error { - Timer(tokio_timer::TimerError), - NetOut, - NetIn, -} - -#[derive(Debug, Clone)] -struct SharedTestContext { - n_authorities: usize, - n_groups: usize, - timer: Timer, -} - -#[derive(Debug, Clone)] -struct TestContext { - shared: Arc, - local_id: AuthorityId, -} - -impl Context for TestContext { - type AuthorityId = AuthorityId; - type Digest = Digest; - type GroupId = GroupId; - type Signature = Signature; - type Proposal = Proposal; - type ParachainCandidate = ParachainCandidate; - - type CheckCandidate = Box>; - type CheckAvailability = Box>; - - type StatementBatch = VecBatch< - AuthorityId, - table::SignedStatement - >; - - fn candidate_digest(candidate: &ParachainCandidate) -> Digest { - Digest(vec![candidate.group.0, candidate.data]) - } - - fn proposal_digest(candidate: &Proposal) -> Digest { - Digest(candidate.candidates.iter().fold(Vec::new(), |mut a, c| { - a.extend(Self::candidate_digest(c).0); - a - })) - } - - fn candidate_group(candidate: &ParachainCandidate) -> GroupId { - candidate.group.clone() - } - - fn round_proposer(&self, round: usize) -> AuthorityId { - AuthorityId(round % self.shared.n_authorities) - } - - fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate { - let future = self.shared.timer - .sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS)) - .map_err(Error::Timer) - .map(|_| true); - - Box::new(future) - } - - fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability { - let future = self.shared.timer - .sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS)) - .map_err(Error::Timer) - .map(|_| true); - - Box::new(future) - } - - fn create_proposal(&self, candidates: Vec<&ParachainCandidate>) - -> Option - { - let t = self.shared.n_groups * 2 / 3; - if candidates.len() >= t { - Some(Proposal { - candidates: candidates.iter().map(|x| (&**x).clone()).collect() - }) - } else { - None - } - } - - fn proposal_valid(&self, proposal: &Proposal, check_candidate: F) -> bool - where F: FnMut(&ParachainCandidate) -> bool - { - if proposal.candidates.len() >= self.shared.n_groups * 2 / 3 { - proposal.candidates.iter().all(check_candidate) - } else { - false - } - } - - fn local_id(&self) -> AuthorityId { - self.local_id.clone() - } - - fn sign_table_statement( - &self, - statement: &table::Statement - ) -> Signature { - Signature::Table(self.local_id(), statement.clone()) - } - - fn sign_bft_message(&self, message: &bft::Message) -> Signature { - Signature::Bft(self.local_id(), message.clone()) - } -} - -struct TestRecovery; - -impl MessageRecovery for TestRecovery { - type UncheckedMessage = OutgoingMessage; - - fn check_message(&self, msg: Self::UncheckedMessage) -> Option> { - Some(match msg { - OutgoingMessage::Bft(c) => CheckedMessage::Bft(c), - OutgoingMessage::Table(batch) => CheckedMessage::Table(batch.items), - }) - } -} - -pub struct Network { - endpoints: Vec>, - input: mpsc::UnboundedReceiver<(usize, T)>, -} - -impl Network { - pub fn new(nodes: usize) - -> (Self, Vec>, Vec>) - { - let mut inputs = Vec::with_capacity(nodes); - let mut outputs = Vec::with_capacity(nodes); - let mut endpoints = Vec::with_capacity(nodes); - - let (in_tx, in_rx) = mpsc::unbounded(); - for _ in 0..nodes { - let (out_tx, out_rx) = mpsc::unbounded(); - inputs.push(in_tx.clone()); - outputs.push(out_rx); - endpoints.push(out_tx); - } - - let network = Network { - endpoints, - input: in_rx, - }; - - (network, inputs, outputs) - } - - pub fn route_on_thread(self) { - ::std::thread::spawn(move || { let _ = self.wait(); }); - } -} - -impl Future for Network { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), Self::Error> { - match try_ready!(self.input.poll()) { - None => Ok(Async::Ready(())), - Some((sender, item)) => { - { - let receiving_endpoints = self.endpoints - .iter() - .enumerate() - .filter(|&(i, _)| i != sender) - .map(|(_, x)| x); - - for endpoint in receiving_endpoints { - let _ = endpoint.unbounded_send(item.clone()); - } - } - - self.poll() - } - } - } -} - -#[derive(Debug, Clone)] -pub struct VecBatch { - pub max_len: usize, - pub targets: Vec, - pub items: Vec, -} - -impl ::StatementBatch for VecBatch { - fn targets(&self) -> &[V] { &self.targets } - fn is_empty(&self) -> bool { self.items.is_empty() } - fn push(&mut self, item: T) -> bool { - if self.items.len() == self.max_len { - false - } else { - self.items.push(item); - true - } - } -} - -fn make_group_assignments(n_authorities: usize, n_groups: usize) - -> HashMap> -{ - let mut map = HashMap::new(); - let threshold = (n_authorities / n_groups) / 2; - let make_blank_group = || { - GroupInfo { - validity_guarantors: HashSet::new(), - availability_guarantors: HashSet::new(), - needed_validity: threshold, - needed_availability: threshold, - } - }; - - // every authority checks validity of his ID modulo n_groups and - // guarantees availability for the group above that. - for a_id in 0..n_authorities { - let primary_group = a_id % n_groups; - let availability_groups = [ - (a_id + 1) % n_groups, - a_id.wrapping_sub(1) % n_groups, - ]; - - map.entry(GroupId(primary_group)) - .or_insert_with(&make_blank_group) - .validity_guarantors - .insert(AuthorityId(a_id)); - - for &availability_group in &availability_groups { - map.entry(GroupId(availability_group)) - .or_insert_with(&make_blank_group) - .availability_guarantors - .insert(AuthorityId(a_id)); - } - } - - map -} - -fn make_blank_batch(n_authorities: usize) -> VecBatch { - VecBatch { - max_len: 20, - targets: (0..n_authorities).map(AuthorityId).collect(), - items: Vec::new(), - } -} - -#[test] -fn consensus_completes_with_minimum_good() { - let n = 50; - let f = 16; - let n_groups = 10; - - let timer = ::tokio_timer::wheel() - .tick_duration(Duration::from_millis(TIMER_TICK_DURATION_MS)) - .num_slots(1 << 16) - .build(); - - let (network, inputs, outputs) = Network::<(AuthorityId, OutgoingMessage)>::new(n - f); - network.route_on_thread(); - - let shared_test_context = Arc::new(SharedTestContext { - n_authorities: n, - n_groups: n_groups, - timer: timer.clone(), - }); - - let groups = make_group_assignments(n, n_groups); - - let authorities = inputs.into_iter().zip(outputs).enumerate().map(|(raw_id, (input, output))| { - let id = AuthorityId(raw_id); - let context = TestContext { - shared: shared_test_context.clone(), - local_id: id, - }; - - let shared_table = SharedTable::new(context.clone(), groups.clone()); - let params = AgreementParams { - context, - timer: timer.clone(), - table: shared_table, - nodes: n, - max_faulty: f, - round_timeout_multiplier: 4, - message_buffer_size: 100, - form_proposal_interval: Duration::from_millis(PROPOSAL_FORMATION_TICK_MS), - }; - - let net_out = input - .sink_map_err(|_| Error::NetOut) - .with(move |x| Ok::<_, Error>((id.0, (id, x))) ); - - let net_in = output - .map_err(|_| Error::NetIn) - .map(move |(v, msg)| (v, vec![msg])); - - let propagate_statements = timer - .interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS)) - .map(move |()| make_blank_batch(n)) - .map_err(Error::Timer); - - let local_candidate = if raw_id < n_groups { - let candidate = ParachainCandidate { - group: GroupId(raw_id), - data: raw_id, - }; - ::futures::future::Either::A(Ok::<_, Error>(candidate).into_future()) - } else { - ::futures::future::Either::B(::futures::future::empty()) - }; - - agree::<_, _, _, _, _, _, Error>( - params, - net_in, - net_out, - TestRecovery, - propagate_statements, - local_candidate - ) - }).collect::>(); - - futures::future::join_all(authorities).wait().unwrap(); -} diff --git a/polkadot/consensus/Cargo.toml b/polkadot/consensus/Cargo.toml new file mode 100644 index 0000000000..aeda287f80 --- /dev/null +++ b/polkadot/consensus/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "polkadot-consensus" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +futures = "0.1.17" +parking_lot = "0.4" +tokio-timer = "0.1.2" +ed25519 = { path = "../../substrate/ed25519" } +polkadot-primitives = { path = "../primitives" } +polkadot-statement-table = { path = "../statement-table" } +substrate-bft = { path = "../../substrate/bft" } +substrate-codec = { path = "../../substrate/codec" } +substrate-primitives = { path = "../../substrate/primitives" } diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs new file mode 100644 index 0000000000..f3e62ba1d5 --- /dev/null +++ b/polkadot/consensus/src/lib.rs @@ -0,0 +1,243 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Propagation and agreement of candidates. +//! +//! Authorities are split into groups by parachain, and each authority might come +//! up its own candidate for their parachain. Within groups, authorities pass around +//! their candidates and produce statements of validity. +//! +//! Any candidate that receives majority approval by the authorities in a group +//! may be subject to inclusion, unless any authorities flag that candidate as invalid. +//! +//! Wrongly flagging as invalid should be strongly disincentivized, so that in the +//! equilibrium state it is not expected to happen. Likewise with the submission +//! of invalid blocks. +//! +//! Groups themselves may be compromised by malicious authorities. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use codec::Slicable; +use table::Table; +use table::generic::Statement as GenericStatement; +use polkadot_primitives::Hash; +use polkadot_primitives::parachain::{Id as ParaId, CandidateReceipt}; +use primitives::block::Block as SubstrateBlock; +use primitives::AuthorityId; + +use parking_lot::Mutex; + +extern crate futures; +extern crate ed25519; +extern crate parking_lot; +extern crate tokio_timer; +extern crate polkadot_statement_table as table; +extern crate polkadot_primitives; +extern crate substrate_bft as bft; +extern crate substrate_codec as codec; +extern crate substrate_primitives as primitives; + +/// Information about a specific group. +#[derive(Debug, Clone)] +pub struct GroupInfo { + /// Authorities meant to check validity of candidates. + pub validity_guarantors: HashSet, + /// Authorities meant to check availability of candidate data. + pub availability_guarantors: HashSet, + /// Number of votes needed for validity. + pub needed_validity: usize, + /// Number of votes needed for availability. + pub needed_availability: usize, +} + +struct TableContext { + parent_hash: Hash, + key: Arc, + groups: HashMap, +} + +impl table::Context for TableContext { + fn is_member_of(&self, authority: &AuthorityId, group: &ParaId) -> bool { + self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority)) + } + + fn is_availability_guarantor_of(&self, authority: &AuthorityId, group: &ParaId) -> bool { + self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority)) + } + + fn requisite_votes(&self, group: &ParaId) -> (usize, usize) { + self.groups.get(group).map_or( + (usize::max_value(), usize::max_value()), + |g| (g.needed_validity, g.needed_availability), + ) + } +} + +impl TableContext { + fn sign_statement(&self, statement: table::Statement) -> table::SignedStatement { + let signature = sign_table_statement(&statement, &self.key, &self.parent_hash); + let local_id = self.key.public().0; + + table::SignedStatement { + statement, + signature, + sender: local_id, + } + } +} + +/// Sign a table statement against a parent hash. +/// The actual message signed is the encoded statement concatenated with the +/// parent hash. +pub fn sign_table_statement(statement: &table::Statement, key: &ed25519::Pair, parent_hash: &Hash) -> ed25519::Signature { + use polkadot_primitives::parachain::Statement as RawStatement; + + let raw = match *statement { + GenericStatement::Candidate(ref c) => RawStatement::Candidate(c.clone()), + GenericStatement::Valid(h) => RawStatement::Valid(h), + GenericStatement::Invalid(h) => RawStatement::Invalid(h), + GenericStatement::Available(h) => RawStatement::Available(h), + }; + + let mut encoded = raw.encode(); + encoded.extend(&parent_hash.0); + + key.sign(&encoded) +} + +// A shared table object. +struct SharedTableInner { + table: Table, + proposed_digest: Option, +} + +impl SharedTableInner { + fn import_statement( + &mut self, + context: &TableContext, + statement: ::table::SignedStatement, + received_from: Option, + ) -> Option { + self.table.import_statement(context, statement, received_from) + } +} + +/// A shared table object. +pub struct SharedTable { + context: Arc, + inner: Arc>, +} + +impl Clone for SharedTable { + fn clone(&self) -> Self { + SharedTable { + context: self.context.clone(), + inner: self.inner.clone() + } + } +} + +impl SharedTable { + /// Create a new shared table. + /// + /// Provide the key to sign with, and the parent hash of the relay chain + /// block being built. + pub fn new(groups: HashMap, key: Arc, parent_hash: Hash) -> Self { + SharedTable { + context: Arc::new(TableContext { groups, key, parent_hash }), + inner: Arc::new(Mutex::new(SharedTableInner { + table: Table::default(), + proposed_digest: None, + })) + } + } + + /// Import a single statement. + pub fn import_statement( + &self, + statement: table::SignedStatement, + received_from: Option, + ) -> Option { + self.inner.lock().import_statement(&*self.context, statement, received_from) + } + + /// Sign and import a local statement. + pub fn sign_and_import( + &self, + statement: table::Statement, + ) -> Option { + let proposed_digest = match statement { + GenericStatement::Candidate(ref c) => Some(c.hash()), + _ => None, + }; + + let signed_statement = self.context.sign_statement(statement); + + let mut inner = self.inner.lock(); + if proposed_digest.is_some() { + inner.proposed_digest = proposed_digest; + } + + inner.import_statement(&*self.context, signed_statement, None) + } + + /// Import many statements at once. + /// + /// Provide an iterator yielding pairs of (statement, received_from). + pub fn import_statements(&self, iterable: I) -> U + where + I: IntoIterator)>, + U: ::std::iter::FromIterator, + { + let mut inner = self.inner.lock(); + + iterable.into_iter().filter_map(move |(statement, received_from)| { + inner.import_statement(&*self.context, statement, received_from) + }).collect() + } + + /// Check if a proposal is valid. + pub fn proposal_valid(&self, _proposal: &SubstrateBlock) -> bool { + false // TODO + } + + /// Execute a closure using a specific candidate. + /// + /// Deadlocks if called recursively. + pub fn with_candidate(&self, digest: &Hash, f: F) -> U + where F: FnOnce(Option<&CandidateReceipt>) -> U + { + let inner = self.inner.lock(); + f(inner.table.get_candidate(digest)) + } + + /// Get all witnessed misbehavior. + pub fn get_misbehavior(&self) -> HashMap { + self.inner.lock().table.get_misbehavior().clone() + } + + /// Fill a statement batch. + pub fn fill_batch(&self, batch: &mut B) { + self.inner.lock().table.fill_batch(batch); + } + + /// Get the local proposed block's hash. + pub fn proposed_hash(&self) -> Option { + self.inner.lock().proposed_digest.clone() + } +} diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index 0f4c4adefc..a39904f44e 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -20,7 +20,9 @@ use primitives::bytes; use primitives; use codec::{Input, Slicable, NonTrivialSlicable}; +use rstd::cmp::{PartialOrd, Ord, Ordering}; use rstd::vec::Vec; +use ::Hash; /// Unique identifier of a parachain. #[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] @@ -159,6 +161,55 @@ pub struct CandidateReceipt { pub fees: u64, } +impl Slicable for CandidateReceipt { + fn encode(&self) -> Vec { + let mut v = Vec::new(); + + self.parachain_index.using_encoded(|s| v.extend(s)); + self.collator.using_encoded(|s| v.extend(s)); + self.head_data.0.using_encoded(|s| v.extend(s)); + self.balance_uploads.using_encoded(|s| v.extend(s)); + self.egress_queue_roots.using_encoded(|s| v.extend(s)); + self.fees.using_encoded(|s| v.extend(s)); + + v + } + + fn decode(input: &mut I) -> Option { + Some(CandidateReceipt { + parachain_index: try_opt!(Slicable::decode(input)), + collator: try_opt!(Slicable::decode(input)), + head_data: try_opt!(Slicable::decode(input).map(HeadData)), + balance_uploads: try_opt!(Slicable::decode(input)), + egress_queue_roots: try_opt!(Slicable::decode(input)), + fees: try_opt!(Slicable::decode(input)), + }) + } +} + +impl CandidateReceipt { + /// Get the blake2_256 hash + #[cfg(feature = "std")] + pub fn hash(&self) -> Hash { + let encoded = self.encode(); + primitives::hashing::blake2_256(&encoded).into() + } +} + +impl PartialOrd for CandidateReceipt { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for CandidateReceipt { + fn cmp(&self, other: &Self) -> Ordering { + // TODO: compare signatures or something more sane + self.parachain_index.cmp(&other.parachain_index) + .then_with(|| self.head_data.cmp(&other.head_data)) + } +} + /// Parachain ingress queue message. #[derive(PartialEq, Eq, Clone)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] @@ -185,7 +236,7 @@ pub struct BlockData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); /// Parachain head data included in the chain. -#[derive(PartialEq, Eq, Clone)] +#[derive(PartialEq, Eq, Clone, PartialOrd, Ord)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] pub struct HeadData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); @@ -209,6 +260,74 @@ impl Slicable for Activity { } } +#[derive(Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Debug))] +#[repr(u8)] +enum StatementKind { + Candidate = 1, + Valid = 2, + Invalid = 3, + Available = 4, +} + +/// Statements which can be made about parachain candidates. +#[derive(Clone, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Debug))] +pub enum Statement { + /// Proposal of a parachain candidate. + Candidate(CandidateReceipt), + /// State that a parachain candidate is valid. + Valid(Hash), + /// Vote to commit to a candidate. + Invalid(Hash), + /// Vote to advance round after inactive primary. + Available(Hash), +} + +impl Slicable for Statement { + fn encode(&self) -> Vec { + let mut v = Vec::new(); + match *self { + Statement::Candidate(ref candidate) => { + v.push(StatementKind::Candidate as u8); + candidate.using_encoded(|s| v.extend(s)); + } + Statement::Valid(ref hash) => { + v.push(StatementKind::Valid as u8); + hash.using_encoded(|s| v.extend(s)); + } + Statement::Invalid(ref hash) => { + v.push(StatementKind::Invalid as u8); + hash.using_encoded(|s| v.extend(s)); + } + Statement::Available(ref hash) => { + v.push(StatementKind::Available as u8); + hash.using_encoded(|s| v.extend(s)); + } + } + + v + } + + fn decode(value: &mut I) -> Option { + match u8::decode(value) { + Some(x) if x == StatementKind::Candidate as u8 => { + Slicable::decode(value).map(Statement::Candidate) + } + Some(x) if x == StatementKind::Valid as u8 => { + Slicable::decode(value).map(Statement::Valid) + } + Some(x) if x == StatementKind::Invalid as u8 => { + Slicable::decode(value).map(Statement::Invalid) + } + Some(x) if x == StatementKind::Available as u8 => { + Slicable::decode(value).map(Statement::Available) + } + _ => None, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/polkadot/primitives/src/transaction.rs b/polkadot/primitives/src/transaction.rs index 24a3ae4ee3..6279a2d0aa 100644 --- a/polkadot/primitives/src/transaction.rs +++ b/polkadot/primitives/src/transaction.rs @@ -152,7 +152,6 @@ impl Slicable for Proposal { } } - /// Public functions that can be dispatched to. #[derive(Clone, Copy, PartialEq, Eq)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] @@ -314,7 +313,7 @@ pub struct UncheckedTransaction { impl Slicable for UncheckedTransaction { fn decode(input: &mut I) -> Option { - // This is a little more complicated than usua since the binary format must be compatible + // This is a little more complicated than usual since the binary format must be compatible // with substrate's generic `Vec` type. Basically this just means accepting that there // will be a prefix of u32, which has the total number of bytes following (we don't need // to use this). diff --git a/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm b/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm index b4b77e5ec9..c6fc9b035c 100644 Binary files a/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm and b/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm differ diff --git a/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm b/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm index 2b5124161a..9f4e4cc836 100644 Binary files a/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm and b/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm differ diff --git a/polkadot/statement-table/Cargo.toml b/polkadot/statement-table/Cargo.toml new file mode 100644 index 0000000000..5c8a61e81d --- /dev/null +++ b/polkadot/statement-table/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "polkadot-statement-table" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +substrate-primitives = { path = "../../substrate/primitives" } +polkadot-primitives = { path = "../primitives" } diff --git a/polkadot/candidate-agreement/src/table.rs b/polkadot/statement-table/src/generic.rs similarity index 97% rename from polkadot/candidate-agreement/src/table.rs rename to polkadot/statement-table/src/generic.rs index 2909d219c6..11665fe114 100644 --- a/polkadot/candidate-agreement/src/table.rs +++ b/polkadot/statement-table/src/generic.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! The statement table. +//! The statement table: generic implementation. //! //! This stores messages other authorities issue about candidates. //! @@ -32,7 +32,21 @@ use std::collections::hash_map::{HashMap, Entry}; use std::hash::Hash; use std::fmt::Debug; -use super::StatementBatch; +/// A batch of statements to send out. +pub trait StatementBatch { + /// Get the target authorities of these statements. + fn targets(&self) -> &[V]; + + /// If the batch is empty. + fn is_empty(&self) -> bool; + + /// Push a statement onto the batch. Returns false when the batch is full. + /// + /// This is meant to do work like incrementally serializing the statements + /// into a vector of bytes while making sure the length is below a certain + /// amount. + fn push(&mut self, statement: T) -> bool; +} /// Context for the statement table. pub trait Context { @@ -380,7 +394,7 @@ impl Table { &self.detected_misbehavior } - /// Fill a statement batch and note messages seen by the targets. + /// Fill a statement batch and note messages as seen by the targets. pub fn fill_batch(&mut self, batch: &mut B) where B: StatementBatch< C::AuthorityId, @@ -709,9 +723,28 @@ impl Table { #[cfg(test)] mod tests { use super::*; - use ::tests::VecBatch; use std::collections::HashMap; + #[derive(Debug, Clone)] + struct VecBatch { + pub max_len: usize, + pub targets: Vec, + pub items: Vec, + } + + impl ::generic::StatementBatch for VecBatch { + fn targets(&self) -> &[V] { &self.targets } + fn is_empty(&self) -> bool { self.items.is_empty() } + fn push(&mut self, item: T) -> bool { + if self.items.len() == self.max_len { + false + } else { + self.items.push(item); + true + } + } + } + fn create() -> Table { Table { authority_data: HashMap::default(), diff --git a/polkadot/statement-table/src/lib.rs b/polkadot/statement-table/src/lib.rs new file mode 100644 index 0000000000..e3abf95686 --- /dev/null +++ b/polkadot/statement-table/src/lib.rs @@ -0,0 +1,108 @@ +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The statement table. +//! +//! This stores messages other authorities issue about candidates. +//! +//! These messages are used to create a proposal submitted to a BFT consensus process. +//! +//! Proposals are formed of sets of candidates which have the requisite number of +//! validity and availability votes. +//! +//! Each parachain is associated with two sets of authorities: those which can +//! propose and attest to validity of candidates, and those who can only attest +//! to availability. + +extern crate substrate_primitives; +extern crate polkadot_primitives as primitives; + +pub mod generic; + +pub use generic::Table; + +use primitives::parachain::{Id, CandidateReceipt}; +use primitives::{SessionKey, Hash, Signature}; + +/// Statements about candidates on the network. +pub type Statement = generic::Statement; + +/// Signed statements about candidates. +pub type SignedStatement = generic::SignedStatement; + +/// Kinds of misbehavior, along with proof. +pub type Misbehavior = generic::Misbehavior; + +/// A summary of import of a statement. +pub type Summary = generic::Summary; + +/// Context necessary to construct a table. +pub trait Context { + /// Whether a authority is a member of a group. + /// Members are meant to submit candidates and vote on validity. + fn is_member_of(&self, authority: &SessionKey, group: &Id) -> bool; + + /// Whether a authority is an availability guarantor of a group. + /// Guarantors are meant to vote on availability for candidates submitted + /// in a group. + fn is_availability_guarantor_of( + &self, + authority: &SessionKey, + group: &Id, + ) -> bool; + + // requisite number of votes for validity and availability respectively from a group. + fn requisite_votes(&self, group: &Id) -> (usize, usize); +} + +impl generic::Context for C { + type AuthorityId = SessionKey; + type Digest = Hash; + type GroupId = Id; + type Signature = Signature; + type Candidate = CandidateReceipt; + + fn candidate_digest(candidate: &CandidateReceipt) -> Hash { + candidate.hash() + } + + fn candidate_group(candidate: &CandidateReceipt) -> Id { + candidate.parachain_index.clone() + } + + fn is_member_of(&self, authority: &SessionKey, group: &Id) -> bool { + Context::is_member_of(self, authority, group) + } + + fn is_availability_guarantor_of(&self, authority: &SessionKey, group: &Id) -> bool { + Context::is_availability_guarantor_of(self, authority, group) + } + + fn requisite_votes(&self, group: &Id) -> (usize, usize) { + Context::requisite_votes(self, group) + } +} + +/// A batch of statements to send out. +pub trait StatementBatch { + /// Get the target authorities of these statements. + fn targets(&self) -> &[SessionKey]; + + /// If the batch is empty. + fn is_empty(&self) -> bool; + + /// Push a statement onto the batch. Returns false when the batch is full. + /// + /// This is meant to do work like incrementally serializing the statements + /// into a vector of bytes while making sure the length is below a certain + /// amount. + fn push(&mut self, statement: SignedStatement) -> bool; +} + +impl generic::StatementBatch for T { + fn targets(&self) -> &[SessionKey] { StatementBatch::targets(self ) } + fn is_empty(&self) -> bool { StatementBatch::is_empty(self) } + fn push(&mut self, statement: SignedStatement) -> bool { + StatementBatch::push(self, statement) + } +}