PoC-1 consensus extracted to "rhododendron" (#284)

This commit is contained in:
Robert Habermeier
2018-07-09 10:26:04 +01:00
committed by Gav Wood
parent 9b254c3075
commit 374984fe35
18 changed files with 135 additions and 2273 deletions
+17 -1
View File
@@ -1470,6 +1470,7 @@ dependencies = [
"polkadot-runtime 0.1.0",
"polkadot-statement-table 0.1.0",
"polkadot-transaction-pool 0.1.0",
"rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-client 0.1.0",
"substrate-codec 0.1.0",
@@ -1499,6 +1500,7 @@ dependencies = [
"polkadot-api 0.1.0",
"polkadot-consensus 0.1.0",
"polkadot-primitives 0.1.0",
"rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1802,6 +1804,16 @@ dependencies = [
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rhododendron"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ring"
version = "0.12.1"
@@ -2121,6 +2133,7 @@ dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-codec 0.1.0",
"substrate-executor 0.1.0",
"substrate-keyring 0.1.0",
@@ -2194,7 +2207,7 @@ dependencies = [
"hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2250,6 +2263,7 @@ dependencies = [
name = "substrate-misbehavior-check"
version = "0.1.0"
dependencies = [
"rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-codec 0.1.0",
"substrate-keyring 0.1.0",
@@ -2645,6 +2659,7 @@ dependencies = [
name = "substrate-test-client"
version = "0.1.0"
dependencies = [
"rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-client 0.1.0",
"substrate-codec 0.1.0",
@@ -3422,6 +3437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum regex-syntax 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7d707a4fa2637f2dca2ef9fd02225ec7661fe01a53623c1e6515b6916511f7a7"
"checksum relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1576e382688d7e9deecea24417e350d3062d97e32e45d70b1cde65994ff1489a"
"checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"
"checksum rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e38401cc1b63e71ec9119115c7e1354fcf54c8006ad59a22409dd8bd93737b2"
"checksum ring 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6f7d28b30a72c01b458428e0ae988d4149c20d902346902be881e3edc4bb325c"
"checksum rlp 0.2.1 (git+https://github.com/paritytech/parity.git)" = "<none>"
"checksum rlp 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "89db7f8dfdd5eb7ab3ac3ece7a07fd273a680b4b224cb231181280e8996f9f0b"
+1
View File
@@ -11,6 +11,7 @@ ed25519 = { path = "../../substrate/ed25519" }
error-chain = "0.12"
log = "0.3"
exit-future = "0.1"
rhododendron = "0.2"
polkadot-api = { path = "../api" }
polkadot-parachain = { path = "../parachain" }
polkadot-primitives = { path = "../primitives" }
+2 -1
View File
@@ -47,6 +47,7 @@ extern crate substrate_client as client;
extern crate exit_future;
extern crate tokio;
extern crate rhododendron;
#[macro_use]
extern crate error_chain;
@@ -530,7 +531,7 @@ impl<C> bft::Proposer<Block> for Proposer<C>
}
fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior<Hash>)>) {
use bft::generic::Misbehavior as GenericMisbehavior;
use rhododendron::Misbehavior as GenericMisbehavior;
use runtime_primitives::bft::{MisbehaviorKind, MisbehaviorReport};
use runtime_primitives::MaybeUnsigned;
use polkadot_runtime::{Call, Extrinsic, BareExtrinsic, UncheckedExtrinsic, ConsensusCall};
+1
View File
@@ -20,3 +20,4 @@ ed25519 = { path = "../../substrate/ed25519" }
futures = "0.1"
tokio = "0.1.7"
log = "0.4"
rhododendron = "0.2"
+19 -16
View File
@@ -53,8 +53,8 @@ impl<E> Sink for BftSink<E> {
fn start_send(&mut self, message: bft::Communication<Block>) -> ::futures::StartSend<bft::Communication<Block>, E> {
let network_message = net::LocalizedBftMessage {
message: match message {
bft::generic::Communication::Consensus(c) => msg::BftMessage::Consensus(match c {
bft::generic::LocalizedMessage::Propose(proposal) => msg::SignedConsensusMessage::Propose(msg::SignedConsensusProposal {
::rhododendron::Communication::Consensus(c) => msg::BftMessage::Consensus(match c {
::rhododendron::LocalizedMessage::Propose(proposal) => msg::SignedConsensusMessage::Propose(msg::SignedConsensusProposal {
round_number: proposal.round_number as u32,
proposal: proposal.proposal,
digest: proposal.digest,
@@ -62,17 +62,20 @@ impl<E> Sink for BftSink<E> {
digest_signature: proposal.digest_signature.signature,
full_signature: proposal.full_signature.signature,
}),
bft::generic::LocalizedMessage::Vote(vote) => msg::SignedConsensusMessage::Vote(msg::SignedConsensusVote {
::rhododendron::LocalizedMessage::Vote(vote) => msg::SignedConsensusMessage::Vote(msg::SignedConsensusVote {
sender: vote.sender,
signature: vote.signature.signature,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => msg::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => msg::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => msg::ConsensusVote::AdvanceRound(r as u32),
::rhododendron::Vote::Prepare(r, h) => msg::ConsensusVote::Prepare(r as u32, h),
::rhododendron::Vote::Commit(r, h) => msg::ConsensusVote::Commit(r as u32, h),
::rhododendron::Vote::AdvanceRound(r) => msg::ConsensusVote::AdvanceRound(r as u32),
}
}),
}),
bft::generic::Communication::Auxiliary(justification) => msg::BftMessage::Auxiliary(justification.uncheck().into()),
::rhododendron::Communication::Auxiliary(justification) => {
let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into();
msg::BftMessage::Auxiliary(unchecked.into())
}
},
parent_hash: self.parent_hash,
};
@@ -90,10 +93,10 @@ impl<E> Sink for BftSink<E> {
// check signature and authority validity of message.
fn process_bft_message(msg: msg::LocalizedBftMessage<Block, Hash>, local_id: &SessionKey, authorities: &[SessionKey]) -> Result<Option<bft::Communication<Block>>, bft::Error> {
Ok(Some(match msg.message {
msg::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
msg::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
msg::BftMessage::Consensus(c) => ::rhododendron::Communication::Consensus(match c {
msg::SignedConsensusMessage::Propose(proposal) => ::rhododendron::LocalizedMessage::Propose({
if &proposal.sender == local_id { return Ok(None) }
let proposal = bft::generic::LocalizedProposal {
let proposal = ::rhododendron::LocalizedProposal {
round_number: proposal.round_number as usize,
proposal: proposal.proposal,
digest: proposal.digest,
@@ -112,18 +115,18 @@ fn process_bft_message(msg: msg::LocalizedBftMessage<Block, Hash>, local_id: &Se
trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, Hash::from(proposal.sender.0));
proposal
}),
msg::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
msg::SignedConsensusMessage::Vote(vote) => ::rhododendron::LocalizedMessage::Vote({
if &vote.sender == local_id { return Ok(None) }
let vote = bft::generic::LocalizedVote {
let vote = ::rhododendron::LocalizedVote {
sender: vote.sender,
signature: ed25519::LocalizedSignature {
signature: vote.signature,
signer: ed25519::Public(vote.sender.0),
},
vote: match vote.vote {
msg::ConsensusVote::Prepare(r, h) => bft::generic::Vote::Prepare(r as usize, h),
msg::ConsensusVote::Commit(r, h) => bft::generic::Vote::Commit(r as usize, h),
msg::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize),
msg::ConsensusVote::Prepare(r, h) => ::rhododendron::Vote::Prepare(r as usize, h),
msg::ConsensusVote::Commit(r, h) => ::rhododendron::Vote::Commit(r as usize, h),
msg::ConsensusVote::AdvanceRound(r) => ::rhododendron::Vote::AdvanceRound(r as usize),
}
};
bft::check_vote::<Block>(authorities, &msg.parent_hash, &vote)?;
@@ -137,7 +140,7 @@ fn process_bft_message(msg: msg::LocalizedBftMessage<Block, Hash>, local_id: &Se
// TODO: get proper error
let justification: Result<_, bft::Error> = bft::check_prepare_justification::<Block>(authorities, msg.parent_hash, justification)
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?)
::rhododendron::Communication::Auxiliary(justification?)
},
}))
}
+1
View File
@@ -38,6 +38,7 @@ extern crate ed25519;
extern crate futures;
extern crate parking_lot;
extern crate tokio;
extern crate rhododendron;
#[macro_use]
extern crate log;
+1
View File
@@ -14,6 +14,7 @@ tokio = "0.1.7"
parking_lot = "0.4"
error-chain = "0.12"
log = "0.3"
rhododendron = "0.2"
[dev-dependencies]
substrate-keyring = { path = "../keyring" }
+2 -2
View File
@@ -74,8 +74,8 @@ error_chain! {
}
}
impl From<::generic::InputStreamConcluded> for Error {
fn from(_: ::generic::InputStreamConcluded) -> Error {
impl From<::rhododendron::InputStreamConcluded> for Error {
fn from(_: ::rhododendron::InputStreamConcluded) -> Error {
ErrorKind::IoTerminated.into()
}
}
@@ -1,883 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Vote accumulator for each round of BFT consensus.
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::hash::Hash;
use generic::{Vote, LocalizedMessage, LocalizedProposal};
/// Justification for some state at a given round.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UncheckedJustification<D, S> {
/// The round.
pub round_number: usize,
/// The digest prepared for.
pub digest: D,
/// Signatures for the prepare messages.
pub signatures: Vec<S>,
}
impl<D, S> UncheckedJustification<D, S> {
/// Fails if there are duplicate signatures or invalid.
///
/// Provide a closure for checking whether the signature is valid on a
/// digest.
///
/// The closure should returns a checked justification iff the round number, digest, and signature
/// represent a valid message and the signer was authorized to issue
/// it.
///
/// The `check_message` closure may vary based on context.
pub fn check<F, V>(self, threshold: usize, mut check_message: F)
-> Result<Justification<D, S>, Self>
where
F: FnMut(usize, &D, &S) -> Option<V>,
V: Hash + Eq,
{
let checks_out = {
let mut checks_out = || {
let mut voted = HashSet::new();
for signature in &self.signatures {
match check_message(self.round_number, &self.digest, signature) {
None => return false,
Some(v) => {
if !voted.insert(v) {
return false;
}
}
}
}
voted.len() >= threshold
};
checks_out()
};
if checks_out {
Ok(Justification(self))
} else {
Err(self)
}
}
}
/// A checked justification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Justification<D,S>(UncheckedJustification<D,S>);
impl<D, S> Justification<D, S> {
/// Convert this justification back to unchecked.
pub fn uncheck(self) -> UncheckedJustification<D, S> {
self.0
}
}
impl<D, S> ::std::ops::Deref for Justification<D, S> {
type Target = UncheckedJustification<D, S>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Type alias to represent a justification specifically for a prepare.
pub type PrepareJustification<D, S> = Justification<D, S>;
/// The round's state, based on imported messages.
#[derive(PartialEq, Eq, Debug)]
pub enum State<Candidate, Digest, Signature> {
/// No proposal yet.
Begin,
/// Proposal received.
Proposed(Candidate),
/// Seen n - f prepares for this digest.
Prepared(PrepareJustification<Digest, Signature>),
/// Seen n - f commits for a digest.
Committed(Justification<Digest, Signature>),
/// Seen n - f round-advancement messages.
Advanced(Option<PrepareJustification<Digest, Signature>>),
}
#[derive(Debug, Default)]
struct VoteCounts {
prepared: usize,
committed: usize,
}
#[derive(Debug)]
struct Proposal<Candidate, Digest, Signature> {
proposal: Candidate,
digest: Digest,
digest_signature: Signature,
}
/// Misbehavior which can occur.
#[derive(Debug, Clone)]
pub enum Misbehavior<Digest, Signature> {
/// Proposed out-of-turn.
ProposeOutOfTurn(usize, Digest, Signature),
/// Issued two conflicting proposals.
DoublePropose(usize, (Digest, Signature), (Digest, Signature)),
/// Issued two conflicting prepare messages.
DoublePrepare(usize, (Digest, Signature), (Digest, Signature)),
/// Issued two conflicting commit messages.
DoubleCommit(usize, (Digest, Signature), (Digest, Signature)),
}
/// Accumulates messages for a given round of BFT consensus.
///
/// This isn't tied to the "view" of a single authority. It
/// keeps accurate track of the state of the BFT consensus based
/// on all messages imported.
#[derive(Debug)]
pub struct Accumulator<Candidate, Digest, AuthorityId, Signature>
where
Candidate: Eq + Clone,
Digest: Hash + Eq + Clone,
AuthorityId: Hash + Eq + Clone,
Signature: Eq + Clone,
{
round_number: usize,
threshold: usize,
round_proposer: AuthorityId,
proposal: Option<Proposal<Candidate, Digest, Signature>>,
prepares: HashMap<AuthorityId, (Digest, Signature)>,
commits: HashMap<AuthorityId, (Digest, Signature)>,
vote_counts: HashMap<Digest, VoteCounts>,
advance_round: HashSet<AuthorityId>,
state: State<Candidate, Digest, Signature>,
}
impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, AuthorityId, Signature>
where
Candidate: Eq + Clone,
Digest: Hash + Eq + Clone,
AuthorityId: Hash + Eq + Clone,
Signature: Eq + Clone,
{
/// Create a new state accumulator.
pub fn new(round_number: usize, threshold: usize, round_proposer: AuthorityId) -> Self {
Accumulator {
round_number,
threshold,
round_proposer,
proposal: None,
prepares: HashMap::new(),
commits: HashMap::new(),
vote_counts: HashMap::new(),
advance_round: HashSet::new(),
state: State::Begin,
}
}
/// How advance votes we have seen.
pub fn advance_votes(&self) -> usize {
self.advance_round.len()
}
/// Get the round number.
pub fn round_number(&self) -> usize {
self.round_number.clone()
}
pub fn proposal(&self) -> Option<&Candidate> {
self.proposal.as_ref().map(|p| &p.proposal)
}
/// Inspect the current consensus state.
pub fn state(&self) -> &State<Candidate, Digest, Signature> {
&self.state
}
/// Import a message. Importing duplicates is fine, but the signature
/// and authorization should have already been checked.
pub fn import_message(
&mut self,
message: LocalizedMessage<Candidate, Digest, AuthorityId, Signature>,
) -> Result<(), Misbehavior<Digest, Signature>> {
// message from different round.
if message.round_number() != self.round_number {
return Ok(());
}
match message {
LocalizedMessage::Propose(proposal) => self.import_proposal(proposal),
LocalizedMessage::Vote(vote) => {
let (sender, signature) = (vote.sender, vote.signature);
match vote.vote {
Vote::Prepare(_, d) => self.import_prepare(d, sender, signature),
Vote::Commit(_, d) => self.import_commit(d, sender, signature),
Vote::AdvanceRound(_) => self.import_advance_round(sender),
}
}
}
}
fn import_proposal(
&mut self,
proposal: LocalizedProposal<Candidate, Digest, AuthorityId, Signature>,
) -> Result<(), Misbehavior<Digest, Signature>> {
let sender = proposal.sender;
if sender != self.round_proposer {
return Err(Misbehavior::ProposeOutOfTurn(
self.round_number,
proposal.digest,
proposal.digest_signature)
);
}
match self.proposal {
Some(ref p) if &p.digest != &proposal.digest => {
return Err(Misbehavior::DoublePropose(
self.round_number,
{
let old = self.proposal.as_ref().expect("just checked to be Some; qed");
(old.digest.clone(), old.digest_signature.clone())
},
(proposal.digest.clone(), proposal.digest_signature.clone())
))
}
_ => {},
}
debug!(target: "bft", "Importing proposal for round {}", self.round_number);
self.proposal = Some(Proposal {
proposal: proposal.proposal.clone(),
digest: proposal.digest,
digest_signature: proposal.digest_signature,
});
if let State::Begin = self.state {
self.state = State::Proposed(proposal.proposal);
}
Ok(())
}
fn import_prepare(
&mut self,
digest: Digest,
sender: AuthorityId,
signature: Signature,
) -> Result<(), Misbehavior<Digest, Signature>> {
// ignore any subsequent prepares by the same sender.
let threshold_prepared = match self.prepares.entry(sender.clone()) {
Entry::Vacant(vacant) => {
vacant.insert((digest.clone(), signature));
let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default);
count.prepared += 1;
if count.prepared >= self.threshold {
Some(digest)
} else {
None
}
}
Entry::Occupied(occupied) => {
// if digest is different, that's misbehavior.
if occupied.get().0 != digest {
return Err(Misbehavior::DoublePrepare(
self.round_number,
occupied.get().clone(),
(digest, signature)
));
}
None
}
};
// only allow transition to prepare from begin or proposed state.
let valid_transition = match self.state {
State::Begin | State::Proposed(_) => true,
_ => false,
};
if let (true, Some(threshold_prepared)) = (valid_transition, threshold_prepared) {
let signatures = self.prepares
.values()
.filter(|&&(ref d, _)| d == &threshold_prepared)
.map(|&(_, ref s)| s.clone())
.collect();
trace!(target: "bft", "observed threshold-prepare for round {}", self.round_number);
self.state = State::Prepared(Justification(UncheckedJustification {
round_number: self.round_number,
digest: threshold_prepared,
signatures: signatures,
}));
}
Ok(())
}
fn import_commit(
&mut self,
digest: Digest,
sender: AuthorityId,
signature: Signature,
) -> Result<(), Misbehavior<Digest, Signature>> {
// ignore any subsequent commits by the same sender.
let threshold_committed = match self.commits.entry(sender.clone()) {
Entry::Vacant(vacant) => {
vacant.insert((digest.clone(), signature));
let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default);
count.committed += 1;
if count.committed >= self.threshold {
Some(digest)
} else {
None
}
}
Entry::Occupied(occupied) => {
// if digest is different, that's misbehavior.
if occupied.get().0 != digest {
return Err(Misbehavior::DoubleCommit(
self.round_number,
occupied.get().clone(),
(digest, signature)
));
}
None
}
};
// transition to concluded state always valid.
// only weird case is if the prior state was "advanced",
// but technically it's the same behavior as if the order of receiving
// the last "advance round" and "commit" messages were reversed.
if let Some(threshold_committed) = threshold_committed {
let signatures = self.commits
.values()
.filter(|&&(ref d, _)| d == &threshold_committed)
.map(|&(_, ref s)| s.clone())
.collect();
trace!(target: "bft", "observed threshold-commit for round {}", self.round_number);
self.state = State::Committed(Justification(UncheckedJustification {
round_number: self.round_number,
digest: threshold_committed,
signatures: signatures,
}));
}
Ok(())
}
fn import_advance_round(
&mut self,
sender: AuthorityId,
) -> Result<(), Misbehavior<Digest, Signature>> {
self.advance_round.insert(sender);
if self.advance_round.len() < self.threshold { return Ok(()) }
trace!(target: "bft", "Witnessed threshold advance-round messages for round {}", self.round_number);
// allow transition to new round only if we haven't produced a justification
// yet.
self.state = match ::std::mem::replace(&mut self.state, State::Begin) {
State::Committed(j) => State::Committed(j),
State::Prepared(j) => State::Advanced(Some(j)),
State::Advanced(j) => State::Advanced(j),
State::Begin | State::Proposed(_) => State::Advanced(None),
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use generic::{LocalizedMessage, LocalizedProposal, LocalizedVote};
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Candidate(usize);
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct Digest(usize);
#[derive(Hash, PartialEq, Eq, Debug, Clone)]
pub struct AuthorityId(usize);
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Signature(usize, usize);
#[test]
fn justification_checks_out() {
let mut justification = UncheckedJustification {
round_number: 2,
digest: Digest(600),
signatures: (0..10).map(|i| Signature(600, i)).collect(),
};
let check_message = |r, d: &Digest, s: &Signature| {
if r == 2 && d.0 == 600 && s.0 == 600 {
Some(AuthorityId(s.1))
} else {
None
}
};
assert!(justification.clone().check(7, &check_message).is_ok());
assert!(justification.clone().check(11, &check_message).is_err());
{
// one bad signature is enough to spoil it.
justification.signatures.push(Signature(1001, 255));
assert!(justification.clone().check(7, &check_message).is_err());
justification.signatures.pop();
}
// duplicates not allowed.
justification.signatures.extend((0..10).map(|i| Signature(600, i)));
assert!(justification.clone().check(11, &check_message).is_err());
}
#[test]
fn accepts_proposal_from_proposer_only() {
let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
let res = accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(5),
full_signature: Signature(999, 5),
digest_signature: Signature(999, 5),
proposal: Candidate(999),
digest: Digest(999),
round_number: 1,
}));
assert!(res.is_err());
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
proposal: Candidate(999),
digest: Digest(999),
round_number: 1,
})).unwrap();
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
}
#[test]
fn reaches_prepare_phase() {
let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
for i in 0..6 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
}
accumulator.import_message(LocalizedVote {
sender: AuthorityId(7),
signature: Signature(999, 7),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
match accumulator.state() {
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn prepare_to_commit() {
let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
for i in 0..6 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
}
accumulator.import_message(LocalizedVote {
sender: AuthorityId(7),
signature: Signature(999, 7),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
match accumulator.state() {
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
for i in 0..6 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Commit(1, Digest(999)),
}.into()).unwrap();
match accumulator.state() {
&State::Prepared(_) => {},
s => panic!("wrong state: {:?}", s),
}
}
accumulator.import_message(LocalizedVote {
sender: AuthorityId(7),
signature: Signature(999, 7),
vote: Vote::Commit(1, Digest(999)),
}.into()).unwrap();
match accumulator.state() {
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn prepare_to_advance() {
let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();
assert_eq!(accumulator.state(), &State::Proposed(Candidate(999)));
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
}
match accumulator.state() {
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
for i in 0..6 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::AdvanceRound(1),
}.into()).unwrap();
match accumulator.state() {
&State::Prepared(_) => {},
s => panic!("wrong state: {:?}", s),
}
}
accumulator.import_message(LocalizedVote {
sender: AuthorityId(7),
signature: Signature(999, 7),
vote: Vote::AdvanceRound(1),
}.into()).unwrap();
match accumulator.state() {
&State::Advanced(Some(_)) => {},
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn conclude_different_than_proposed() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
}
match accumulator.state() {
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Commit(1, Digest(999)),
}.into()).unwrap();
}
match accumulator.state() {
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn propose_after_prepared_does_not_clobber_state() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
}
match accumulator.state() {
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();
match accumulator.state() {
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn propose_after_committed_does_not_clobber_state() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Commit(1, Digest(999)),
}.into()).unwrap();
}
match accumulator.state() {
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();
match accumulator.state() {
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn propose_after_advance_does_not_clobber_state() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(1, i),
vote: Vote::AdvanceRound(1),
}.into()).unwrap();
}
match accumulator.state() {
&State::Advanced(_) => {}
s => panic!("wrong state: {:?}", s),
}
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();
match accumulator.state() {
&State::Advanced(_) => {}
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn begin_to_advance() {
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(1, i),
vote: Vote::AdvanceRound(1),
}.into()).unwrap();
}
match accumulator.state() {
&State::Advanced(ref j) => assert!(j.is_none()),
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn conclude_without_prepare() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Commit(1, Digest(999)),
}.into()).unwrap();
}
match accumulator.state() {
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
}
#[test]
fn double_prepare_is_misbehavior() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
let res = accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(123, i),
vote: Vote::Prepare(1, Digest(123)),
}.into());
assert!(res.is_err());
}
}
#[test]
fn double_commit_is_misbehavior() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Commit(1, Digest(999)),
}.into()).unwrap();
let res = accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(123, i),
vote: Vote::Commit(1, Digest(123)),
}.into());
assert!(res.is_err());
}
}
#[test]
fn double_propose_is_misbehavior() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();
let res = accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(500, 8),
digest_signature: Signature(500, 8),
round_number: 1,
proposal: Candidate(500),
digest: Digest(500),
}));
assert!(res.is_err());
}
}
-832
View File
@@ -1,832 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! BFT Agreement based on a rotating proposer in different rounds.
//! Very general implementation.
use std::collections::{HashMap, BTreeMap, VecDeque};
use std::collections::hash_map;
use std::fmt::Debug;
use std::hash::Hash;
use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink};
use self::accumulator::State;
pub use self::accumulator::{Accumulator, Justification, PrepareJustification, UncheckedJustification, Misbehavior};
mod accumulator;
#[cfg(test)]
mod tests;
/// Votes during a round.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Vote<D> {
/// Prepare to vote for proposal with digest D.
Prepare(usize, D),
/// Commit to proposal with digest D..
Commit(usize, D),
/// Propose advancement to a new round.
AdvanceRound(usize),
}
impl<D> Vote<D> {
/// Extract the round number.
pub fn round_number(&self) -> usize {
match *self {
Vote::Prepare(round, _) => round,
Vote::Commit(round, _) => round,
Vote::AdvanceRound(round) => round,
}
}
}
/// Messages over the proposal.
/// Each message carries an associated round number.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message<C, D> {
/// A proposal itself.
Propose(usize, C),
/// A vote of some kind, localized to a round number.
Vote(Vote<D>),
}
impl<C, D> From<Vote<D>> for Message<C, D> {
fn from(vote: Vote<D>) -> Self {
Message::Vote(vote)
}
}
/// A localized proposal message. Contains two signed pieces of data.
#[derive(Debug, Clone)]
pub struct LocalizedProposal<C, D, V, S> {
/// The round number.
pub round_number: usize,
/// The proposal sent.
pub proposal: C,
/// The digest of the proposal.
pub digest: D,
/// The sender of the proposal
pub sender: V,
/// The signature on the message (propose, round number, digest)
pub digest_signature: S,
/// The signature on the message (propose, round number, proposal)
pub full_signature: S,
}
/// A localized vote message, including the sender.
#[derive(Debug, Clone)]
pub struct LocalizedVote<D, V, S> {
/// The message sent.
pub vote: Vote<D>,
/// The sender of the message
pub sender: V,
/// The signature of the message.
pub signature: S,
}
/// A localized message.
#[derive(Debug, Clone)]
pub enum LocalizedMessage<C, D, V, S> {
/// A proposal.
Propose(LocalizedProposal<C, D, V, S>),
/// A vote.
Vote(LocalizedVote<D, V, S>),
}
impl<C, D, V, S> LocalizedMessage<C, D, V, S> {
/// Extract the sender.
pub fn sender(&self) -> &V {
match *self {
LocalizedMessage::Propose(ref proposal) => &proposal.sender,
LocalizedMessage::Vote(ref vote) => &vote.sender,
}
}
/// Extract the round number.
pub fn round_number(&self) -> usize {
match *self {
LocalizedMessage::Propose(ref proposal) => proposal.round_number,
LocalizedMessage::Vote(ref vote) => vote.vote.round_number(),
}
}
}
impl<C, D, V, S> From<LocalizedVote<D, V, S>> for LocalizedMessage<C, D, V, S> {
fn from(vote: LocalizedVote<D, V, S>) -> Self {
LocalizedMessage::Vote(vote)
}
}
/// Context necessary for agreement.
///
/// Provides necessary types for protocol messages, and functions necessary for a
/// participant to evaluate and create those messages.
pub trait Context {
/// Errors which can occur from the futures in this context.
type Error: From<InputStreamConcluded>;
/// Candidate proposed.
type Candidate: Debug + Eq + Clone;
/// Candidate digest.
type Digest: Debug + Hash + Eq + Clone;
/// Authority ID.
type AuthorityId: Debug + Hash + Eq + Clone;
/// Signature.
type Signature: Debug + Eq + Clone;
/// A future that resolves when a round timeout is concluded.
type RoundTimeout: Future<Item=(), Error=Self::Error>;
/// A future that resolves when a proposal is ready.
type CreateProposal: Future<Item=Self::Candidate, Error=Self::Error>;
/// A future that resolves when a proposal has been evaluated.
type EvaluateProposal: Future<Item=bool, Error=Self::Error>;
/// Get the local authority ID.
fn local_id(&self) -> Self::AuthorityId;
/// Get the best proposal.
fn proposal(&self) -> Self::CreateProposal;
/// Get the digest of a candidate.
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
/// Sign a message using the local authority ID.
/// In the case of a proposal message, it should sign on the hash and
/// the bytes of the proposal.
fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>)
-> LocalizedMessage<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>;
/// Get the proposer for a given round of consensus.
fn round_proposer(&self, round: usize) -> Self::AuthorityId;
/// Whether the proposal is valid.
fn proposal_valid(&self, proposal: &Self::Candidate) -> Self::EvaluateProposal;
/// Create a round timeout. The context will determine the correct timeout
/// length, and create a future that will resolve when the timeout is
/// concluded.
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout;
}
/// Communication that can occur between participants in consensus.
#[derive(Debug, Clone)]
pub enum Communication<C, D, V, S> {
/// A consensus message (proposal or vote)
Consensus(LocalizedMessage<C, D, V, S>),
/// Auxiliary communication (just proof-of-lock for now).
Auxiliary(PrepareJustification<D, S>),
}
/// Hack to get around type alias warning.
pub trait TypeResolve {
/// Communication type.
type Communication;
}
impl<C: Context> TypeResolve for C {
type Communication = Communication<C::Candidate, C::Digest, C::AuthorityId, C::Signature>;
}
#[derive(Debug)]
struct Sending<T> {
items: VecDeque<T>,
flushing: bool,
}
impl<T> Sending<T> {
fn with_capacity(n: usize) -> Self {
Sending {
items: VecDeque::with_capacity(n),
flushing: false,
}
}
fn push(&mut self, item: T) {
self.items.push_back(item);
self.flushing = false;
}
// process all the sends into the sink.
fn process_all<S: Sink<SinkItem=T>>(&mut self, sink: &mut S) -> Poll<(), S::SinkError> {
while let Some(item) = self.items.pop_front() {
match sink.start_send(item) {
Err(e) => return Err(e),
Ok(AsyncSink::NotReady(item)) => {
self.items.push_front(item);
return Ok(Async::NotReady);
}
Ok(AsyncSink::Ready) => { self.flushing = true; }
}
}
if self.flushing {
match sink.poll_complete() {
Err(e) => return Err(e),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(())) => { self.flushing = false; }
}
}
Ok(Async::Ready(()))
}
}
/// Error returned when the input stream concludes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InputStreamConcluded;
impl ::std::fmt::Display for InputStreamConcluded {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(f, "{}", ::std::error::Error::description(self))
}
}
impl ::std::error::Error for InputStreamConcluded {
fn description(&self) -> &str {
"input stream of messages concluded prematurely"
}
}
// get the "full BFT" threshold based on an amount of nodes and
// a maximum faulty. if nodes == 3f + 1, then threshold == 2f + 1.
fn bft_threshold(nodes: usize, max_faulty: usize) -> usize {
nodes - max_faulty
}
/// Committed successfully.
#[derive(Debug, Clone)]
pub struct Committed<C, D, S> {
/// The candidate committed for. This will be unknown if
/// we never witnessed the proposal of the last round.
pub candidate: Option<C>,
/// A justification for the candidate.
pub justification: Justification<D, S>,
}
struct Locked<D, S> {
justification: PrepareJustification<D, S>,
}
impl<D, S> Locked<D, S> {
fn digest(&self) -> &D {
&self.justification.digest
}
}
// the state of the local node during the current state of consensus.
//
// behavior is different when locked on a proposal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LocalState {
Start,
Proposed,
Prepared(bool), // whether we thought it valid.
Committed,
VoteAdvance,
}
// This structure manages a single "view" of consensus.
//
// We maintain two message accumulators: one for the round we are currently in,
// and one for a future round.
//
// We advance the round accumulators when one of two conditions is met:
// - we witness consensus of advancement in the current round. in this case we
// advance by one.
// - a higher threshold-prepare is broadcast to us. in this case we can
// advance to the round of the threshold-prepare. this is an indication
// that we have experienced severe asynchrony/clock drift with the remainder
// of the other authorities, and it is unlikely that we can assist in
// consensus meaningfully. nevertheless we make an attempt.
struct Strategy<C: Context> {
nodes: usize,
max_faulty: usize,
fetching_proposal: Option<C::CreateProposal>,
evaluating_proposal: Option<C::EvaluateProposal>,
round_timeout: future::Fuse<C::RoundTimeout>,
local_state: LocalState,
locked: Option<Locked<C::Digest, C::Signature>>,
notable_candidates: HashMap<C::Digest, C::Candidate>,
current_accumulator: Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
future_accumulators: BTreeMap<usize, Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>>,
local_id: C::AuthorityId,
misbehavior: HashMap<C::AuthorityId, Misbehavior<C::Digest, C::Signature>>,
}
impl<C: Context> Strategy<C> {
fn create(context: &C, nodes: usize, max_faulty: usize) -> Self {
let timeout = context.begin_round_timeout(0);
let threshold = bft_threshold(nodes, max_faulty);
let current_accumulator = Accumulator::new(
0,
threshold,
context.round_proposer(0),
);
Strategy {
nodes,
max_faulty,
current_accumulator,
future_accumulators: BTreeMap::new(),
fetching_proposal: None,
evaluating_proposal: None,
local_state: LocalState::Start,
locked: None,
notable_candidates: HashMap::new(),
round_timeout: timeout.fuse(),
local_id: context.local_id(),
misbehavior: HashMap::new(),
}
}
fn current_round(&self) -> usize {
self.current_accumulator.round_number()
}
fn import_message(
&mut self,
context: &C,
msg: LocalizedMessage<C::Candidate, C::Digest, C::AuthorityId, C::Signature>
) {
let round_number = msg.round_number();
let sender = msg.sender().clone();
let current_round = self.current_round();
let misbehavior = if round_number == current_round {
self.current_accumulator.import_message(msg)
} else if round_number > current_round {
let threshold = bft_threshold(self.nodes, self.max_faulty);
let mut future_acc = self.future_accumulators.entry(round_number).or_insert_with(|| {
Accumulator::new(
round_number,
threshold,
context.round_proposer(round_number),
)
});
future_acc.import_message(msg)
} else {
Ok(())
};
if let Err(misbehavior) = misbehavior {
self.misbehavior.insert(sender, misbehavior);
}
}
fn import_lock_proof(
&mut self,
context: &C,
justification: PrepareJustification<C::Digest, C::Signature>,
) {
// TODO: find a way to avoid processing of the signatures if the sender is
// not the primary or the round number is low.
if justification.round_number > self.current_round() {
// jump ahead to the prior round as this is an indication of a supermajority
// good nodes being at least on that round.
self.advance_to_round(context, justification.round_number);
}
let lock_to_new = self.locked.as_ref()
.map_or(true, |l| l.justification.round_number < justification.round_number);
if lock_to_new {
self.locked = Some(Locked { justification })
}
}
// poll the strategy: this will queue messages to be sent and advance
// rounds if necessary.
//
// only call within the context of a `Task`.
fn poll(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, C::Error>
{
let mut last_watermark = (self.current_round(), self.local_state);
// poll until either completion or state doesn't change.
loop {
trace!(target: "bft", "Polling BFT logic. State={:?}", last_watermark);
match self.poll_once(context, sending)? {
Async::Ready(x) => return Ok(Async::Ready(x)),
Async::NotReady => {
let new_watermark = (self.current_round(), self.local_state);
if new_watermark == last_watermark {
return Ok(Async::NotReady)
} else {
last_watermark = new_watermark;
}
}
}
}
}
// perform one round of polling: attempt to broadcast messages and change the state.
// if the round or internal round-state changes, this should be called again.
fn poll_once(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, C::Error>
{
self.propose(context, sending)?;
self.prepare(context, sending)?;
self.commit(context, sending);
self.vote_advance(context, sending)?;
let advance = match self.current_accumulator.state() {
&State::Advanced(ref p_just) => {
// lock to any witnessed prepare justification.
if let Some(p_just) = p_just.as_ref() {
self.locked = Some(Locked { justification: p_just.clone() });
}
let round_number = self.current_round();
Some(round_number + 1)
}
&State::Committed(ref just) => {
// fetch the agreed-upon candidate:
// - we may not have received the proposal in the first place
// - there is no guarantee that the proposal we got was agreed upon
// (can happen if faulty primary)
// - look in the candidates of prior rounds just in case.
let candidate = self.current_accumulator
.proposal()
.and_then(|c| if context.candidate_digest(c) == just.digest {
Some(c.clone())
} else {
None
})
.or_else(|| self.notable_candidates.get(&just.digest).cloned());
let committed = Committed {
candidate,
justification: just.clone()
};
return Ok(Async::Ready(committed))
}
_ => None,
};
if let Some(new_round) = advance {
self.advance_to_round(context, new_round);
}
Ok(Async::NotReady)
}
fn propose(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Result<(), C::Error>
{
if let LocalState::Start = self.local_state {
let mut propose = false;
if let &State::Begin = self.current_accumulator.state() {
let round_number = self.current_round();
let primary = context.round_proposer(round_number);
propose = self.local_id == primary;
};
if !propose { return Ok(()) }
// obtain the proposal to broadcast.
let proposal = match self.locked {
Some(ref locked) => {
// TODO: it's possible but very unlikely that we don't have the
// corresponding proposal for what we are locked to.
//
// since this is an edge case on an edge case, it is fine
// to eat the round timeout for now, but it can be optimized by
// broadcasting an advance vote.
self.notable_candidates.get(locked.digest()).cloned()
}
None => {
let res = self.fetching_proposal
.get_or_insert_with(|| context.proposal())
.poll()?;
match res {
Async::Ready(p) => Some(p),
Async::NotReady => None,
}
}
};
if let Some(proposal) = proposal {
self.fetching_proposal = None;
let message = Message::Propose(
self.current_round(),
proposal
);
self.import_and_send_message(message, context, sending);
// broadcast the justification along with the proposal if we are locked.
if let Some(ref locked) = self.locked {
sending.push(
Communication::Auxiliary(locked.justification.clone())
);
}
self.local_state = LocalState::Proposed;
}
}
Ok(())
}
fn prepare(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Result<(), C::Error>
{
// prepare only upon start or having proposed.
match self.local_state {
LocalState::Start | LocalState::Proposed => {},
_ => return Ok(())
};
let mut prepare_for = None;
// we can't prepare until something was proposed.
if let &State::Proposed(ref candidate) = self.current_accumulator.state() {
let digest = context.candidate_digest(candidate);
// vote to prepare only if we believe the candidate to be valid and
// we are not locked on some other candidate.
match self.locked {
Some(ref locked) if locked.digest() != &digest => {}
Some(_) => {
// don't check validity if we are locked.
// this is necessary to preserve the liveness property.
self.local_state = LocalState::Prepared(true);
prepare_for = Some(digest);
}
None => {
let res = self.evaluating_proposal
.get_or_insert_with(|| context.proposal_valid(candidate))
.poll()?;
if let Async::Ready(valid) = res {
self.evaluating_proposal = None;
self.local_state = LocalState::Prepared(valid);
if valid {
prepare_for = Some(digest);
}
}
}
}
}
if let Some(digest) = prepare_for {
let message = Vote::Prepare(
self.current_round(),
digest
).into();
self.import_and_send_message(message, context, sending);
}
Ok(())
}
fn commit(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
) {
// commit only if we haven't voted to advance or committed already
match self.local_state {
LocalState::Committed | LocalState::VoteAdvance => return,
_ => {}
}
let mut commit_for = None;
if let &State::Prepared(ref p_just) = self.current_accumulator.state() {
// we are now locked to this prepare justification.
let digest = p_just.digest.clone();
self.locked = Some(Locked { justification: p_just.clone() });
commit_for = Some(digest);
}
if let Some(digest) = commit_for {
let message = Vote::Commit(
self.current_round(),
digest
).into();
self.import_and_send_message(message, context, sending);
self.local_state = LocalState::Committed;
}
}
fn vote_advance(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Result<(), C::Error>
{
// we can vote for advancement under all circumstances unless we have already.
if let LocalState::VoteAdvance = self.local_state { return Ok(()) }
// if we got f + 1 advance votes, or the timeout has fired, and we haven't
// sent an AdvanceRound message yet, do so.
let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty;
// if we evaluated the proposal and it was bad, vote to advance round.
if let LocalState::Prepared(false) = self.local_state {
attempt_advance = true;
}
// if the timeout has fired, vote to advance round.
if let Async::Ready(_) = self.round_timeout.poll()? {
attempt_advance = true;
}
if attempt_advance {
let message = Vote::AdvanceRound(
self.current_round(),
).into();
self.import_and_send_message(message, context, sending);
self.local_state = LocalState::VoteAdvance;
}
Ok(())
}
fn advance_to_round(&mut self, context: &C, round: usize) {
assert!(round > self.current_round());
trace!(target: "bft", "advancing to round {}", round);
self.fetching_proposal = None;
self.evaluating_proposal = None;
self.round_timeout = context.begin_round_timeout(round).fuse();
self.local_state = LocalState::Start;
// when advancing from a round, store away the witnessed proposal.
//
// if we or other participants end up locked on that candidate,
// we will have it.
if let Some(proposal) = self.current_accumulator.proposal() {
let digest = context.candidate_digest(proposal);
self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone());
}
// if we jump ahead more than one round, get rid of the ones in between.
for irrelevant in (self.current_round() + 1)..round {
self.future_accumulators.remove(&irrelevant);
}
// use stored future accumulator for given round or create if it doesn't exist.
self.current_accumulator = match self.future_accumulators.remove(&round) {
Some(x) => x,
None => Accumulator::new(
round,
bft_threshold(self.nodes, self.max_faulty),
context.round_proposer(round),
),
};
}
fn import_and_send_message(
&mut self,
message: Message<C::Candidate, C::Digest>,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
) {
let signed_message = context.sign_local(message);
self.import_message(context, signed_message.clone());
sending.push(Communication::Consensus(signed_message));
}
}
/// Future that resolves upon BFT agreement for a candidate.
#[must_use = "futures do nothing unless polled"]
pub struct Agreement<C: Context, I, O> {
context: C,
input: I,
output: O,
concluded: Option<Committed<C::Candidate, C::Digest, C::Signature>>,
sending: Sending<<C as TypeResolve>::Communication>,
strategy: Strategy<C>,
}
impl<C, I, O> Future for Agreement<C, I, O>
where
C: Context,
I: Stream<Item=<C as TypeResolve>::Communication,Error=C::Error>,
O: Sink<SinkItem=<C as TypeResolve>::Communication,SinkError=C::Error>,
{
type Item = Committed<C::Candidate, C::Digest, C::Signature>;
type Error = C::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// even if we've observed the conclusion, wait until all
// pending outgoing messages are flushed.
if let Some(just) = self.concluded.take() {
return Ok(match self.sending.process_all(&mut self.output)? {
Async::Ready(()) => Async::Ready(just),
Async::NotReady => {
self.concluded = Some(just);
Async::NotReady
}
})
}
// drive state machine as long as there are new messages.
let mut driving = true;
while driving {
driving = match self.input.poll()? {
Async::Ready(msg) => {
match msg.ok_or(InputStreamConcluded)? {
Communication::Consensus(message) => self.strategy.import_message(&self.context, message),
Communication::Auxiliary(lock_proof)
=> self.strategy.import_lock_proof(&self.context, lock_proof),
}
true
}
Async::NotReady => false,
};
// drive state machine after handling new input.
if let Async::Ready(just) = self.strategy.poll(&self.context, &mut self.sending)? {
self.concluded = Some(just);
return self.poll();
}
}
// make progress on flushing all pending messages.
let _ = self.sending.process_all(&mut self.output)?;
Ok(Async::NotReady)
}
}
impl<C: Context, I, O> Agreement<C, I, O> {
/// Get a reference to the underlying context.
pub fn context(&self) -> &C {
&self.context
}
/// Drain the misbehavior vector.
pub fn drain_misbehavior(&mut self) -> hash_map::Drain<C::AuthorityId, Misbehavior<C::Digest, C::Signature>> {
self.strategy.misbehavior.drain()
}
}
/// Attempt to reach BFT agreement on a candidate.
///
/// `nodes` is the number of nodes in the system.
/// `max_faulty` is the maximum number of faulty nodes. Should be less than
/// 1/3 of `nodes`, otherwise agreement may never be reached.
///
/// The input and output streams should follow the constraints documented in the crate root.
pub fn agree<C: Context, I, O>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
-> Agreement<C, I, O>
where
C: Context,
I: Stream<Item=<C as TypeResolve>::Communication,Error=C::Error>,
O: Sink<SinkItem=<C as TypeResolve>::Communication,SinkError=C::Error>,
{
let strategy = Strategy::create(&context, nodes, max_faulty);
Agreement {
context,
input,
output,
concluded: None,
sending: Sending::with_capacity(4),
strategy: strategy,
}
}
@@ -1,469 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Tests for the candidate agreement strategy.
use super::*;
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Instant, Duration};
use futures::sync::mpsc;
use futures::future::{self, FutureResult};
use tokio::timer::Delay;
use tokio::runtime;
use tokio::prelude::*;
const ROUND_DURATION: Duration = Duration::from_millis(50);
struct Network<T> {
endpoints: Vec<mpsc::UnboundedSender<T>>,
input: mpsc::UnboundedReceiver<(usize, T)>,
}
impl<T: Clone + Send + 'static> Network<T> {
fn new(nodes: usize)
-> (Self, Vec<mpsc::UnboundedSender<(usize, T)>>, Vec<mpsc::UnboundedReceiver<T>>)
{
let mut inputs = Vec::with_capacity(nodes);
let mut outputs = Vec::with_capacity(nodes);
let mut endpoints = Vec::with_capacity(nodes);
let (in_tx, in_rx) = mpsc::unbounded();
for _ in 0..nodes {
let (out_tx, out_rx) = mpsc::unbounded();
inputs.push(in_tx.clone());
outputs.push(out_rx);
endpoints.push(out_tx);
}
let network = Network {
endpoints,
input: in_rx,
};
(network, inputs, outputs)
}
fn route_in_background(self) {
::tokio::executor::spawn(self);
}
}
impl<T: Clone> Future for Network<T> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), Self::Error> {
match try_ready!(self.input.poll()) {
None => Ok(Async::Ready(())),
Some((sender, item)) => {
{
let receiving_endpoints = self.endpoints
.iter()
.enumerate()
.filter(|&(i, _)| i != sender)
.map(|(_, x)| x);
for endpoint in receiving_endpoints {
let _ = endpoint.unbounded_send(item.clone());
}
}
self.poll()
}
}
}
}
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct Candidate(usize);
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
struct Digest(usize);
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct AuthorityId(usize);
#[derive(Debug, PartialEq, Eq, Clone)]
struct Signature(Message<Candidate, Digest>, AuthorityId);
#[derive(Debug)]
struct Error;
impl From<InputStreamConcluded> for Error {
fn from(_: InputStreamConcluded) -> Error {
Error
}
}
struct TestContext {
local_id: AuthorityId,
proposal: Mutex<usize>,
node_count: usize,
current_round: Arc<AtomicUsize>,
evaluated: Mutex<BTreeSet<usize>>,
}
impl Context for TestContext {
type Error = Error;
type Candidate = Candidate;
type Digest = Digest;
type AuthorityId = AuthorityId;
type Signature = Signature;
type RoundTimeout = Box<Future<Item=(), Error=Error>>;
type CreateProposal = FutureResult<Candidate, Error>;
type EvaluateProposal = FutureResult<bool, Error>;
fn local_id(&self) -> AuthorityId {
self.local_id.clone()
}
fn proposal(&self) -> Self::CreateProposal {
let proposal = {
let mut p = self.proposal.lock().unwrap();
let x = *p;
*p += self.node_count;
x
};
Ok(Candidate(proposal)).into_future()
}
fn candidate_digest(&self, candidate: &Candidate) -> Digest {
Digest(candidate.0)
}
fn sign_local(&self, message: Message<Candidate, Digest>)
-> LocalizedMessage<Candidate, Digest, AuthorityId, Signature>
{
let signature = Signature(message.clone(), self.local_id.clone());
match message {
Message::Propose(r, proposal) => LocalizedMessage::Propose(LocalizedProposal {
round_number: r,
digest: Digest(proposal.0),
proposal,
digest_signature: signature.clone(),
full_signature: signature,
sender: self.local_id.clone(),
}),
Message::Vote(vote) => LocalizedMessage::Vote(LocalizedVote {
vote,
signature,
sender: self.local_id.clone(),
}),
}
}
fn round_proposer(&self, round: usize) -> AuthorityId {
AuthorityId(round % self.node_count)
}
fn proposal_valid(&self, proposal: &Candidate) -> FutureResult<bool, Error> {
if !self.evaluated.lock().unwrap().insert(proposal.0) {
panic!("Evaluated proposal {:?} twice", proposal.0);
}
Ok(proposal.0 % 3 != 0).into_future()
}
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
if round < self.current_round.load(Ordering::SeqCst) {
Box::new(Ok(()).into_future())
} else {
let mut round_duration = ROUND_DURATION;
for _ in 0..round {
round_duration *= 2;
}
let current_round = self.current_round.clone();
let timeout = Delay::new(Instant::now() + round_duration)
.map(move |_| {
current_round.compare_and_swap(round, round + 1, Ordering::SeqCst);
})
.map_err(|_| Error);
Box::new(timeout)
}
}
}
fn test_harness<F, X, T, U>(f: F) -> Result<T, U> where
F: FnOnce() -> X,
X: IntoFuture<Item=T,Error=U>,
{
let mut runtime = runtime::current_thread::Runtime::new().unwrap();
runtime.block_on(future::lazy(f))
}
#[test]
fn consensus_completes_with_minimum_good() {
let results = test_harness(|| {
let node_count = 10;
let max_faulty = 3;
let (network, net_send, net_recv) = Network::new(node_count);
network.route_in_background();
let nodes = net_send
.into_iter()
.zip(net_recv)
.take(node_count - max_faulty)
.enumerate()
.map(|(i, (tx, rx))| {
let ctx = TestContext {
local_id: AuthorityId(i),
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
agree(
ctx,
node_count,
max_faulty,
rx.map_err(|_| Error),
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
)
})
.collect::<Vec<_>>();
::futures::future::join_all(nodes)
.deadline(Instant::now() + Duration::from_millis(500))
}).unwrap();
for result in &results {
assert_eq!(&result.justification.digest, &results[0].justification.digest);
}
}
#[test]
fn consensus_completes_with_minimum_good_all_initial_proposals_bad() {
let results = test_harness(|| {
let node_count = 10;
let max_faulty = 3;
let (network, net_send, net_recv) = Network::new(node_count);
network.route_in_background();
let nodes = net_send
.into_iter()
.zip(net_recv)
.take(node_count - max_faulty)
.enumerate()
.map(|(i, (tx, rx))| {
// the first 5 proposals are going to be bad.
let proposal = if i < 5 {
i * 3 // proposals considered bad in the tests if they are % 3
} else {
(i * 3) + 1
};
let ctx = TestContext {
local_id: AuthorityId(i),
proposal: Mutex::new(proposal),
current_round: Arc::new(AtomicUsize::new(0)),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
agree(
ctx,
node_count,
max_faulty,
rx.map_err(|_| Error),
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
)
})
.collect::<Vec<_>>();
::futures::future::join_all(nodes)
.deadline(Instant::now() + Duration::from_millis(500))
}).unwrap();
for result in &results {
assert_eq!(&result.justification.digest, &results[0].justification.digest);
}
}
#[test]
fn consensus_does_not_complete_without_enough_nodes() {
let result = test_harness(|| {
let node_count = 10;
let max_faulty = 3;
let (network, net_send, net_recv) = Network::new(node_count);
network.route_in_background();
let nodes = net_send
.into_iter()
.zip(net_recv)
.take(node_count - max_faulty - 1)
.enumerate()
.map(|(i, (tx, rx))| {
let ctx = TestContext {
local_id: AuthorityId(i),
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
agree(
ctx,
node_count,
max_faulty,
rx.map_err(|_| Error),
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
)
})
.collect::<Vec<_>>();
::futures::future::join_all(nodes)
.deadline(Instant::now() + Duration::from_millis(500))
});
match result {
Ok(_) => panic!("completed wrongly"),
Err(ref e) if e.is_inner() => panic!("failed for wrong reason"),
Err(_) => {},
}
}
#[test]
fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
let locked_digest = Digest(999_999_999);
let results = test_harness(move || {
let node_count = 10;
let max_faulty = 3;
let locked_proposal = Candidate(999_999_999);
let locked_round = 1;
let justification = UncheckedJustification {
round_number: locked_round,
digest: locked_digest.clone(),
signatures: (0..7)
.map(|i| Signature(Message::Vote(Vote::Prepare(locked_round, locked_digest.clone())), AuthorityId(i)))
.collect()
}.check(7, |_, _, s| Some(s.1.clone())).unwrap();
let (network, net_send, net_recv) = Network::new(node_count);
network.route_in_background();
let nodes = net_send
.into_iter()
.zip(net_recv)
.enumerate()
.map(|(i, (tx, rx))| {
let ctx = TestContext {
local_id: AuthorityId(i),
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(locked_round + 1)),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
let mut agreement = agree(
ctx,
node_count,
max_faulty,
rx.map_err(|_| Error),
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
);
agreement.strategy.advance_to_round(
&agreement.context,
locked_round + 1
);
if i <= max_faulty {
agreement.strategy.locked = Some(Locked {
justification: justification.clone(),
})
}
if i == max_faulty {
agreement.strategy.notable_candidates.insert(
locked_digest.clone(),
locked_proposal.clone(),
);
}
agreement
})
.collect::<Vec<_>>();
::futures::future::join_all(nodes)
.deadline(Instant::now() + Duration::from_millis(1000))
}).unwrap();
for result in &results {
assert_eq!(&result.justification.digest, &locked_digest);
}
}
#[test]
fn consensus_completes_even_when_nodes_start_with_a_delay() {
let results = test_harness(|| {
let node_count = 10;
let max_faulty = 3;
let base_sleep = Duration::from_millis(75);
let now = Instant::now();
let (network, net_send, net_recv) = Network::new(node_count);
network.route_in_background();
let nodes = net_send
.into_iter()
.zip(net_recv)
.take(node_count - max_faulty)
.enumerate()
.map(move |(i, (tx, rx))| {
let ctx = TestContext {
local_id: AuthorityId(i),
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
let sleep_duration = base_sleep * i as u32;
Delay::new(now + sleep_duration).map_err(|_| Error).and_then(move |_| {
agree(
ctx,
node_count,
max_faulty,
rx.map_err(|_| Error),
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
)
})
})
.collect::<Vec<_>>();
::futures::future::join_all(nodes)
.deadline(now + Duration::from_millis(750))
}).unwrap();
for result in &results {
assert_eq!(&result.justification.digest, &results[0].justification.digest);
}
}
+73 -56
View File
@@ -31,7 +31,6 @@
//! set for this block height.
pub mod error;
pub mod generic;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
@@ -40,6 +39,7 @@ extern crate substrate_runtime_primitives as runtime_primitives;
extern crate ed25519;
extern crate tokio;
extern crate parking_lot;
extern crate rhododendron;
#[macro_use]
extern crate log;
@@ -66,15 +66,15 @@ use futures::sync::oneshot;
use tokio::timer::Delay;
use parking_lot::Mutex;
pub use generic::InputStreamConcluded;
pub use rhododendron::InputStreamConcluded;
pub use error::{Error, ErrorKind};
/// Messages over the proposal.
/// Each message carries an associated round number.
pub type Message<B> = generic::Message<B, <B as Block>::Hash>;
pub type Message<B> = rhododendron::Message<B, <B as Block>::Hash>;
/// Localized message type.
pub type LocalizedMessage<B> = generic::LocalizedMessage<
pub type LocalizedMessage<B> = rhododendron::LocalizedMessage<
B,
<B as Block>::Hash,
AuthorityId,
@@ -82,45 +82,62 @@ pub type LocalizedMessage<B> = generic::LocalizedMessage<
>;
/// Justification of some hash.
pub type Justification<H> = generic::Justification<H, LocalizedSignature>;
pub type Justification<H> = rhododendron::Justification<H, LocalizedSignature>;
/// Justification of a prepare message.
pub type PrepareJustification<H> = generic::PrepareJustification<H, LocalizedSignature>;
pub type PrepareJustification<H> = rhododendron::PrepareJustification<H, LocalizedSignature>;
/// Unchecked justification.
pub type UncheckedJustification<H> = generic::UncheckedJustification<H, LocalizedSignature>;
pub struct UncheckedJustification<H>(rhododendron::UncheckedJustification<H, LocalizedSignature>);
impl<H> UncheckedJustification<H> {
/// Create a new, unchecked justification.
pub fn new(digest: H, signatures: Vec<LocalizedSignature>, round_number: usize) -> Self {
UncheckedJustification(rhododendron::UncheckedJustification {
digest,
signatures,
round_number,
})
}
}
impl<H> From<rhododendron::UncheckedJustification<H, LocalizedSignature>> for UncheckedJustification<H> {
fn from(inner: rhododendron::UncheckedJustification<H, LocalizedSignature>) -> Self {
UncheckedJustification(inner)
}
}
impl<H> From<PrimitiveJustification<H>> for UncheckedJustification<H> {
fn from(just: PrimitiveJustification<H>) -> Self {
UncheckedJustification {
UncheckedJustification(rhododendron::UncheckedJustification {
round_number: just.round_number as usize,
digest: just.hash,
signatures: just.signatures.into_iter().map(|(from, sig)| LocalizedSignature {
signer: from.into(),
signature: sig,
}).collect(),
}
})
}
}
impl<H> Into<PrimitiveJustification<H>> for UncheckedJustification<H> {
fn into(self) -> PrimitiveJustification<H> {
PrimitiveJustification {
round_number: self.round_number as u32,
hash: self.digest,
signatures: self.signatures.into_iter().map(|s| (s.signer.into(), s.signature)).collect(),
round_number: self.0.round_number as u32,
hash: self.0.digest,
signatures: self.0.signatures.into_iter().map(|s| (s.signer.into(), s.signature)).collect(),
}
}
}
/// Result of a committed round of BFT
pub type Committed<B> = generic::Committed<B, <B as Block>::Hash, LocalizedSignature>;
pub type Committed<B> = rhododendron::Committed<B, <B as Block>::Hash, LocalizedSignature>;
/// Communication between BFT participants.
pub type Communication<B> = generic::Communication<B, <B as Block>::Hash, AuthorityId, LocalizedSignature>;
pub type Communication<B> = rhododendron::Communication<B, <B as Block>::Hash, AuthorityId, LocalizedSignature>;
/// Misbehavior observed from BFT participants.
pub type Misbehavior<H> = generic::Misbehavior<H, LocalizedSignature>;
pub type Misbehavior<H> = rhododendron::Misbehavior<H, LocalizedSignature>;
/// Environment producer for a BFT instance. Creates proposer instance and communication streams.
pub trait Environment<B: Block> {
@@ -187,7 +204,7 @@ struct BftInstance<B: Block, P> {
proposer: P,
}
impl<B: Block, P: Proposer<B>> generic::Context for BftInstance<B, P>
impl<B: Block, P: Proposer<B>> rhododendron::Context for BftInstance<B, P>
where
B: Clone + Eq,
B::Hash: ::std::hash::Hash,
@@ -250,7 +267,7 @@ pub struct BftFuture<B, P, I, InStream, OutSink> where
InStream: Stream<Item=Communication<B>, Error=P::Error>,
OutSink: Sink<SinkItem=Communication<B>, SinkError=P::Error>,
{
inner: generic::Agreement<BftInstance<B, P>, InStream, OutSink>,
inner: rhododendron::Agreement<BftInstance<B, P>, InStream, OutSink>,
cancel: Arc<AtomicBool>,
send_task: Option<oneshot::Sender<task::Task>>,
import: Arc<I>,
@@ -411,7 +428,7 @@ impl<B, P, I> BftService<B, P, I>
authorities: authorities,
};
let agreement = generic::agree(
let agreement = rhododendron::agree(
bft_instance,
n,
max_faulty,
@@ -467,7 +484,7 @@ fn check_justification_signed_message<H>(authorities: &[AuthorityId], message: &
-> Result<Justification<H>, UncheckedJustification<H>>
{
// TODO: return additional error information.
just.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| {
just.0.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| {
let auth_id = sig.signer.clone().into();
if !authorities.contains(&auth_id) { return None }
@@ -476,7 +493,7 @@ fn check_justification_signed_message<H>(authorities: &[AuthorityId], message: &
} else {
None
}
})
}).map_err(UncheckedJustification)
}
/// Check a full justification for a header hash.
@@ -488,7 +505,7 @@ pub fn check_justification<B: Block>(authorities: &[AuthorityId], parent: B::Has
{
let message = Slicable::encode(&PrimitiveMessage::<B, _> {
parent,
action: PrimitiveAction::Commit(just.round_number as u32, just.digest.clone()),
action: PrimitiveAction::Commit(just.0.round_number as u32, just.0.digest.clone()),
});
check_justification_signed_message(authorities, &message[..], just)
@@ -503,7 +520,7 @@ pub fn check_prepare_justification<B: Block>(authorities: &[AuthorityId], parent
{
let message = Slicable::encode(&PrimitiveMessage::<B, _> {
parent,
action: PrimitiveAction::Prepare(just.round_number as u32, just.digest.clone()),
action: PrimitiveAction::Prepare(just.0.round_number as u32, just.0.digest.clone()),
});
check_justification_signed_message(authorities, &message[..], just)
@@ -514,7 +531,7 @@ pub fn check_prepare_justification<B: Block>(authorities: &[AuthorityId], parent
pub fn check_proposal<B: Block + Clone>(
authorities: &[AuthorityId],
parent_hash: &B::Hash,
propose: &::generic::LocalizedProposal<B, B::Hash, AuthorityId, LocalizedSignature>)
propose: &::rhododendron::LocalizedProposal<B, B::Hash, AuthorityId, LocalizedSignature>)
-> Result<(), Error>
{
if !authorities.contains(&propose.sender) {
@@ -532,7 +549,7 @@ pub fn check_proposal<B: Block + Clone>(
pub fn check_vote<B: Block>(
authorities: &[AuthorityId],
parent_hash: &B::Hash,
vote: &::generic::LocalizedVote<B::Hash, AuthorityId, LocalizedSignature>)
vote: &::rhododendron::LocalizedVote<B::Hash, AuthorityId, LocalizedSignature>)
-> Result<(), Error>
{
if !authorities.contains(&vote.sender) {
@@ -540,9 +557,9 @@ pub fn check_vote<B: Block>(
}
let action = match vote.vote {
::generic::Vote::Prepare(r, ref h) => PrimitiveAction::Prepare(r as u32, h.clone()),
::generic::Vote::Commit(r, ref h) => PrimitiveAction::Commit(r as u32, h.clone()),
::generic::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32),
::rhododendron::Vote::Prepare(r, ref h) => PrimitiveAction::Prepare(r as u32, h.clone()),
::rhododendron::Vote::Commit(r, ref h) => PrimitiveAction::Commit(r as u32, h.clone()),
::rhododendron::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32),
};
check_action::<B>(action, parent_hash, &vote.signature)
}
@@ -579,12 +596,12 @@ pub fn sign_message<B: Block + Clone>(message: Message<B>, key: &ed25519::Pair,
};
match message {
::generic::Message::Propose(r, proposal) => {
::rhododendron::Message::Propose(r, proposal) => {
let header_hash = proposal.hash();
let action_header = PrimitiveAction::ProposeHeader(r as u32, header_hash.clone());
let action_propose = PrimitiveAction::Propose(r as u32, proposal.clone());
::generic::LocalizedMessage::Propose(::generic::LocalizedProposal {
::rhododendron::LocalizedMessage::Propose(::rhododendron::LocalizedProposal {
round_number: r,
proposal,
digest: header_hash,
@@ -593,14 +610,14 @@ pub fn sign_message<B: Block + Clone>(message: Message<B>, key: &ed25519::Pair,
full_signature: sign_action(action_propose),
})
}
::generic::Message::Vote(vote) => {
::rhododendron::Message::Vote(vote) => {
let action = match vote {
::generic::Vote::Prepare(r, ref h) => PrimitiveAction::Prepare(r as u32, h.clone()),
::generic::Vote::Commit(r, ref h) => PrimitiveAction::Commit(r as u32, h.clone()),
::generic::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32),
::rhododendron::Vote::Prepare(r, ref h) => PrimitiveAction::Prepare(r as u32, h.clone()),
::rhododendron::Vote::Commit(r, ref h) => PrimitiveAction::Commit(r as u32, h.clone()),
::rhododendron::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32),
};
::generic::LocalizedMessage::Vote(::generic::LocalizedVote {
::rhododendron::LocalizedMessage::Vote(::rhododendron::LocalizedVote {
vote: vote,
sender: signer.clone().into(),
signature: sign_action(action),
@@ -708,9 +725,9 @@ mod tests {
}
}
fn sign_vote(vote: ::generic::Vote<H256>, key: &ed25519::Pair, parent_hash: H256) -> LocalizedSignature {
fn sign_vote(vote: ::rhododendron::Vote<H256>, key: &ed25519::Pair, parent_hash: H256) -> LocalizedSignature {
match sign_message::<TestBlock>(vote.into(), key, parent_hash) {
::generic::LocalizedMessage::Vote(vote) => vote.signature,
::rhododendron::LocalizedMessage::Vote(vote) => vote.signature,
_ => panic!("signing vote leads to signed vote"),
}
}
@@ -792,45 +809,45 @@ mod tests {
Keyring::Eve.into(),
];
let unchecked = UncheckedJustification {
let unchecked = UncheckedJustification(rhododendron::UncheckedJustification {
digest: hash,
round_number: 1,
signatures: authorities_keys.iter().take(3).map(|key| {
sign_vote(generic::Vote::Commit(1, hash).into(), key, parent_hash)
sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash)
}).collect(),
};
});
assert!(check_justification::<TestBlock>(&authorities, parent_hash, unchecked).is_ok());
let unchecked = UncheckedJustification {
let unchecked = UncheckedJustification(rhododendron::UncheckedJustification {
digest: hash,
round_number: 0, // wrong round number (vs. the signatures)
signatures: authorities_keys.iter().take(3).map(|key| {
sign_vote(generic::Vote::Commit(1, hash).into(), key, parent_hash)
sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash)
}).collect(),
};
});
assert!(check_justification::<TestBlock>(&authorities, parent_hash, unchecked).is_err());
// not enough signatures.
let unchecked = UncheckedJustification {
let unchecked = UncheckedJustification(rhododendron::UncheckedJustification {
digest: hash,
round_number: 1,
signatures: authorities_keys.iter().take(2).map(|key| {
sign_vote(generic::Vote::Commit(1, hash).into(), key, parent_hash)
sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash)
}).collect(),
};
});
assert!(check_justification::<TestBlock>(&authorities, parent_hash, unchecked).is_err());
// wrong hash.
let unchecked = UncheckedJustification {
let unchecked = UncheckedJustification(rhododendron::UncheckedJustification {
digest: [0xfe; 32].into(),
round_number: 1,
signatures: authorities_keys.iter().take(3).map(|key| {
sign_vote(generic::Vote::Commit(1, hash).into(), key, parent_hash)
sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash)
}).collect(),
};
});
assert!(check_justification::<TestBlock>(&authorities, parent_hash, unchecked).is_err());
}
@@ -849,8 +866,8 @@ mod tests {
extrinsics: Default::default()
};
let proposal = sign_message(::generic::Message::Propose(1, block.clone()), &Keyring::Alice.pair(), parent_hash);;
if let ::generic::LocalizedMessage::Propose(proposal) = proposal {
let proposal = sign_message(::rhododendron::Message::Propose(1, block.clone()), &Keyring::Alice.pair(), parent_hash);;
if let ::rhododendron::LocalizedMessage::Propose(proposal) = proposal {
assert!(check_proposal(&authorities, &parent_hash, &proposal).is_ok());
let mut invalid_round = proposal.clone();
invalid_round.round_number = 0;
@@ -863,8 +880,8 @@ mod tests {
}
// Not an authority
let proposal = sign_message::<TestBlock>(::generic::Message::Propose(1, block), &Keyring::Bob.pair(), parent_hash);;
if let ::generic::LocalizedMessage::Propose(proposal) = proposal {
let proposal = sign_message::<TestBlock>(::rhododendron::Message::Propose(1, block), &Keyring::Bob.pair(), parent_hash);;
if let ::rhododendron::LocalizedMessage::Propose(proposal) = proposal {
assert!(check_proposal(&authorities, &parent_hash, &proposal).is_err());
} else {
assert!(false);
@@ -881,8 +898,8 @@ mod tests {
Keyring::Eve.to_raw_public().into(),
];
let vote = sign_message::<TestBlock>(::generic::Message::Vote(::generic::Vote::Prepare(1, hash)), &Keyring::Alice.pair(), parent_hash);;
if let ::generic::LocalizedMessage::Vote(vote) = vote {
let vote = sign_message::<TestBlock>(::rhododendron::Message::Vote(::rhododendron::Vote::Prepare(1, hash)), &Keyring::Alice.pair(), parent_hash);;
if let ::rhododendron::LocalizedMessage::Vote(vote) = vote {
assert!(check_vote::<TestBlock>(&authorities, &parent_hash, &vote).is_ok());
let mut invalid_sender = vote.clone();
invalid_sender.signature.signer = Keyring::Eve.into();
@@ -892,8 +909,8 @@ mod tests {
}
// Not an authority
let vote = sign_message::<TestBlock>(::generic::Message::Vote(::generic::Vote::Prepare(1, hash)), &Keyring::Bob.pair(), parent_hash);;
if let ::generic::LocalizedMessage::Vote(vote) = vote {
let vote = sign_message::<TestBlock>(::rhododendron::Message::Vote(::rhododendron::Vote::Prepare(1, hash)), &Keyring::Bob.pair(), parent_hash);;
if let ::rhododendron::LocalizedMessage::Vote(vote) = vote {
assert!(check_vote::<TestBlock>(&authorities, &parent_hash, &vote).is_err());
} else {
assert!(false);
+2 -1
View File
@@ -335,7 +335,8 @@ impl<B, E, Block> Client<B, E, Block> where
let is_new_best = header.number() == &(self.backend.blockchain().info()?.best_number + One::one());
trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number(), is_new_best, origin);
transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?;
let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into();
transaction.set_block_data(header.clone(), body, Some(unchecked.into()), is_new_best)?;
if let Some(storage_update) = storage_update {
transaction.update_storage(storage_update)?;
}
@@ -11,6 +11,7 @@ substrate-runtime-io = { path = "../runtime-io", default-features = false }
[dev-dependencies]
substrate-bft = { path = "../bft" }
rhododendron = "0.2"
substrate-keyring = { path = "../keyring" }
[features]
@@ -27,6 +27,8 @@ extern crate substrate_runtime_primitives as runtime_primitives;
extern crate substrate_bft;
#[cfg(test)]
extern crate substrate_keyring as keyring;
#[cfg(test)]
extern crate rhododendron;
use codec::Slicable;
use primitives::{AuthorityId, Signature};
@@ -85,7 +87,6 @@ pub fn evaluate_misbehavior<B: Slicable, H: Slicable + Copy>(
mod tests {
use super::*;
use substrate_bft::generic;
use keyring::ed25519;
use keyring::Keyring;
@@ -95,26 +96,26 @@ mod tests {
fn sign_prepare(key: &ed25519::Pair, round: u32, hash: H256, parent_hash: H256) -> (H256, Signature) {
let msg = substrate_bft::sign_message::<Block>(
generic::Message::Vote(generic::Vote::Prepare(round as _, hash)),
rhododendron::Message::Vote(rhododendron::Vote::Prepare(round as _, hash)),
key,
parent_hash
);
match msg {
generic::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature),
rhododendron::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature),
_ => panic!("signing vote leads to signed vote"),
}
}
fn sign_commit(key: &ed25519::Pair, round: u32, hash: H256, parent_hash: H256) -> (H256, Signature) {
let msg = substrate_bft::sign_message::<Block>(
generic::Message::Vote(generic::Vote::Commit(round as _, hash)),
rhododendron::Message::Vote(rhododendron::Vote::Commit(round as _, hash)),
key,
parent_hash
);
match msg {
generic::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature),
rhododendron::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature),
_ => panic!("signing vote leads to signed vote"),
}
}
@@ -4,6 +4,7 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
rhododendron = "0.2"
substrate-bft = { path = "../bft" }
substrate-client = { path = "../client" }
substrate-codec = { path = "../codec" }
@@ -68,22 +68,22 @@ fn fake_justify(header: &runtime::Header) -> bft::UncheckedJustification<runtime
Keyring::Charlie.into(),
];
bft::UncheckedJustification {
digest: hash,
signatures: authorities.iter().map(|key| {
bft::UncheckedJustification::new(
hash,
authorities.iter().map(|key| {
let msg = bft::sign_message::<runtime::Block>(
bft::generic::Vote::Commit(1, hash).into(),
::rhododendron::Vote::Commit(1, hash).into(),
key,
header.parent_hash
);
match msg {
bft::generic::LocalizedMessage::Vote(vote) => vote.signature,
::rhododendron::LocalizedMessage::Vote(vote) => vote.signature,
_ => panic!("signing vote leads to signed vote"),
}
}).collect(),
round_number: 1,
}
1,
)
}
fn genesis_config() -> GenesisConfig {
@@ -18,6 +18,7 @@
#![warn(missing_docs)]
extern crate rhododendron;
extern crate substrate_bft as bft;
extern crate substrate_codec as codec;
extern crate substrate_keyring as keyring;