mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 21:11:07 +00:00
implement honest node strategy for BFT
This commit is contained in:
@@ -35,7 +35,7 @@ pub trait Context {
|
||||
}
|
||||
|
||||
/// Justification at a given round.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
#[derive(PartialEq, Eq, Debug, Clone)]
|
||||
pub struct Justification<D, S> {
|
||||
/// The round.
|
||||
pub round_number: usize,
|
||||
@@ -54,21 +54,21 @@ impl<D, S> Justification<D, S> {
|
||||
/// The closure should return true iff the round number, digest, and signature
|
||||
/// represent a valid prepare message and the signer was authorized to issue
|
||||
/// it.
|
||||
pub fn check<F, V>(&self, max_faulty: usize, check_message: F) -> bool
|
||||
pub fn check<F, V>(&self, threshold: usize, check_message: F) -> bool
|
||||
where
|
||||
F: Fn(usize, &D, &S) -> Option<V>,
|
||||
V: Hash + Eq,
|
||||
{
|
||||
let mut prepared = HashSet::new();
|
||||
let mut voted = HashSet::new();
|
||||
|
||||
let mut good = false;
|
||||
for signature in &self.signatures {
|
||||
match check_message(self.round_number, &self.digest, signature) {
|
||||
None => return false,
|
||||
Some(v) => {
|
||||
if !prepared.insert(v) {
|
||||
if !voted.insert(v) {
|
||||
return false;
|
||||
} else if prepared.len() > max_faulty * 2 {
|
||||
} else if voted.len() >= threshold {
|
||||
// don't return just yet since later signatures may be invalid.
|
||||
good = true;
|
||||
}
|
||||
@@ -93,15 +93,22 @@ pub enum State<C, D, S> {
|
||||
/// Seen 2f + 1 prepares for this digest.
|
||||
Prepared(PrepareJustification<D, S>),
|
||||
/// Seen 2f + 1 commits for a digest.
|
||||
Concluded(Justification<D, S>),
|
||||
Committed(Justification<D, S>),
|
||||
/// Seen 2f + 1 round-advancement messages.
|
||||
Advanced(Option<PrepareJustification<D, S>>),
|
||||
}
|
||||
|
||||
/// Accumulates messages for a given round of BFT consensus.
|
||||
pub struct Accumulator<C, D, V, S> {
|
||||
#[derive(Debug)]
|
||||
pub struct Accumulator<C, D, V, S>
|
||||
where
|
||||
C: Eq + Clone,
|
||||
D: Hash + Eq + Clone,
|
||||
V: Hash + Eq,
|
||||
S: Eq + Clone,
|
||||
{
|
||||
round_number: usize,
|
||||
max_faulty: usize,
|
||||
threshold: usize,
|
||||
round_proposer: V,
|
||||
proposal: Option<C>,
|
||||
prepares: HashMap<V, (D, S)>,
|
||||
@@ -114,15 +121,15 @@ pub struct Accumulator<C, D, V, S> {
|
||||
impl<C, D, V, S> Accumulator<C, D, V, S>
|
||||
where
|
||||
C: Eq + Clone,
|
||||
D: Hash + Clone + Eq,
|
||||
D: Hash + Eq + Clone,
|
||||
V: Hash + Eq,
|
||||
S: Eq + Clone,
|
||||
{
|
||||
/// Create a new state accumulator.
|
||||
pub fn new(round_number: usize, max_faulty: usize, round_proposer: V) -> Self {
|
||||
pub fn new(round_number: usize, threshold: usize, round_proposer: V) -> Self {
|
||||
Accumulator {
|
||||
round_number,
|
||||
max_faulty,
|
||||
threshold,
|
||||
round_proposer,
|
||||
proposal: None,
|
||||
prepares: HashMap::new(),
|
||||
@@ -138,6 +145,20 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
|
||||
self.advance_round.len()
|
||||
}
|
||||
|
||||
/// Get the round number.
|
||||
pub fn round_number(&self) -> usize {
|
||||
self.round_number.clone()
|
||||
}
|
||||
|
||||
/// Get the round proposer.
|
||||
pub fn round_proposer(&self) -> &V {
|
||||
&self.round_proposer
|
||||
}
|
||||
|
||||
pub fn proposal(&self) -> Option<&C> {
|
||||
self.proposal.as_ref()
|
||||
}
|
||||
|
||||
/// Inspect the current consensus state.
|
||||
pub fn state(&self) -> &State<C, D, S> {
|
||||
&self.state
|
||||
@@ -189,7 +210,7 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
|
||||
let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0));
|
||||
count.0 += 1;
|
||||
|
||||
if count.0 == self.max_faulty * 2 + 1 {
|
||||
if count.0 == self.threshold {
|
||||
Some(candidate)
|
||||
} else {
|
||||
None
|
||||
@@ -232,7 +253,7 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
|
||||
let count = self.vote_counts.entry(candidate.clone()).or_insert((0, 0));
|
||||
count.1 += 1;
|
||||
|
||||
if count.1 == self.max_faulty * 2 + 1 {
|
||||
if count.1 == self.threshold {
|
||||
Some(candidate)
|
||||
} else {
|
||||
None
|
||||
@@ -252,7 +273,7 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
|
||||
.map(|&(_, ref s)| s.clone())
|
||||
.collect();
|
||||
|
||||
self.state = State::Concluded(Justification {
|
||||
self.state = State::Committed(Justification {
|
||||
round_number: self.round_number,
|
||||
digest: committed_for,
|
||||
signatures: signatures,
|
||||
@@ -266,12 +287,12 @@ impl<C, D, V, S> Accumulator<C, D, V, S>
|
||||
) {
|
||||
self.advance_round.insert(sender);
|
||||
|
||||
if self.advance_round.len() != self.max_faulty * 2 + 1 { return }
|
||||
if self.advance_round.len() != self.threshold { return }
|
||||
|
||||
// allow transition to new round only if we haven't produced a justification
|
||||
// yet.
|
||||
self.state = match ::std::mem::replace(&mut self.state, State::Begin) {
|
||||
State::Concluded(j) => State::Concluded(j),
|
||||
State::Committed(j) => State::Committed(j),
|
||||
State::Prepared(j) => State::Advanced(Some(j)),
|
||||
State::Advanced(j) => State::Advanced(j),
|
||||
State::Begin | State::Proposed(_) => State::Advanced(None),
|
||||
@@ -311,24 +332,24 @@ mod tests {
|
||||
}
|
||||
};
|
||||
|
||||
assert!(justification.check(3, &check_message));
|
||||
assert!(!justification.check(5, &check_message));
|
||||
assert!(justification.check(7, &check_message));
|
||||
assert!(!justification.check(11, &check_message));
|
||||
|
||||
{
|
||||
// one bad signature is enough to spoil it.
|
||||
justification.signatures.push(Signature(1001, 255));
|
||||
assert!(!justification.check(3, &check_message));
|
||||
assert!(!justification.check(7, &check_message));
|
||||
|
||||
justification.signatures.pop();
|
||||
}
|
||||
// duplicates not allowed.
|
||||
justification.signatures.extend((0..10).map(|i| Signature(600, i)));
|
||||
assert!(!justification.check(3, &check_message));
|
||||
assert!(!justification.check(11, &check_message));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accepts_proposal_from_proposer_only() {
|
||||
let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 3, ValidatorId(8));
|
||||
let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
@@ -350,7 +371,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn reaches_prepare_phase() {
|
||||
let mut accumulator = Accumulator::new(1, 3, ValidatorId(8));
|
||||
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
@@ -385,7 +406,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn prepare_to_commit() {
|
||||
let mut accumulator = Accumulator::new(1, 3, ValidatorId(8));
|
||||
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
@@ -437,14 +458,14 @@ mod tests {
|
||||
});
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prepare_to_advance() {
|
||||
let mut accumulator = Accumulator::new(1, 3, ValidatorId(8));
|
||||
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
accumulator.import_message(LocalizedMessage {
|
||||
@@ -495,7 +516,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn conclude_different_than_proposed() {
|
||||
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 3, ValidatorId(8));
|
||||
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
for i in 0..7 {
|
||||
@@ -520,14 +541,14 @@ mod tests {
|
||||
}
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn begin_to_advance() {
|
||||
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 3, ValidatorId(8));
|
||||
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
for i in 0..7 {
|
||||
@@ -546,7 +567,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn conclude_without_prepare() {
|
||||
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 3, ValidatorId(8));
|
||||
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, ValidatorId(8));
|
||||
assert_eq!(accumulator.state(), &State::Begin);
|
||||
|
||||
for i in 0..7 {
|
||||
@@ -558,7 +579,7 @@ mod tests {
|
||||
}
|
||||
|
||||
match accumulator.state() {
|
||||
&State::Concluded(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||
s => panic!("wrong state: {:?}", s),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,15 @@
|
||||
|
||||
mod accumulator;
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::hash::Hash;
|
||||
|
||||
use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink};
|
||||
|
||||
use self::accumulator::State;
|
||||
|
||||
pub use self::accumulator::{Accumulator, Justification, PrepareJustification};
|
||||
|
||||
/// Messages over the proposal.
|
||||
/// Each message carries an associated round number.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -54,38 +63,556 @@ pub struct LocalizedMessage<T, P, V, S> {
|
||||
pub signature: S,
|
||||
}
|
||||
|
||||
/// The agreed-upon data.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Agreed<T, P, V, S> {
|
||||
/// The agreed-upon proposal.
|
||||
pub proposal: P,
|
||||
/// The justification for the proposal.
|
||||
pub justification: Vec<LocalizedMessage<T, P, V, S>>,
|
||||
/// Context necessary for agreement.
|
||||
pub trait Context {
|
||||
/// Candidate proposed.
|
||||
type Candidate: Eq + Clone;
|
||||
/// Candidate digest.
|
||||
type Digest: Hash + Eq + Clone;
|
||||
/// Validator ID.
|
||||
type ValidatorId: Hash + Eq + Clone;
|
||||
/// Signature.
|
||||
type Signature: Eq + Clone;
|
||||
/// A future that resolves when a round timeout is concluded.
|
||||
type RoundTimeout: Future<Item=()>;
|
||||
/// A future that resolves when a proposal is ready.
|
||||
type Proposal: Future<Item=Self::Candidate>;
|
||||
|
||||
/// Get the local validator ID.
|
||||
fn local_id(&self) -> Self::ValidatorId;
|
||||
|
||||
/// Get the best proposal.
|
||||
fn proposal(&self) -> Self::Proposal;
|
||||
|
||||
/// Get the digest of a candidate.
|
||||
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
|
||||
|
||||
/// Sign a message using the local validator ID.
|
||||
fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>)
|
||||
-> ContextLocalizedMessage<Self>;
|
||||
|
||||
/// Get the proposer for a given round of consensus.
|
||||
fn round_proposer(&self, round: usize) -> Self::ValidatorId;
|
||||
|
||||
/// Whether the candidate is valid.
|
||||
fn candidate_valid(&self, candidate: &Self::Candidate) -> bool;
|
||||
|
||||
/// Create a round timeout. The context will determine the correct timeout
|
||||
/// length, and create a future that will resolve when the timeout is
|
||||
/// concluded.
|
||||
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout;
|
||||
}
|
||||
|
||||
/// Parameters to agreement.
|
||||
pub struct Params<
|
||||
Validator,
|
||||
SignLocal,
|
||||
CanInclude,
|
||||
MessagesIn,
|
||||
MessagesOut,
|
||||
> {
|
||||
/// The ID of the current view's primary.
|
||||
pub primary: Validator,
|
||||
/// The local ID.
|
||||
pub local_id: Validator,
|
||||
/// A closure for signing local messages.
|
||||
pub sign_local: SignLocal,
|
||||
/// A function for checking if a proposal can be voted for.
|
||||
pub can_include: CanInclude,
|
||||
/// The input stream. Should never conclude, and should yield only messages
|
||||
/// sent by validators and which have been authenticated properly.
|
||||
pub input: MessagesIn,
|
||||
/// The output message sink. This assumes that messages will eventually
|
||||
/// be delivered to all honest participants, either by repropagation, gossip,
|
||||
/// or some reliable broadcast mechanism.
|
||||
pub output: MessagesOut,
|
||||
/// The maximum number of faulty nodes.
|
||||
pub max_faulty: usize,
|
||||
/// Type alias for a localized message using only type parameters from `Context`.
|
||||
// TODO: actual type alias when it's no longer a warning.
|
||||
#[derive(Debug)]
|
||||
pub struct ContextLocalizedMessage<C: Context + ?Sized>(pub LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>);
|
||||
|
||||
impl<C: Context + ?Sized> Clone for ContextLocalizedMessage<C>
|
||||
where LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>: Clone
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
ContextLocalizedMessage(self.0.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Sending<T> {
|
||||
items: VecDeque<T>,
|
||||
flushing: bool,
|
||||
}
|
||||
|
||||
impl<T> Sending<T> {
|
||||
fn with_capacity(n: usize) -> Self {
|
||||
Sending {
|
||||
items: VecDeque::with_capacity(n),
|
||||
flushing: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&mut self, item: T) {
|
||||
self.items.push_back(item);
|
||||
self.flushing = false;
|
||||
}
|
||||
|
||||
// process all the sends into the sink.
|
||||
fn process_all<S: Sink<SinkItem=T>>(&mut self, sink: &mut S) -> Poll<(), S::SinkError> {
|
||||
while let Some(item) = self.items.pop_front() {
|
||||
match sink.start_send(item) {
|
||||
Err(e) => return Err(e),
|
||||
Ok(AsyncSink::NotReady(item)) => {
|
||||
self.items.push_front(item);
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
Ok(AsyncSink::Ready) => { self.flushing = true; }
|
||||
}
|
||||
}
|
||||
|
||||
while self.flushing {
|
||||
match sink.poll_complete() {
|
||||
Err(e) => return Err(e),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(())) => { self.flushing = false; }
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Error returned when the input stream concludes.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct InputStreamConcluded;
|
||||
|
||||
impl ::std::fmt::Display for InputStreamConcluded {
|
||||
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
|
||||
write!(f, "{}", ::std::error::Error::description(self))
|
||||
}
|
||||
}
|
||||
|
||||
impl ::std::error::Error for InputStreamConcluded {
|
||||
fn description(&self) -> &str {
|
||||
"input stream of messages concluded prematurely"
|
||||
}
|
||||
}
|
||||
|
||||
// get the "full BFT" threshold based on an amount of nodes and
|
||||
// a maximum faulty. if nodes == 3f + 1, then threshold == 2f + 1.
|
||||
fn bft_threshold(nodes: usize, max_faulty: usize) -> usize {
|
||||
nodes - max_faulty
|
||||
}
|
||||
|
||||
/// Committed successfully.
|
||||
pub struct Committed<C, D, S> {
|
||||
/// The candidate committed for. This will be unknown if
|
||||
/// we never witnessed the proposal of the last round.
|
||||
pub candidate: Option<C>,
|
||||
/// A justification for the candidate.
|
||||
pub justification: Justification<D, S>,
|
||||
}
|
||||
|
||||
struct Locked<D, S> {
|
||||
justification: PrepareJustification<D, S>,
|
||||
}
|
||||
|
||||
impl<D, S> Locked<D, S> {
|
||||
fn digest(&self) -> &D {
|
||||
&self.justification.digest
|
||||
}
|
||||
}
|
||||
|
||||
// the state of the local node during the current state of consensus.
|
||||
//
|
||||
// behavior is different when locked on a proposal.
|
||||
#[derive(Clone, Copy)]
|
||||
enum LocalState {
|
||||
Start,
|
||||
Proposed,
|
||||
Prepared,
|
||||
Committed,
|
||||
VoteAdvance,
|
||||
}
|
||||
|
||||
// This structure manages a single "view" of consensus.
|
||||
//
|
||||
// We maintain two message accumulators: one for the round we are currently in,
|
||||
// and one for a future round.
|
||||
//
|
||||
// We also store notable candidates: any proposed or prepared for, as well as any
|
||||
// with witnessed threshold-prepares.
|
||||
// This ensures that threshold-prepares witnessed by even one honest participant
|
||||
// will still have the candidate available for proposal.
|
||||
//
|
||||
// We advance the round accumulators when one of two conditions is met:
|
||||
// - we witness consensus of advancement in the current round. in this case we
|
||||
// advance by one.
|
||||
// - a higher threshold-prepare is broadcast to us. in this case we can
|
||||
// advance to the round of the threshold-prepare. this is an indication
|
||||
// that we have experienced severe asynchrony/clock drift with the remainder
|
||||
// of the other validators, and it is unlikely that we can assist in
|
||||
// consensus meaningfully. nevertheless we make an attempt.
|
||||
struct Strategy<C: Context> {
|
||||
nodes: usize,
|
||||
max_faulty: usize,
|
||||
fetching_proposal: Option<C::Proposal>,
|
||||
round_timeout: future::Fuse<C::RoundTimeout>,
|
||||
local_state: LocalState,
|
||||
locked: Option<Locked<C::Digest, C::Signature>>,
|
||||
notable_candidates: HashMap<C::Digest, C::Candidate>,
|
||||
current_accumulator: Accumulator<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
|
||||
future_accumulator: Accumulator<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
|
||||
local_id: C::ValidatorId,
|
||||
}
|
||||
|
||||
impl<C: Context> Strategy<C> {
|
||||
fn create(context: &C, nodes: usize, max_faulty: usize) -> Self {
|
||||
let timeout = context.begin_round_timeout(0);
|
||||
let threshold = bft_threshold(nodes, max_faulty);
|
||||
|
||||
let current_accumulator = Accumulator::new(
|
||||
0,
|
||||
threshold,
|
||||
context.round_proposer(0),
|
||||
);
|
||||
|
||||
let future_accumulator = Accumulator::new(
|
||||
1,
|
||||
threshold,
|
||||
context.round_proposer(1),
|
||||
);
|
||||
|
||||
Strategy {
|
||||
nodes,
|
||||
max_faulty,
|
||||
current_accumulator,
|
||||
future_accumulator,
|
||||
fetching_proposal: None,
|
||||
local_state: LocalState::Start,
|
||||
locked: None,
|
||||
notable_candidates: HashMap::new(),
|
||||
round_timeout: timeout.fuse(),
|
||||
local_id: context.local_id(),
|
||||
}
|
||||
}
|
||||
|
||||
fn import_message(&mut self, msg: ContextLocalizedMessage<C>) {
|
||||
let msg = msg.0;
|
||||
let round_number = msg.message.round_number();
|
||||
|
||||
if round_number == self.current_accumulator.round_number() {
|
||||
self.current_accumulator.import_message(msg);
|
||||
} else if round_number == self.future_accumulator.round_number() {
|
||||
self.future_accumulator.import_message(msg);
|
||||
}
|
||||
}
|
||||
|
||||
// poll the strategy: this will queue messages to be sent and advance
|
||||
// rounds if necessary.
|
||||
//
|
||||
// only call within the context of a `Task`.
|
||||
fn poll<E>(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>)
|
||||
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
|
||||
where
|
||||
C::RoundTimeout: Future<Error=E>,
|
||||
C::Proposal: Future<Error=E>,
|
||||
{
|
||||
self.propose(context, sending)?;
|
||||
self.prepare(context, sending);
|
||||
self.commit(context, sending);
|
||||
self.vote_advance(context, sending)?;
|
||||
|
||||
let advance = match self.current_accumulator.state() {
|
||||
&State::Advanced(ref p_just) => {
|
||||
// lock to any witnessed prepare justification.
|
||||
if let Some(p_just) = p_just.as_ref() {
|
||||
self.locked = Some(Locked { justification: p_just.clone() });
|
||||
}
|
||||
|
||||
let round_number = self.current_accumulator.round_number();
|
||||
Some(round_number + 1)
|
||||
}
|
||||
&State::Committed(ref just) => {
|
||||
let candidate = self.notable_candidates.get(&just.digest).cloned();
|
||||
let committed = Committed {
|
||||
candidate,
|
||||
justification: just.clone()
|
||||
};
|
||||
|
||||
return Ok(Async::Ready(committed))
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if let Some(new_round) = advance {
|
||||
self.advance_to_round(context, new_round);
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
|
||||
fn propose(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>)
|
||||
-> Result<(), <C::Proposal as Future>::Error>
|
||||
{
|
||||
if let LocalState::Start = self.local_state {
|
||||
let mut propose = false;
|
||||
if let &State::Begin = self.current_accumulator.state() {
|
||||
let round_number = self.current_accumulator.round_number();
|
||||
let primary = context.round_proposer(round_number);
|
||||
propose = self.local_id == primary;
|
||||
};
|
||||
|
||||
if !propose { return Ok(()) }
|
||||
|
||||
// obtain the proposal to broadcast.
|
||||
let proposal = match self.locked {
|
||||
Some(ref locked) => {
|
||||
// TODO: it's possible but very unlikely that we don't have the
|
||||
// corresponding proposal for what we are locked to.
|
||||
//
|
||||
// since this is an edge case on an edge case, it is fine
|
||||
// to eat the round timeout for now, but it can be optimized by
|
||||
// broadcasting an advance vote.
|
||||
self.notable_candidates.get(locked.digest()).cloned()
|
||||
}
|
||||
None => {
|
||||
let res = self.fetching_proposal
|
||||
.get_or_insert_with(|| context.proposal())
|
||||
.poll()?;
|
||||
|
||||
match res {
|
||||
Async::Ready(p) => Some(p),
|
||||
Async::NotReady => None,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(proposal) = proposal {
|
||||
self.fetching_proposal = None;
|
||||
|
||||
let message = Message::Propose(
|
||||
self.current_accumulator.round_number(),
|
||||
proposal
|
||||
);
|
||||
|
||||
self.import_and_send_message(message, context, sending);
|
||||
self.local_state = LocalState::Proposed;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>) {
|
||||
// prepare only upon start or having proposed.
|
||||
match self.local_state {
|
||||
LocalState::Start | LocalState::Proposed => {},
|
||||
_ => return
|
||||
};
|
||||
|
||||
let mut prepare_for = None;
|
||||
|
||||
// we can't prepare until something was proposed.
|
||||
if let &State::Proposed(ref candidate) = self.current_accumulator.state() {
|
||||
let digest = context.candidate_digest(candidate);
|
||||
|
||||
// vote to prepare only if we believe the candidate to be valid and
|
||||
// we are not locked on some other candidate.
|
||||
match self.locked {
|
||||
Some(ref locked) if locked.digest() != &digest => {}
|
||||
Some(_) | None => {
|
||||
if context.candidate_valid(candidate) {
|
||||
prepare_for = Some(digest);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(digest) = prepare_for {
|
||||
let message = Message::Prepare(
|
||||
self.current_accumulator.round_number(),
|
||||
digest
|
||||
);
|
||||
|
||||
self.import_and_send_message(message, context, sending);
|
||||
self.local_state = LocalState::Prepared;
|
||||
}
|
||||
}
|
||||
|
||||
fn commit(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>) {
|
||||
// commit only if we haven't voted to advance or committed already
|
||||
match self.local_state {
|
||||
LocalState::Committed | LocalState::VoteAdvance => return,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let mut commit_for = None;
|
||||
|
||||
if let &State::Prepared(ref p_just) = self.current_accumulator.state() {
|
||||
// we are now locked to this prepare justification.
|
||||
let digest = p_just.digest.clone();
|
||||
self.locked = Some(Locked { justification: p_just.clone() });
|
||||
commit_for = Some(digest);
|
||||
}
|
||||
|
||||
if let Some(digest) = commit_for {
|
||||
let message = Message::Commit(
|
||||
self.current_accumulator.round_number(),
|
||||
digest
|
||||
);
|
||||
|
||||
self.import_and_send_message(message, context, sending);
|
||||
self.local_state = LocalState::Committed;
|
||||
}
|
||||
}
|
||||
|
||||
fn vote_advance(&mut self, context: &C, sending: &mut Sending<ContextLocalizedMessage<C>>)
|
||||
-> Result<(), <C::RoundTimeout as Future>::Error>
|
||||
{
|
||||
// we can vote for advancement under all circumstances unless we have already.
|
||||
if let LocalState::VoteAdvance = self.local_state { return Ok(()) }
|
||||
|
||||
// if we got f + 1 advance votes, or the timeout has fired, and we haven't
|
||||
// sent an AdvanceRound message yet, do so.
|
||||
let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty;
|
||||
|
||||
if let Async::Ready(_) = self.round_timeout.poll()? {
|
||||
attempt_advance = true;
|
||||
}
|
||||
|
||||
// the other situation we attempt to advance is if there is a proposal
|
||||
// that is not equal to the one we are locked to.
|
||||
match (self.local_state, self.current_accumulator.state(), &self.locked) {
|
||||
(LocalState::Start, &State::Proposed(ref candidate), &Some(ref locked)) => {
|
||||
let candidate_digest = context.candidate_digest(candidate);
|
||||
if &candidate_digest != locked.digest() {
|
||||
attempt_advance = true;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if attempt_advance {
|
||||
let message = Message::AdvanceRound(
|
||||
self.current_accumulator.round_number(),
|
||||
);
|
||||
|
||||
self.import_and_send_message(message, context, sending);
|
||||
self.local_state = LocalState::VoteAdvance;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn advance_to_round(&mut self, context: &C, round: usize) {
|
||||
assert!(round > self.current_accumulator.round_number());
|
||||
|
||||
let threshold = self.nodes - self.max_faulty;
|
||||
|
||||
self.fetching_proposal = None;
|
||||
self.round_timeout = context.begin_round_timeout(round).fuse();
|
||||
self.local_state = LocalState::Start;
|
||||
|
||||
let new_future = Accumulator::new(
|
||||
round + 1,
|
||||
threshold,
|
||||
context.round_proposer(round + 1),
|
||||
);
|
||||
|
||||
// when advancing from a round, store away the witnessed proposal.
|
||||
//
|
||||
// if we or other participants end up locked on that candidate,
|
||||
// we will have it.
|
||||
if let Some(proposal) = self.current_accumulator.proposal() {
|
||||
let digest = context.candidate_digest(proposal);
|
||||
self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone());
|
||||
}
|
||||
|
||||
// special case when advancing by a single round.
|
||||
if self.future_accumulator.round_number() == round {
|
||||
self.current_accumulator
|
||||
= ::std::mem::replace(&mut self.future_accumulator, new_future);
|
||||
} else {
|
||||
self.future_accumulator = new_future;
|
||||
self.current_accumulator = Accumulator::new(
|
||||
round,
|
||||
threshold,
|
||||
context.round_proposer(round),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn import_and_send_message(
|
||||
&mut self,
|
||||
message: Message<C::Candidate, C::Digest>,
|
||||
context: &C,
|
||||
sending: &mut Sending<ContextLocalizedMessage<C>>
|
||||
) {
|
||||
let signed_message = context.sign_local(message);
|
||||
self.import_message(signed_message.clone());
|
||||
sending.push(signed_message);
|
||||
}
|
||||
}
|
||||
|
||||
/// Future that resolves upon BFT agreement for a candidate.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Agreement<C: Context, I, O> {
|
||||
context: C,
|
||||
input: I,
|
||||
output: O,
|
||||
concluded: Option<Committed<C::Candidate, C::Digest, C::Signature>>,
|
||||
sending: Sending<ContextLocalizedMessage<C>>,
|
||||
strategy: Strategy<C>,
|
||||
}
|
||||
|
||||
impl<C, I, O, E> Future for Agreement<C, I, O>
|
||||
where
|
||||
C: Context,
|
||||
C::RoundTimeout: Future<Error=E>,
|
||||
C::Proposal: Future<Error=E>,
|
||||
I: Stream<Item=ContextLocalizedMessage<C>,Error=E>,
|
||||
O: Sink<SinkItem=ContextLocalizedMessage<C>,SinkError=E>,
|
||||
E: From<InputStreamConcluded>,
|
||||
{
|
||||
type Item = Committed<C::Candidate, C::Digest, C::Signature>;
|
||||
type Error = E;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
// even if we've observed the conclusion, wait until all
|
||||
// pending outgoing messages are flushed.
|
||||
if let Some(just) = self.concluded.take() {
|
||||
return Ok(match self.sending.process_all(&mut self.output)? {
|
||||
Async::Ready(()) => Async::Ready(just),
|
||||
Async::NotReady => {
|
||||
self.concluded = Some(just);
|
||||
Async::NotReady
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// make progress on flushing all pending messages.
|
||||
let _ = self.sending.process_all(&mut self.output)?;
|
||||
|
||||
// try to process timeouts.
|
||||
if let Async::Ready(just) = self.strategy.poll(&self.context, &mut self.sending)? {
|
||||
self.concluded = Some(just);
|
||||
return self.poll();
|
||||
}
|
||||
|
||||
let message = try_ready!(self.input.poll()).ok_or(InputStreamConcluded)?;
|
||||
self.strategy.import_message(message);
|
||||
|
||||
self.poll()
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to reach BFT agreement on a candidate.
|
||||
///
|
||||
/// `nodes` is the number of nodes in the system.
|
||||
/// `max_faulty` is the maximum number of faulty nodes. Should be less than
|
||||
/// 1/3 of `nodes`, otherwise agreement may never be reached.
|
||||
///
|
||||
/// The input stream should never logically conclude. The logic here assumes
|
||||
/// that messages flushed to the output stream will eventually reach other nodes.
|
||||
///
|
||||
/// Note that it is possible to witness agreement being reached without ever
|
||||
/// seeing the candidate. Any candidates seen will be checked for validity.
|
||||
///
|
||||
/// Although technically the agreement will always complete (given the eventual
|
||||
/// delivery of messages), in practice it is possible for this future to
|
||||
/// conclude without having witnessed the conclusion.
|
||||
/// In general, this future should be pre-empted by the import of a justification
|
||||
/// set for this block height.
|
||||
pub fn agree<C: Context, I, O>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
|
||||
-> Agreement<C, I, O>
|
||||
{
|
||||
let strategy = Strategy::create(&context, nodes, max_faulty);
|
||||
Agreement {
|
||||
context,
|
||||
input,
|
||||
output,
|
||||
concluded: None,
|
||||
sending: Sending::with_capacity(4),
|
||||
strategy: strategy,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
//!
|
||||
//! Groups themselves may be compromised by malicious validators.
|
||||
|
||||
#[macro_use]
|
||||
extern crate futures;
|
||||
extern crate polkadot_primitives as primitives;
|
||||
|
||||
|
||||
@@ -70,32 +70,32 @@ pub trait Context {
|
||||
|
||||
/// Statements circulated among peers.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum Statement<C: Context + ?Sized> {
|
||||
pub enum Statement<C, D> {
|
||||
/// Broadcast by a validator to indicate that this is his candidate for
|
||||
/// inclusion.
|
||||
///
|
||||
/// Broadcasting two different candidate messages per round is not allowed.
|
||||
Candidate(C::Candidate),
|
||||
Candidate(C),
|
||||
/// Broadcast by a validator to attest that the candidate with given digest
|
||||
/// is valid.
|
||||
Valid(C::Digest),
|
||||
Valid(D),
|
||||
/// Broadcast by a validator to attest that the auxiliary data for a candidate
|
||||
/// with given digest is available.
|
||||
Available(C::Digest),
|
||||
Available(D),
|
||||
/// Broadcast by a validator to attest that the candidate with given digest
|
||||
/// is invalid.
|
||||
Invalid(C::Digest),
|
||||
Invalid(D),
|
||||
}
|
||||
|
||||
/// A signed statement.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub struct SignedStatement<C: Context + ?Sized> {
|
||||
pub struct SignedStatement<C, D, V, S> {
|
||||
/// The statement.
|
||||
pub statement: Statement<C>,
|
||||
pub statement: Statement<C, D>,
|
||||
/// The signature.
|
||||
pub signature: C::Signature,
|
||||
pub signature: S,
|
||||
/// The sender.
|
||||
pub sender: C::ValidatorId,
|
||||
pub sender: V,
|
||||
}
|
||||
|
||||
// A unique trace for a class of valid statements issued by a validator.
|
||||
@@ -123,41 +123,52 @@ enum StatementTrace<V, D> {
|
||||
/// Since there are three possible ways to vote, a double vote is possible in
|
||||
/// three possible combinations (unordered)
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum ValidityDoubleVote<C: Context> {
|
||||
pub enum ValidityDoubleVote<C, D, S> {
|
||||
/// Implicit vote by issuing and explicity voting validity.
|
||||
IssuedAndValidity((C::Candidate, C::Signature), (C::Digest, C::Signature)),
|
||||
IssuedAndValidity((C, S), (D, S)),
|
||||
/// Implicit vote by issuing and explicitly voting invalidity
|
||||
IssuedAndInvalidity((C::Candidate, C::Signature), (C::Digest, C::Signature)),
|
||||
IssuedAndInvalidity((C, S), (D, S)),
|
||||
/// Direct votes for validity and invalidity
|
||||
ValidityAndInvalidity(C::Digest, C::Signature, C::Signature),
|
||||
ValidityAndInvalidity(D, S, S),
|
||||
}
|
||||
|
||||
/// Misbehavior: declaring multiple candidates.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub struct MultipleCandidates<C: Context> {
|
||||
pub struct MultipleCandidates<C, S> {
|
||||
/// The first candidate seen.
|
||||
pub first: (C::Candidate, C::Signature),
|
||||
pub first: (C, S),
|
||||
/// The second candidate seen.
|
||||
pub second: (C::Candidate, C::Signature),
|
||||
pub second: (C, S),
|
||||
}
|
||||
|
||||
/// Misbehavior: submitted statement for wrong group.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub struct UnauthorizedStatement<C: Context> {
|
||||
pub struct UnauthorizedStatement<C, D, V, S> {
|
||||
/// A signed statement which was submitted without proper authority.
|
||||
pub statement: SignedStatement<C>,
|
||||
pub statement: SignedStatement<C, D, V, S>,
|
||||
}
|
||||
|
||||
/// Different kinds of misbehavior. All of these kinds of malicious misbehavior
|
||||
/// are easily provable and extremely disincentivized.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum Misbehavior<C: Context> {
|
||||
pub enum Misbehavior<C, D, V, S> {
|
||||
/// Voted invalid and valid on validity.
|
||||
ValidityDoubleVote(ValidityDoubleVote<C>),
|
||||
ValidityDoubleVote(ValidityDoubleVote<C, D, S>),
|
||||
/// Submitted multiple candidates.
|
||||
MultipleCandidates(MultipleCandidates<C>),
|
||||
MultipleCandidates(MultipleCandidates<C, S>),
|
||||
/// Submitted a message withou
|
||||
UnauthorizedStatement(UnauthorizedStatement<C>),
|
||||
UnauthorizedStatement(UnauthorizedStatement<C, D, V, S>),
|
||||
}
|
||||
|
||||
/// Fancy work-around for a type alias of context-based misbehavior
|
||||
/// without producing compiler warnings.
|
||||
pub trait ResolveMisbehavior {
|
||||
/// The misbehavior type.
|
||||
type Misbehavior;
|
||||
}
|
||||
|
||||
impl<C: Context + ?Sized> ResolveMisbehavior for C {
|
||||
type Misbehavior = Misbehavior<C::Candidate, C::Digest, C::ValidatorId, C::Signature>;
|
||||
}
|
||||
|
||||
// kinds of votes for validity
|
||||
@@ -251,7 +262,7 @@ pub fn create<C: Context>() -> Table<C> {
|
||||
#[derive(Default)]
|
||||
pub struct Table<C: Context> {
|
||||
validator_data: HashMap<C::ValidatorId, ValidatorData<C>>,
|
||||
detected_misbehavior: HashMap<C::ValidatorId, Misbehavior<C>>,
|
||||
detected_misbehavior: HashMap<C::ValidatorId, <C as ResolveMisbehavior>::Misbehavior>,
|
||||
candidate_votes: HashMap<C::Digest, CandidateData<C>>,
|
||||
}
|
||||
|
||||
@@ -294,7 +305,7 @@ impl<C: Context> Table<C> {
|
||||
}
|
||||
|
||||
/// Drain all misbehavior observed up to this point.
|
||||
pub fn drain_misbehavior(&mut self) -> HashMap<C::ValidatorId, Misbehavior<C>> {
|
||||
pub fn drain_misbehavior(&mut self) -> HashMap<C::ValidatorId, <C as ResolveMisbehavior>::Misbehavior> {
|
||||
::std::mem::replace(&mut self.detected_misbehavior, HashMap::new())
|
||||
}
|
||||
|
||||
@@ -303,9 +314,12 @@ impl<C: Context> Table<C> {
|
||||
///
|
||||
/// This can note the origin of the statement to indicate that he has
|
||||
/// seen it already.
|
||||
pub fn import_statement(&mut self, context: &C, statement: SignedStatement<C>, from: Option<C::ValidatorId>)
|
||||
-> Option<Summary<C::Digest, C::GroupId>>
|
||||
{
|
||||
pub fn import_statement(
|
||||
&mut self,
|
||||
context: &C,
|
||||
statement: SignedStatement<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
|
||||
from: Option<C::ValidatorId>
|
||||
) -> Option<Summary<C::Digest, C::GroupId>> {
|
||||
let SignedStatement { statement, signature, sender: signer } = statement;
|
||||
|
||||
let trace = match statement {
|
||||
@@ -370,7 +384,7 @@ impl<C: Context> Table<C> {
|
||||
from: C::ValidatorId,
|
||||
candidate: C::Candidate,
|
||||
signature: C::Signature,
|
||||
) -> (Option<Misbehavior<C>>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
let group = context.candidate_group(&candidate);
|
||||
if !context.is_member_of(&from, &group) {
|
||||
return (
|
||||
@@ -444,7 +458,7 @@ impl<C: Context> Table<C> {
|
||||
from: C::ValidatorId,
|
||||
digest: C::Digest,
|
||||
vote: ValidityVote<C::Signature>,
|
||||
) -> (Option<Misbehavior<C>>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
let votes = match self.candidate_votes.get_mut(&digest) {
|
||||
None => return (None, None), // TODO: queue up but don't get DoS'ed
|
||||
Some(votes) => votes,
|
||||
@@ -522,7 +536,7 @@ impl<C: Context> Table<C> {
|
||||
from: C::ValidatorId,
|
||||
digest: C::Digest,
|
||||
signature: C::Signature,
|
||||
) -> (Option<Misbehavior<C>>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
|
||||
let votes = match self.candidate_votes.get_mut(&digest) {
|
||||
None => return (None, None), // TODO: queue up but don't get DoS'ed
|
||||
Some(votes) => votes,
|
||||
|
||||
Reference in New Issue
Block a user