diff --git a/substrate/candidate-agreement/src/bft/accumulator.rs b/substrate/candidate-agreement/src/bft/accumulator.rs index ce54c55ea1..78f2958bc5 100644 --- a/substrate/candidate-agreement/src/bft/accumulator.rs +++ b/substrate/candidate-agreement/src/bft/accumulator.rs @@ -35,7 +35,7 @@ pub trait Context { } /// Justification at a given round. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct Justification { /// The round. pub round_number: usize, @@ -54,21 +54,21 @@ impl Justification { /// The closure should return true iff the round number, digest, and signature /// represent a valid prepare message and the signer was authorized to issue /// it. - pub fn check(&self, max_faulty: usize, check_message: F) -> bool + pub fn check(&self, threshold: usize, check_message: F) -> bool where F: Fn(usize, &D, &S) -> Option, V: Hash + Eq, { - let mut prepared = HashSet::new(); + let mut voted = HashSet::new(); let mut good = false; for signature in &self.signatures { match check_message(self.round_number, &self.digest, signature) { None => return false, Some(v) => { - if !prepared.insert(v) { + if !voted.insert(v) { return false; - } else if prepared.len() > max_faulty * 2 { + } else if voted.len() >= threshold { // don't return just yet since later signatures may be invalid. good = true; } @@ -93,15 +93,22 @@ pub enum State { /// Seen 2f + 1 prepares for this digest. Prepared(PrepareJustification), /// Seen 2f + 1 commits for a digest. - Concluded(Justification), + Committed(Justification), /// Seen 2f + 1 round-advancement messages. Advanced(Option>), } /// Accumulates messages for a given round of BFT consensus. -pub struct Accumulator { +#[derive(Debug)] +pub struct Accumulator + where + C: Eq + Clone, + D: Hash + Eq + Clone, + V: Hash + Eq, + S: Eq + Clone, +{ round_number: usize, - max_faulty: usize, + threshold: usize, round_proposer: V, proposal: Option, prepares: HashMap, @@ -114,15 +121,15 @@ pub struct Accumulator { impl Accumulator where C: Eq + Clone, - D: Hash + Clone + Eq, + D: Hash + Eq + Clone, V: Hash + Eq, S: Eq + Clone, { /// Create a new state accumulator. - pub fn new(round_number: usize, max_faulty: usize, round_proposer: V) -> Self { + pub fn new(round_number: usize, threshold: usize, round_proposer: V) -> Self { Accumulator { round_number, - max_faulty, + threshold, round_proposer, proposal: None, prepares: HashMap::new(), @@ -138,6 +145,20 @@ impl Accumulator self.advance_round.len() } + /// Get the round number. + pub fn round_number(&self) -> usize { + self.round_number.clone() + } + + /// Get the round proposer. + pub fn round_proposer(&self) -> &V { + &self.round_proposer + } + + pub fn proposal(&self) -> Option<&C> { + self.proposal.as_ref() + } + /// Inspect the current consensus state. pub fn state(&self) -> &State { &self.state @@ -189,7 +210,7 @@ impl Accumulator let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0)); count.0 += 1; - if count.0 == self.max_faulty * 2 + 1 { + if count.0 == self.threshold { Some(candidate) } else { None @@ -232,7 +253,7 @@ impl Accumulator let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0)); count.1 += 1; - if count.1 == self.max_faulty * 2 + 1 { + if count.1 == self.threshold { Some(candidate) } else { None @@ -252,7 +273,7 @@ impl Accumulator .map(|&(_, ref s)| s.clone()) .collect(); - self.state = State::Concluded(Justification { + self.state = State::Committed(Justification { round_number: self.round_number, digest: committed_for, signatures: signatures, @@ -266,12 +287,12 @@ impl Accumulator ) { self.advance_round.insert(sender); - if self.advance_round.len() != self.max_faulty * 2 + 1 { return } + if self.advance_round.len() != self.threshold { return } // allow transition to new round only if we haven't produced a justification // yet. self.state = match ::std::mem::replace(&mut self.state, State::Begin) { - State::Concluded(j) => State::Concluded(j), + State::Committed(j) => State::Committed(j), State::Prepared(j) => State::Advanced(Some(j)), State::Advanced(j) => State::Advanced(j), State::Begin | State::Proposed(_) => State::Advanced(None), @@ -311,24 +332,24 @@ mod tests { } }; - assert!(justification.check(3, &check_message)); - assert!(!justification.check(5, &check_message)); + assert!(justification.check(7, &check_message)); + assert!(!justification.check(11, &check_message)); { // one bad signature is enough to spoil it. justification.signatures.push(Signature(1001, 255)); - assert!(!justification.check(3, &check_message)); + assert!(!justification.check(7, &check_message)); justification.signatures.pop(); } // duplicates not allowed. justification.signatures.extend((0..10).map(|i| Signature(600, i))); - assert!(!justification.check(3, &check_message)); + assert!(!justification.check(11, &check_message)); } #[test] fn accepts_proposal_from_proposer_only() { - let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 3, ValidatorId(8)); + let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, ValidatorId(8)); assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { @@ -350,7 +371,7 @@ mod tests { #[test] fn reaches_prepare_phase() { - let mut accumulator = Accumulator::new(1, 3, ValidatorId(8)); + let mut accumulator = Accumulator::new(1, 7, ValidatorId(8)); assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { @@ -385,7 +406,7 @@ mod tests { #[test] fn prepare_to_commit() { - let mut accumulator = Accumulator::new(1, 3, ValidatorId(8)); + let mut accumulator = Accumulator::new(1, 7, ValidatorId(8)); assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { @@ -437,14 +458,14 @@ mod tests { }); match accumulator.state() { - &State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)), + &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), s => panic!("wrong state: {:?}", s), } } #[test] fn prepare_to_advance() { - let mut accumulator = Accumulator::new(1, 3, ValidatorId(8)); + let mut accumulator = Accumulator::new(1, 7, ValidatorId(8)); assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { @@ -495,7 +516,7 @@ mod tests { #[test] fn conclude_different_than_proposed() { - let mut accumulator = Accumulator::::new(1, 3, ValidatorId(8)); + let mut accumulator = Accumulator::::new(1, 7, ValidatorId(8)); assert_eq!(accumulator.state(), &State::Begin); for i in 0..7 { @@ -520,14 +541,14 @@ mod tests { } match accumulator.state() { - &State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)), + &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), s => panic!("wrong state: {:?}", s), } } #[test] fn begin_to_advance() { - let mut accumulator = Accumulator::::new(1, 3, ValidatorId(8)); + let mut accumulator = Accumulator::::new(1, 7, ValidatorId(8)); assert_eq!(accumulator.state(), &State::Begin); for i in 0..7 { @@ -546,7 +567,7 @@ mod tests { #[test] fn conclude_without_prepare() { - let mut accumulator = Accumulator::::new(1, 3, ValidatorId(8)); + let mut accumulator = Accumulator::::new(1, 7, ValidatorId(8)); assert_eq!(accumulator.state(), &State::Begin); for i in 0..7 { @@ -558,7 +579,7 @@ mod tests { } match accumulator.state() { - &State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)), + &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), s => panic!("wrong state: {:?}", s), } } diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs index bcb3571842..0c78029ecf 100644 --- a/substrate/candidate-agreement/src/bft/mod.rs +++ b/substrate/candidate-agreement/src/bft/mod.rs @@ -18,6 +18,15 @@ mod accumulator; +use std::collections::{HashMap, VecDeque}; +use std::hash::Hash; + +use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink}; + +use self::accumulator::State; + +pub use self::accumulator::{Accumulator, Justification, PrepareJustification}; + /// Messages over the proposal. /// Each message carries an associated round number. #[derive(Debug, Clone, PartialEq, Eq)] @@ -54,38 +63,556 @@ pub struct LocalizedMessage { pub signature: S, } -/// The agreed-upon data. -#[derive(Debug, Clone)] -pub struct Agreed { - /// The agreed-upon proposal. - pub proposal: P, - /// The justification for the proposal. - pub justification: Vec>, +/// Context necessary for agreement. +pub trait Context { + /// Candidate proposed. + type Candidate: Eq + Clone; + /// Candidate digest. + type Digest: Hash + Eq + Clone; + /// Validator ID. + type ValidatorId: Hash + Eq + Clone; + /// Signature. + type Signature: Eq + Clone; + /// A future that resolves when a round timeout is concluded. + type RoundTimeout: Future; + /// A future that resolves when a proposal is ready. + type Proposal: Future; + + /// Get the local validator ID. + fn local_id(&self) -> Self::ValidatorId; + + /// Get the best proposal. + fn proposal(&self) -> Self::Proposal; + + /// Get the digest of a candidate. + fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest; + + /// Sign a message using the local validator ID. + fn sign_local(&self, message: Message) + -> ContextLocalizedMessage; + + /// Get the proposer for a given round of consensus. + fn round_proposer(&self, round: usize) -> Self::ValidatorId; + + /// Whether the candidate is valid. + fn candidate_valid(&self, candidate: &Self::Candidate) -> bool; + + /// Create a round timeout. The context will determine the correct timeout + /// length, and create a future that will resolve when the timeout is + /// concluded. + fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout; } -/// Parameters to agreement. -pub struct Params< - Validator, - SignLocal, - CanInclude, - MessagesIn, - MessagesOut, -> { - /// The ID of the current view's primary. - pub primary: Validator, - /// The local ID. - pub local_id: Validator, - /// A closure for signing local messages. - pub sign_local: SignLocal, - /// A function for checking if a proposal can be voted for. - pub can_include: CanInclude, - /// The input stream. Should never conclude, and should yield only messages - /// sent by validators and which have been authenticated properly. - pub input: MessagesIn, - /// The output message sink. This assumes that messages will eventually - /// be delivered to all honest participants, either by repropagation, gossip, - /// or some reliable broadcast mechanism. - pub output: MessagesOut, - /// The maximum number of faulty nodes. - pub max_faulty: usize, +/// Type alias for a localized message using only type parameters from `Context`. +// TODO: actual type alias when it's no longer a warning. +#[derive(Debug)] +pub struct ContextLocalizedMessage(pub LocalizedMessage); + +impl Clone for ContextLocalizedMessage + where LocalizedMessage: Clone +{ + fn clone(&self) -> Self { + ContextLocalizedMessage(self.0.clone()) + } +} + +#[derive(Debug)] +struct Sending { + items: VecDeque, + flushing: bool, +} + +impl Sending { + fn with_capacity(n: usize) -> Self { + Sending { + items: VecDeque::with_capacity(n), + flushing: false, + } + } + + fn push(&mut self, item: T) { + self.items.push_back(item); + self.flushing = false; + } + + // process all the sends into the sink. + fn process_all>(&mut self, sink: &mut S) -> Poll<(), S::SinkError> { + while let Some(item) = self.items.pop_front() { + match sink.start_send(item) { + Err(e) => return Err(e), + Ok(AsyncSink::NotReady(item)) => { + self.items.push_front(item); + return Ok(Async::NotReady); + } + Ok(AsyncSink::Ready) => { self.flushing = true; } + } + } + + while self.flushing { + match sink.poll_complete() { + Err(e) => return Err(e), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => { self.flushing = false; } + } + } + + Ok(Async::Ready(())) + } +} + +/// Error returned when the input stream concludes. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct InputStreamConcluded; + +impl ::std::fmt::Display for InputStreamConcluded { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "{}", ::std::error::Error::description(self)) + } +} + +impl ::std::error::Error for InputStreamConcluded { + fn description(&self) -> &str { + "input stream of messages concluded prematurely" + } +} + +// get the "full BFT" threshold based on an amount of nodes and +// a maximum faulty. if nodes == 3f + 1, then threshold == 2f + 1. +fn bft_threshold(nodes: usize, max_faulty: usize) -> usize { + nodes - max_faulty +} + +/// Committed successfully. +pub struct Committed { + /// The candidate committed for. This will be unknown if + /// we never witnessed the proposal of the last round. + pub candidate: Option, + /// A justification for the candidate. + pub justification: Justification, +} + +struct Locked { + justification: PrepareJustification, +} + +impl Locked { + fn digest(&self) -> &D { + &self.justification.digest + } +} + +// the state of the local node during the current state of consensus. +// +// behavior is different when locked on a proposal. +#[derive(Clone, Copy)] +enum LocalState { + Start, + Proposed, + Prepared, + Committed, + VoteAdvance, +} + +// This structure manages a single "view" of consensus. +// +// We maintain two message accumulators: one for the round we are currently in, +// and one for a future round. +// +// We also store notable candidates: any proposed or prepared for, as well as any +// with witnessed threshold-prepares. +// This ensures that threshold-prepares witnessed by even one honest participant +// will still have the candidate available for proposal. +// +// We advance the round accumulators when one of two conditions is met: +// - we witness consensus of advancement in the current round. in this case we +// advance by one. +// - a higher threshold-prepare is broadcast to us. in this case we can +// advance to the round of the threshold-prepare. this is an indication +// that we have experienced severe asynchrony/clock drift with the remainder +// of the other validators, and it is unlikely that we can assist in +// consensus meaningfully. nevertheless we make an attempt. +struct Strategy { + nodes: usize, + max_faulty: usize, + fetching_proposal: Option, + round_timeout: future::Fuse, + local_state: LocalState, + locked: Option>, + notable_candidates: HashMap, + current_accumulator: Accumulator, + future_accumulator: Accumulator, + local_id: C::ValidatorId, +} + +impl Strategy { + fn create(context: &C, nodes: usize, max_faulty: usize) -> Self { + let timeout = context.begin_round_timeout(0); + let threshold = bft_threshold(nodes, max_faulty); + + let current_accumulator = Accumulator::new( + 0, + threshold, + context.round_proposer(0), + ); + + let future_accumulator = Accumulator::new( + 1, + threshold, + context.round_proposer(1), + ); + + Strategy { + nodes, + max_faulty, + current_accumulator, + future_accumulator, + fetching_proposal: None, + local_state: LocalState::Start, + locked: None, + notable_candidates: HashMap::new(), + round_timeout: timeout.fuse(), + local_id: context.local_id(), + } + } + + fn import_message(&mut self, msg: ContextLocalizedMessage) { + let msg = msg.0; + let round_number = msg.message.round_number(); + + if round_number == self.current_accumulator.round_number() { + self.current_accumulator.import_message(msg); + } else if round_number == self.future_accumulator.round_number() { + self.future_accumulator.import_message(msg); + } + } + + // poll the strategy: this will queue messages to be sent and advance + // rounds if necessary. + // + // only call within the context of a `Task`. + fn poll(&mut self, context: &C, sending: &mut Sending>) + -> Poll, E> + where + C::RoundTimeout: Future, + C::Proposal: Future, + { + self.propose(context, sending)?; + self.prepare(context, sending); + self.commit(context, sending); + self.vote_advance(context, sending)?; + + let advance = match self.current_accumulator.state() { + &State::Advanced(ref p_just) => { + // lock to any witnessed prepare justification. + if let Some(p_just) = p_just.as_ref() { + self.locked = Some(Locked { justification: p_just.clone() }); + } + + let round_number = self.current_accumulator.round_number(); + Some(round_number + 1) + } + &State::Committed(ref just) => { + let candidate = self.notable_candidates.get(&just.digest).cloned(); + let committed = Committed { + candidate, + justification: just.clone() + }; + + return Ok(Async::Ready(committed)) + } + _ => None, + }; + + if let Some(new_round) = advance { + self.advance_to_round(context, new_round); + } + + Ok(Async::NotReady) + } + + fn propose(&mut self, context: &C, sending: &mut Sending>) + -> Result<(), ::Error> + { + if let LocalState::Start = self.local_state { + let mut propose = false; + if let &State::Begin = self.current_accumulator.state() { + let round_number = self.current_accumulator.round_number(); + let primary = context.round_proposer(round_number); + propose = self.local_id == primary; + }; + + if !propose { return Ok(()) } + + // obtain the proposal to broadcast. + let proposal = match self.locked { + Some(ref locked) => { + // TODO: it's possible but very unlikely that we don't have the + // corresponding proposal for what we are locked to. + // + // since this is an edge case on an edge case, it is fine + // to eat the round timeout for now, but it can be optimized by + // broadcasting an advance vote. + self.notable_candidates.get(locked.digest()).cloned() + } + None => { + let res = self.fetching_proposal + .get_or_insert_with(|| context.proposal()) + .poll()?; + + match res { + Async::Ready(p) => Some(p), + Async::NotReady => None, + } + } + }; + + if let Some(proposal) = proposal { + self.fetching_proposal = None; + + let message = Message::Propose( + self.current_accumulator.round_number(), + proposal + ); + + self.import_and_send_message(message, context, sending); + self.local_state = LocalState::Proposed; + } + + } + + Ok(()) + } + + fn prepare(&mut self, context: &C, sending: &mut Sending>) { + // prepare only upon start or having proposed. + match self.local_state { + LocalState::Start | LocalState::Proposed => {}, + _ => return + }; + + let mut prepare_for = None; + + // we can't prepare until something was proposed. + if let &State::Proposed(ref candidate) = self.current_accumulator.state() { + let digest = context.candidate_digest(candidate); + + // vote to prepare only if we believe the candidate to be valid and + // we are not locked on some other candidate. + match self.locked { + Some(ref locked) if locked.digest() != &digest => {} + Some(_) | None => { + if context.candidate_valid(candidate) { + prepare_for = Some(digest); + } + } + } + } + + if let Some(digest) = prepare_for { + let message = Message::Prepare( + self.current_accumulator.round_number(), + digest + ); + + self.import_and_send_message(message, context, sending); + self.local_state = LocalState::Prepared; + } + } + + fn commit(&mut self, context: &C, sending: &mut Sending>) { + // commit only if we haven't voted to advance or committed already + match self.local_state { + LocalState::Committed | LocalState::VoteAdvance => return, + _ => {} + } + + let mut commit_for = None; + + if let &State::Prepared(ref p_just) = self.current_accumulator.state() { + // we are now locked to this prepare justification. + let digest = p_just.digest.clone(); + self.locked = Some(Locked { justification: p_just.clone() }); + commit_for = Some(digest); + } + + if let Some(digest) = commit_for { + let message = Message::Commit( + self.current_accumulator.round_number(), + digest + ); + + self.import_and_send_message(message, context, sending); + self.local_state = LocalState::Committed; + } + } + + fn vote_advance(&mut self, context: &C, sending: &mut Sending>) + -> Result<(), ::Error> + { + // we can vote for advancement under all circumstances unless we have already. + if let LocalState::VoteAdvance = self.local_state { return Ok(()) } + + // if we got f + 1 advance votes, or the timeout has fired, and we haven't + // sent an AdvanceRound message yet, do so. + let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty; + + if let Async::Ready(_) = self.round_timeout.poll()? { + attempt_advance = true; + } + + // the other situation we attempt to advance is if there is a proposal + // that is not equal to the one we are locked to. + match (self.local_state, self.current_accumulator.state(), &self.locked) { + (LocalState::Start, &State::Proposed(ref candidate), &Some(ref locked)) => { + let candidate_digest = context.candidate_digest(candidate); + if &candidate_digest != locked.digest() { + attempt_advance = true; + } + } + _ => {} + } + + if attempt_advance { + let message = Message::AdvanceRound( + self.current_accumulator.round_number(), + ); + + self.import_and_send_message(message, context, sending); + self.local_state = LocalState::VoteAdvance; + } + + Ok(()) + } + + fn advance_to_round(&mut self, context: &C, round: usize) { + assert!(round > self.current_accumulator.round_number()); + + let threshold = self.nodes - self.max_faulty; + + self.fetching_proposal = None; + self.round_timeout = context.begin_round_timeout(round).fuse(); + self.local_state = LocalState::Start; + + let new_future = Accumulator::new( + round + 1, + threshold, + context.round_proposer(round + 1), + ); + + // when advancing from a round, store away the witnessed proposal. + // + // if we or other participants end up locked on that candidate, + // we will have it. + if let Some(proposal) = self.current_accumulator.proposal() { + let digest = context.candidate_digest(proposal); + self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone()); + } + + // special case when advancing by a single round. + if self.future_accumulator.round_number() == round { + self.current_accumulator + = ::std::mem::replace(&mut self.future_accumulator, new_future); + } else { + self.future_accumulator = new_future; + self.current_accumulator = Accumulator::new( + round, + threshold, + context.round_proposer(round), + ); + } + } + + fn import_and_send_message( + &mut self, + message: Message, + context: &C, + sending: &mut Sending> + ) { + let signed_message = context.sign_local(message); + self.import_message(signed_message.clone()); + sending.push(signed_message); + } +} + +/// Future that resolves upon BFT agreement for a candidate. +#[must_use = "futures do nothing unless polled"] +pub struct Agreement { + context: C, + input: I, + output: O, + concluded: Option>, + sending: Sending>, + strategy: Strategy, +} + +impl Future for Agreement + where + C: Context, + C::RoundTimeout: Future, + C::Proposal: Future, + I: Stream,Error=E>, + O: Sink,SinkError=E>, + E: From, +{ + type Item = Committed; + type Error = E; + + fn poll(&mut self) -> Poll { + // even if we've observed the conclusion, wait until all + // pending outgoing messages are flushed. + if let Some(just) = self.concluded.take() { + return Ok(match self.sending.process_all(&mut self.output)? { + Async::Ready(()) => Async::Ready(just), + Async::NotReady => { + self.concluded = Some(just); + Async::NotReady + } + }) + } + + // make progress on flushing all pending messages. + let _ = self.sending.process_all(&mut self.output)?; + + // try to process timeouts. + if let Async::Ready(just) = self.strategy.poll(&self.context, &mut self.sending)? { + self.concluded = Some(just); + return self.poll(); + } + + let message = try_ready!(self.input.poll()).ok_or(InputStreamConcluded)?; + self.strategy.import_message(message); + + self.poll() + } +} + +/// Attempt to reach BFT agreement on a candidate. +/// +/// `nodes` is the number of nodes in the system. +/// `max_faulty` is the maximum number of faulty nodes. Should be less than +/// 1/3 of `nodes`, otherwise agreement may never be reached. +/// +/// The input stream should never logically conclude. The logic here assumes +/// that messages flushed to the output stream will eventually reach other nodes. +/// +/// Note that it is possible to witness agreement being reached without ever +/// seeing the candidate. Any candidates seen will be checked for validity. +/// +/// Although technically the agreement will always complete (given the eventual +/// delivery of messages), in practice it is possible for this future to +/// conclude without having witnessed the conclusion. +/// In general, this future should be pre-empted by the import of a justification +/// set for this block height. +pub fn agree(context: C, nodes: usize, max_faulty: usize, input: I, output: O) + -> Agreement +{ + let strategy = Strategy::create(&context, nodes, max_faulty); + Agreement { + context, + input, + output, + concluded: None, + sending: Sending::with_capacity(4), + strategy: strategy, + } } diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index 09dd56f5f0..b23b9c606c 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -29,6 +29,7 @@ //! //! Groups themselves may be compromised by malicious validators. +#[macro_use] extern crate futures; extern crate polkadot_primitives as primitives; diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs index 764f3a77dd..cfcab5e1e4 100644 --- a/substrate/candidate-agreement/src/table.rs +++ b/substrate/candidate-agreement/src/table.rs @@ -70,32 +70,32 @@ pub trait Context { /// Statements circulated among peers. #[derive(PartialEq, Eq, Debug)] -pub enum Statement { +pub enum Statement { /// Broadcast by a validator to indicate that this is his candidate for /// inclusion. /// /// Broadcasting two different candidate messages per round is not allowed. - Candidate(C::Candidate), + Candidate(C), /// Broadcast by a validator to attest that the candidate with given digest /// is valid. - Valid(C::Digest), + Valid(D), /// Broadcast by a validator to attest that the auxiliary data for a candidate /// with given digest is available. - Available(C::Digest), + Available(D), /// Broadcast by a validator to attest that the candidate with given digest /// is invalid. - Invalid(C::Digest), + Invalid(D), } /// A signed statement. #[derive(PartialEq, Eq, Debug)] -pub struct SignedStatement { +pub struct SignedStatement { /// The statement. - pub statement: Statement, + pub statement: Statement, /// The signature. - pub signature: C::Signature, + pub signature: S, /// The sender. - pub sender: C::ValidatorId, + pub sender: V, } // A unique trace for a class of valid statements issued by a validator. @@ -123,41 +123,52 @@ enum StatementTrace { /// Since there are three possible ways to vote, a double vote is possible in /// three possible combinations (unordered) #[derive(PartialEq, Eq, Debug)] -pub enum ValidityDoubleVote { +pub enum ValidityDoubleVote { /// Implicit vote by issuing and explicity voting validity. - IssuedAndValidity((C::Candidate, C::Signature), (C::Digest, C::Signature)), + IssuedAndValidity((C, S), (D, S)), /// Implicit vote by issuing and explicitly voting invalidity - IssuedAndInvalidity((C::Candidate, C::Signature), (C::Digest, C::Signature)), + IssuedAndInvalidity((C, S), (D, S)), /// Direct votes for validity and invalidity - ValidityAndInvalidity(C::Digest, C::Signature, C::Signature), + ValidityAndInvalidity(D, S, S), } /// Misbehavior: declaring multiple candidates. #[derive(PartialEq, Eq, Debug)] -pub struct MultipleCandidates { +pub struct MultipleCandidates { /// The first candidate seen. - pub first: (C::Candidate, C::Signature), + pub first: (C, S), /// The second candidate seen. - pub second: (C::Candidate, C::Signature), + pub second: (C, S), } /// Misbehavior: submitted statement for wrong group. #[derive(PartialEq, Eq, Debug)] -pub struct UnauthorizedStatement { +pub struct UnauthorizedStatement { /// A signed statement which was submitted without proper authority. - pub statement: SignedStatement, + pub statement: SignedStatement, } /// Different kinds of misbehavior. All of these kinds of malicious misbehavior /// are easily provable and extremely disincentivized. #[derive(PartialEq, Eq, Debug)] -pub enum Misbehavior { +pub enum Misbehavior { /// Voted invalid and valid on validity. - ValidityDoubleVote(ValidityDoubleVote), + ValidityDoubleVote(ValidityDoubleVote), /// Submitted multiple candidates. - MultipleCandidates(MultipleCandidates), + MultipleCandidates(MultipleCandidates), /// Submitted a message withou - UnauthorizedStatement(UnauthorizedStatement), + UnauthorizedStatement(UnauthorizedStatement), +} + +/// Fancy work-around for a type alias of context-based misbehavior +/// without producing compiler warnings. +pub trait ResolveMisbehavior { + /// The misbehavior type. + type Misbehavior; +} + +impl ResolveMisbehavior for C { + type Misbehavior = Misbehavior; } // kinds of votes for validity @@ -251,7 +262,7 @@ pub fn create() -> Table { #[derive(Default)] pub struct Table { validator_data: HashMap>, - detected_misbehavior: HashMap>, + detected_misbehavior: HashMap::Misbehavior>, candidate_votes: HashMap>, } @@ -294,7 +305,7 @@ impl Table { } /// Drain all misbehavior observed up to this point. - pub fn drain_misbehavior(&mut self) -> HashMap> { + pub fn drain_misbehavior(&mut self) -> HashMap::Misbehavior> { ::std::mem::replace(&mut self.detected_misbehavior, HashMap::new()) } @@ -303,9 +314,12 @@ impl Table { /// /// This can note the origin of the statement to indicate that he has /// seen it already. - pub fn import_statement(&mut self, context: &C, statement: SignedStatement, from: Option) - -> Option> - { + pub fn import_statement( + &mut self, + context: &C, + statement: SignedStatement, + from: Option + ) -> Option> { let SignedStatement { statement, signature, sender: signer } = statement; let trace = match statement { @@ -370,7 +384,7 @@ impl Table { from: C::ValidatorId, candidate: C::Candidate, signature: C::Signature, - ) -> (Option>, Option>) { + ) -> (Option<::Misbehavior>, Option>) { let group = context.candidate_group(&candidate); if !context.is_member_of(&from, &group) { return ( @@ -444,7 +458,7 @@ impl Table { from: C::ValidatorId, digest: C::Digest, vote: ValidityVote, - ) -> (Option>, Option>) { + ) -> (Option<::Misbehavior>, Option>) { let votes = match self.candidate_votes.get_mut(&digest) { None => return (None, None), // TODO: queue up but don't get DoS'ed Some(votes) => votes, @@ -522,7 +536,7 @@ impl Table { from: C::ValidatorId, digest: C::Digest, signature: C::Signature, - ) -> (Option>, Option>) { + ) -> (Option<::Misbehavior>, Option>) { let votes = match self.candidate_votes.get_mut(&digest) { None => return (None, None), // TODO: queue up but don't get DoS'ed Some(votes) => votes,