Merge branch 'master' into gav-optional-storage

This commit is contained in:
Gav
2018-02-15 17:55:01 +01:00
42 changed files with 1611 additions and 1540 deletions
+46 -9
View File
@@ -970,15 +970,6 @@ dependencies = [
"substrate-state-machine 0.1.0",
]
[[package]]
name = "polkadot-candidate-agreement"
version = "0.1.0"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "polkadot-cli"
version = "0.1.0"
@@ -1011,6 +1002,21 @@ dependencies = [
"substrate-primitives 0.1.0",
]
[[package]]
name = "polkadot-consensus"
version = "0.1.0"
dependencies = [
"ed25519 0.1.0",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-primitives 0.1.0",
"polkadot-statement-table 0.1.0",
"substrate-bft 0.1.0",
"substrate-codec 0.1.0",
"substrate-primitives 0.1.0",
"tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "polkadot-executor"
version = "0.1.0"
@@ -1058,6 +1064,14 @@ dependencies = [
"substrate-runtime-support 0.1.0",
]
[[package]]
name = "polkadot-statement-table"
version = "0.1.0"
dependencies = [
"polkadot-primitives 0.1.0",
"substrate-primitives 0.1.0",
]
[[package]]
name = "polkadot-validator"
version = "0.1.0"
@@ -1378,6 +1392,24 @@ name = "strsim"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "substrate-bft"
version = "0.1.0"
dependencies = [
"ed25519 0.1.0",
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-client 0.1.0",
"substrate-codec 0.1.0",
"substrate-executor 0.1.0",
"substrate-keyring 0.1.0",
"substrate-primitives 0.1.0",
"substrate-state-machine 0.1.0",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "substrate-client"
version = "0.1.0"
@@ -1451,10 +1483,14 @@ dependencies = [
"serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-client 0.1.0",
"substrate-codec 0.1.0",
"substrate-executor 0.1.0",
"substrate-keyring 0.1.0",
"substrate-primitives 0.1.0",
"substrate-runtime-support 0.1.0",
"substrate-serializer 0.1.0",
"substrate-state-machine 0.1.0",
"substrate-test-runtime 0.1.0",
]
[[package]]
@@ -1487,6 +1523,7 @@ dependencies = [
"substrate-client 0.1.0",
"substrate-executor 0.1.0",
"substrate-primitives 0.1.0",
"substrate-runtime-support 0.1.0",
"substrate-state-machine 0.1.0",
]
+3 -1
View File
@@ -14,13 +14,15 @@ polkadot-cli = { path = "polkadot/cli" }
[workspace]
members = [
"polkadot/api",
"polkadot/candidate-agreement",
"polkadot/cli",
"polkadot/collator",
"polkadot/consensus",
"polkadot/executor",
"polkadot/primitives",
"polkadot/runtime",
"polkadot/statement-table",
"polkadot/validator",
"substrate/bft",
"substrate/client",
"substrate/codec",
"substrate/environmental",
@@ -1,9 +0,0 @@
[package]
name = "polkadot-candidate-agreement"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1.17"
parking_lot = "0.4"
tokio-timer = "0.1.2"
@@ -1,214 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A stream that handles incoming messages to the BFT agreement module and statement
//! table. It forwards as necessary, and dispatches requests for determining availability
//! and validity of candidates as necessary.
use std::collections::HashSet;
use futures::prelude::*;
use futures::stream::{Fuse, FuturesUnordered};
use futures::sync::mpsc;
use table::{self, Statement, Context as TableContext};
use super::{Context, CheckedMessage, SharedTable, TypeResolve};
enum CheckResult {
Available,
Unavailable,
Valid,
Invalid,
}
enum Checking<D, A, V> {
Availability(D, A),
Validity(D, V),
}
impl<D, A, V, E> Future for Checking<D, A, V>
where
D: Clone,
A: Future<Item=bool,Error=E>,
V: Future<Item=bool,Error=E>,
{
type Item = (D, CheckResult);
type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(match *self {
Checking::Availability(ref digest, ref mut f) => {
match try_ready!(f.poll()) {
true => (digest.clone(), CheckResult::Available),
false => (digest.clone(), CheckResult::Unavailable),
}
}
Checking::Validity(ref digest, ref mut f) => {
match try_ready!(f.poll()) {
true => (digest.clone(), CheckResult::Valid),
false => (digest.clone(), CheckResult::Invalid),
}
}
}))
}
}
/// Handles incoming messages to the BFT service and statement table.
///
/// Also triggers requests for determining validity and availability of other
/// parachain candidates.
pub struct HandleIncoming<C: Context, I> {
table: SharedTable<C>,
messages_in: Fuse<I>,
bft_out: mpsc::UnboundedSender<<C as TypeResolve>::BftCommunication>,
local_id: C::AuthorityId,
requesting_about: FuturesUnordered<Checking<
C::Digest,
<C::CheckAvailability as IntoFuture>::Future,
<C::CheckCandidate as IntoFuture>::Future,
>>,
checked_validity: HashSet<C::Digest>,
checked_availability: HashSet<C::Digest>,
}
impl<C: Context, I> HandleIncoming<C, I> {
fn sign_and_import_statement(&self, digest: C::Digest, result: CheckResult) {
let statement = match result {
CheckResult::Valid => Statement::Valid(digest),
CheckResult::Invalid => Statement::Invalid(digest),
CheckResult::Available => Statement::Available(digest),
CheckResult::Unavailable => return, // no such statement and not provable.
};
// TODO: trigger broadcast to peers immediately?
self.table.sign_and_import(statement);
}
fn import_message(&mut self, origin: C::AuthorityId, message: CheckedMessage<C>) {
match message {
CheckedMessage::Bft(msg) => { let _ = self.bft_out.unbounded_send(msg); }
CheckedMessage::Table(table_messages) => {
// import all table messages and check for any that we
// need to produce statements for.
let msg_iter = table_messages
.into_iter()
.map(|m| (m, Some(origin.clone())));
let summaries: Vec<_> = self.table.import_statements(msg_iter);
for summary in summaries {
self.dispatch_on_summary(summary)
}
}
}
}
// on new candidates in our group, begin checking validity.
// on new candidates in our availability sphere, begin checking availability.
fn dispatch_on_summary(&mut self, summary: table::Summary<C::Digest, C::GroupId>) {
let is_validity_member =
self.table.context().is_member_of(&self.local_id, &summary.group_id);
let is_availability_member =
self.table.context().is_availability_guarantor_of(&self.local_id, &summary.group_id);
let digest = &summary.candidate;
// TODO: consider a strategy based on the number of candidate votes as well.
let checking_validity =
is_validity_member &&
self.checked_validity.insert(digest.clone()) &&
self.table.proposed_digest() != Some(digest.clone());
let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone());
if checking_validity || checking_availability {
let context = &*self.table.context();
let requesting_about = &mut self.requesting_about;
self.table.with_candidate(digest, |c| match c {
None => {} // TODO: handle table inconsistency somehow?
Some(candidate) => {
if checking_validity {
let future = context.check_validity(candidate).into_future();
let checking = Checking::Validity(digest.clone(), future);
requesting_about.push(checking);
}
if checking_availability {
let future = context.check_availability(candidate).into_future();
let checking = Checking::Availability(digest.clone(), future);
requesting_about.push(checking);
}
}
})
}
}
}
impl<C, I, E> HandleIncoming<C, I>
where
C: Context,
I: Stream<Item=(C::AuthorityId, CheckedMessage<C>),Error=E>,
C::CheckAvailability: IntoFuture<Error=E>,
C::CheckCandidate: IntoFuture<Error=E>,
{
pub fn new(
table: SharedTable<C>,
messages_in: I,
bft_out: mpsc::UnboundedSender<<C as TypeResolve>::BftCommunication>,
) -> Self {
let local_id = table.context().local_id();
HandleIncoming {
table,
bft_out,
local_id,
messages_in: messages_in.fuse(),
requesting_about: FuturesUnordered::new(),
checked_validity: HashSet::new(),
checked_availability: HashSet::new(),
}
}
}
impl<C, I, E> Future for HandleIncoming<C, I>
where
C: Context,
I: Stream<Item=(C::AuthorityId, CheckedMessage<C>),Error=E>,
C::CheckAvailability: IntoFuture<Error=E>,
C::CheckCandidate: IntoFuture<Error=E>,
{
type Item = ();
type Error = E;
fn poll(&mut self) -> Poll<(), E> {
loop {
// FuturesUnordered is safe to poll after it has completed.
while let Async::Ready(Some((d, r))) = self.requesting_about.poll()? {
self.sign_and_import_statement(d, r);
}
match try_ready!(self.messages_in.poll()) {
None => if self.requesting_about.is_empty() {
return Ok(Async::Ready(()))
} else {
return Ok(Async::NotReady)
},
Some((origin, msg)) => self.import_message(origin, msg),
}
}
}
}
@@ -1,625 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Propagation and agreement of candidates.
//!
//! Authorities are split into groups by parachain, and each authority might come
//! up its own candidate for their parachain. Within groups, authorities pass around
//! their candidates and produce statements of validity.
//!
//! Any candidate that receives majority approval by the authorities in a group
//! may be subject to inclusion, unless any authorities flag that candidate as invalid.
//!
//! Wrongly flagging as invalid should be strongly disincentivized, so that in the
//! equilibrium state it is not expected to happen. Likewise with the submission
//! of invalid blocks.
//!
//! Groups themselves may be compromised by malicious authorities.
#[macro_use]
extern crate futures;
extern crate parking_lot;
extern crate tokio_timer;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use futures::prelude::*;
use futures::sync::{mpsc, oneshot};
use parking_lot::Mutex;
use tokio_timer::Timer;
use table::Table;
mod bft;
mod handle_incoming;
mod round_robin;
mod table;
#[cfg(test)]
pub mod tests;
/// Context necessary for agreement.
pub trait Context: Send + Clone {
/// A authority ID
type AuthorityId: Debug + Hash + Eq + Clone + Ord;
/// The digest (hash or other unique attribute) of a candidate.
type Digest: Debug + Hash + Eq + Clone;
/// The group ID type
type GroupId: Debug + Hash + Ord + Eq + Clone;
/// A signature type.
type Signature: Debug + Eq + Clone;
/// Candidate type. In practice this will be a candidate receipt.
type ParachainCandidate: Debug + Ord + Eq + Clone;
/// The actual block proposal type. This is what is agreed upon, and
/// is composed of multiple candidates.
type Proposal: Debug + Eq + Clone;
/// A future that resolves when a candidate is checked for validity.
///
/// In Polkadot, this will involve fetching the corresponding block data,
/// producing the necessary ingress, and running the parachain validity function.
type CheckCandidate: IntoFuture<Item=bool>;
/// A future that resolves when availability of a candidate's external
/// data is checked.
type CheckAvailability: IntoFuture<Item=bool>;
/// The statement batch type.
type StatementBatch: StatementBatch<
Self::AuthorityId,
table::SignedStatement<Self::ParachainCandidate, Self::Digest, Self::AuthorityId, Self::Signature>,
>;
/// Get the digest of a candidate.
fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest;
/// Get the digest of a proposal.
fn proposal_digest(proposal: &Self::Proposal) -> Self::Digest;
/// Get the group of a candidate.
fn candidate_group(candidate: &Self::ParachainCandidate) -> Self::GroupId;
/// Get the primary for a given round.
fn round_proposer(&self, round: usize) -> Self::AuthorityId;
/// Check a candidate for validity.
fn check_validity(&self, candidate: &Self::ParachainCandidate) -> Self::CheckCandidate;
/// Check availability of candidate data.
fn check_availability(&self, candidate: &Self::ParachainCandidate) -> Self::CheckAvailability;
/// Attempt to combine a set of parachain candidates into a proposal.
///
/// This may arbitrarily return `None`, but the intent is for `Some`
/// to only be returned when candidates from enough groups are known.
///
/// "enough" may be subjective as well.
fn create_proposal(&self, candidates: Vec<&Self::ParachainCandidate>)
-> Option<Self::Proposal>;
/// Check validity of a proposal. This should call out to the `check_candidate`
/// function for all parachain candidates contained within it, as well as
/// checking other validity constraints of the proposal.
fn proposal_valid<F>(&self, proposal: &Self::Proposal, check_candidate: F) -> bool
where F: FnMut(&Self::ParachainCandidate) -> bool;
/// Get the local authority ID.
fn local_id(&self) -> Self::AuthorityId;
/// Sign a table validity statement with the local key.
fn sign_table_statement(
&self,
statement: &table::Statement<Self::ParachainCandidate, Self::Digest>
) -> Self::Signature;
/// Sign a BFT agreement message.
fn sign_bft_message(&self, &bft::Message<Self::Proposal, Self::Digest>) -> Self::Signature;
}
/// Helper for type resolution for contexts until type aliases apply bounds.
pub trait TypeResolve {
type SignedTableStatement;
type BftCommunication;
type BftCommitted;
type Misbehavior;
}
impl<C: Context> TypeResolve for C {
type SignedTableStatement = table::SignedStatement<C::ParachainCandidate, C::Digest, C::AuthorityId, C::Signature>;
type BftCommunication = bft::Communication<C::Proposal, C::Digest, C::AuthorityId, C::Signature>;
type BftCommitted = bft::Committed<C::Proposal,C::Digest,C::Signature>;
type Misbehavior = table::Misbehavior<C::ParachainCandidate, C::Digest, C::AuthorityId, C::Signature>;
}
/// Information about a specific group.
#[derive(Debug, Clone)]
pub struct GroupInfo<V: Hash + Eq> {
/// Authorities meant to check validity of candidates.
pub validity_guarantors: HashSet<V>,
/// Authorities meant to check availability of candidate data.
pub availability_guarantors: HashSet<V>,
/// Number of votes needed for validity.
pub needed_validity: usize,
/// Number of votes needed for availability.
pub needed_availability: usize,
}
struct TableContext<C: Context> {
context: C,
groups: HashMap<C::GroupId, GroupInfo<C::AuthorityId>>,
}
impl<C: Context> ::std::ops::Deref for TableContext<C> {
type Target = C;
fn deref(&self) -> &C {
&self.context
}
}
impl<C: Context> table::Context for TableContext<C> {
type AuthorityId = C::AuthorityId;
type Digest = C::Digest;
type GroupId = C::GroupId;
type Signature = C::Signature;
type Candidate = C::ParachainCandidate;
fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest {
C::candidate_digest(candidate)
}
fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId {
C::candidate_group(candidate)
}
fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool {
self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority))
}
fn is_availability_guarantor_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool {
self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority))
}
fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize) {
self.groups.get(group).map_or(
(usize::max_value(), usize::max_value()),
|g| (g.needed_validity, g.needed_availability),
)
}
}
// A shared table object.
struct SharedTableInner<C: Context> {
table: Table<TableContext<C>>,
proposed_digest: Option<C::Digest>,
awaiting_proposal: Vec<oneshot::Sender<C::Proposal>>,
}
impl<C: Context> SharedTableInner<C> {
fn import_statement(
&mut self,
context: &TableContext<C>,
statement: <C as TypeResolve>::SignedTableStatement,
received_from: Option<C::AuthorityId>
) -> Option<table::Summary<C::Digest, C::GroupId>> {
self.table.import_statement(context, statement, received_from)
}
fn update_proposal(&mut self, context: &TableContext<C>) {
if self.awaiting_proposal.is_empty() { return }
let proposal_candidates = self.table.proposed_candidates(context);
if let Some(proposal) = context.context.create_proposal(proposal_candidates) {
for sender in self.awaiting_proposal.drain(..) {
let _ = sender.send(proposal.clone());
}
}
}
fn get_proposal(&mut self, context: &TableContext<C>) -> oneshot::Receiver<C::Proposal> {
let (tx, rx) = oneshot::channel();
self.awaiting_proposal.push(tx);
self.update_proposal(context);
rx
}
fn proposal_valid(&mut self, context: &TableContext<C>, proposal: &C::Proposal) -> bool {
context.context.proposal_valid(proposal, |contained_candidate| {
// check that the candidate is valid (has enough votes)
let digest = C::candidate_digest(contained_candidate);
self.table.candidate_includable(&digest, context)
})
}
}
/// A shared table object.
pub struct SharedTable<C: Context> {
context: Arc<TableContext<C>>,
inner: Arc<Mutex<SharedTableInner<C>>>,
}
impl<C: Context> Clone for SharedTable<C> {
fn clone(&self) -> Self {
SharedTable {
context: self.context.clone(),
inner: self.inner.clone()
}
}
}
impl<C: Context> SharedTable<C> {
/// Create a new shared table.
pub fn new(context: C, groups: HashMap<C::GroupId, GroupInfo<C::AuthorityId>>) -> Self {
SharedTable {
context: Arc::new(TableContext { context, groups }),
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
awaiting_proposal: Vec::new(),
proposed_digest: None,
}))
}
}
/// Import a single statement.
pub fn import_statement(
&self,
statement: <C as TypeResolve>::SignedTableStatement,
received_from: Option<C::AuthorityId>,
) -> Option<table::Summary<C::Digest, C::GroupId>> {
self.inner.lock().import_statement(&*self.context, statement, received_from)
}
/// Sign and import a local statement.
pub fn sign_and_import(
&self,
statement: table::Statement<C::ParachainCandidate, C::Digest>,
) -> Option<table::Summary<C::Digest, C::GroupId>> {
let proposed_digest = match statement {
table::Statement::Candidate(ref c) => Some(C::candidate_digest(c)),
_ => None,
};
let signed_statement = table::SignedStatement {
signature: self.context.sign_table_statement(&statement),
sender: self.context.local_id(),
statement,
};
let mut inner = self.inner.lock();
if proposed_digest.is_some() {
inner.proposed_digest = proposed_digest;
}
inner.import_statement(&*self.context, signed_statement, None)
}
/// Import many statements at once.
///
/// Provide an iterator yielding pairs of (statement, received_from).
pub fn import_statements<I, U>(&self, iterable: I) -> U
where
I: IntoIterator<Item=(<C as TypeResolve>::SignedTableStatement, Option<C::AuthorityId>)>,
U: ::std::iter::FromIterator<table::Summary<C::Digest, C::GroupId>>,
{
let mut inner = self.inner.lock();
iterable.into_iter().filter_map(move |(statement, received_from)| {
inner.import_statement(&*self.context, statement, received_from)
}).collect()
}
/// Update the proposal sealing.
pub fn update_proposal(&self) {
self.inner.lock().update_proposal(&*self.context)
}
/// Register interest in receiving a proposal when ready.
/// If one is ready immediately, it will be provided.
pub fn get_proposal(&self) -> oneshot::Receiver<C::Proposal> {
self.inner.lock().get_proposal(&*self.context)
}
/// Check if a proposal is valid.
pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool {
self.inner.lock().proposal_valid(&*self.context, proposal)
}
/// Execute a closure using a specific candidate.
///
/// Deadlocks if called recursively.
pub fn with_candidate<F, U>(&self, digest: &C::Digest, f: F) -> U
where F: FnOnce(Option<&C::ParachainCandidate>) -> U
{
let inner = self.inner.lock();
f(inner.table.get_candidate(digest))
}
/// Get all witnessed misbehavior.
pub fn get_misbehavior(&self) -> HashMap<C::AuthorityId, <C as TypeResolve>::Misbehavior> {
self.inner.lock().table.get_misbehavior().clone()
}
/// Fill a statement batch.
pub fn fill_batch(&self, batch: &mut C::StatementBatch) {
self.inner.lock().table.fill_batch(batch);
}
/// Get the local proposed candidate digest.
pub fn proposed_digest(&self) -> Option<C::Digest> {
self.inner.lock().proposed_digest.clone()
}
// Get a handle to the table context.
fn context(&self) -> &TableContext<C> {
&*self.context
}
}
/// Errors that can occur during agreement.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Error {
IoTerminated,
FaultyTimer,
CannotPropose,
}
impl From<bft::InputStreamConcluded> for Error {
fn from(_: bft::InputStreamConcluded) -> Error {
Error::IoTerminated
}
}
/// Context owned by the BFT future necessary to execute the logic.
pub struct BftContext<C: Context> {
context: C,
table: SharedTable<C>,
timer: Timer,
round_timeout_multiplier: u64,
}
impl<C: Context> bft::Context for BftContext<C>
where C::Proposal: 'static,
{
type AuthorityId = C::AuthorityId;
type Digest = C::Digest;
type Signature = C::Signature;
type Candidate = C::Proposal;
type RoundTimeout = Box<Future<Item=(),Error=Error>>;
type CreateProposal = Box<Future<Item=Self::Candidate,Error=Error>>;
fn local_id(&self) -> Self::AuthorityId {
self.context.local_id()
}
fn proposal(&self) -> Self::CreateProposal {
Box::new(self.table.get_proposal().map_err(|_| Error::CannotPropose))
}
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest {
C::proposal_digest(candidate)
}
fn sign_local(&self, message: bft::Message<Self::Candidate, Self::Digest>)
-> bft::LocalizedMessage<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>
{
let sender = self.local_id();
let signature = self.context.sign_bft_message(&message);
bft::LocalizedMessage {
message,
sender,
signature,
}
}
fn round_proposer(&self, round: usize) -> Self::AuthorityId {
self.context.round_proposer(round)
}
fn candidate_valid(&self, proposal: &Self::Candidate) -> bool {
self.table.proposal_valid(proposal)
}
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
let round = ::std::cmp::min(63, round) as u32;
let timeout = 1u64.checked_shl(round)
.unwrap_or_else(u64::max_value)
.saturating_mul(self.round_timeout_multiplier);
Box::new(self.timer.sleep(Duration::from_secs(timeout))
.map_err(|_| Error::FaultyTimer))
}
}
/// Parameters necessary for agreement.
pub struct AgreementParams<C: Context> {
/// The context itself.
pub context: C,
/// For scheduling timeouts.
pub timer: Timer,
/// The statement table.
pub table: SharedTable<C>,
/// The number of nodes.
pub nodes: usize,
/// The maximum number of faulty nodes.
pub max_faulty: usize,
/// The round timeout multiplier: 2^round_number is multiplied by this.
pub round_timeout_multiplier: u64,
/// The maximum amount of messages to queue.
pub message_buffer_size: usize,
/// Interval to attempt forming proposals over.
pub form_proposal_interval: Duration,
}
/// Recovery for messages
pub trait MessageRecovery<C: Context> {
/// The unchecked message type. This implies that work hasn't been done
/// to decode the payload and check and authenticate a signature.
type UncheckedMessage;
/// Attempt to transform a checked message into an unchecked.
fn check_message(&self, Self::UncheckedMessage) -> Option<CheckedMessage<C>>;
}
/// A batch of statements to send out.
pub trait StatementBatch<V, T> {
/// Get the target authorities of these statements.
fn targets(&self) -> &[V];
/// If the batch is empty.
fn is_empty(&self) -> bool;
/// Push a statement onto the batch. Returns false when the batch is full.
///
/// This is meant to do work like incrementally serializing the statements
/// into a vector of bytes while making sure the length is below a certain
/// amount.
fn push(&mut self, statement: T) -> bool;
}
/// Recovered and fully checked messages.
pub enum CheckedMessage<C: Context> {
/// Messages meant for the BFT agreement logic.
Bft(<C as TypeResolve>::BftCommunication),
/// Statements circulating about the table.
Table(Vec<<C as TypeResolve>::SignedTableStatement>),
}
/// Outgoing messages to the network.
#[derive(Debug, Clone)]
pub enum OutgoingMessage<C: Context> {
/// Messages meant for BFT agreement peers.
Bft(<C as TypeResolve>::BftCommunication),
/// Batches of table statements.
Table(C::StatementBatch),
}
/// Create an agreement future, and I/O streams.
// TODO: kill 'static bounds and use impl Future.
pub fn agree<
Context,
NetIn,
NetOut,
Recovery,
PropagateStatements,
LocalCandidate,
Err,
>(
params: AgreementParams<Context>,
net_in: NetIn,
net_out: NetOut,
recovery: Recovery,
propagate_statements: PropagateStatements,
local_candidate: LocalCandidate,
)
-> Box<Future<Item=<Context as TypeResolve>::BftCommitted,Error=Error>>
where
Context: ::Context + 'static,
Context::CheckCandidate: IntoFuture<Error=Err>,
Context::CheckAvailability: IntoFuture<Error=Err>,
NetIn: Stream<Item=(Context::AuthorityId, Vec<Recovery::UncheckedMessage>),Error=Err> + 'static,
NetOut: Sink<SinkItem=OutgoingMessage<Context>> + 'static,
Recovery: MessageRecovery<Context> + 'static,
PropagateStatements: Stream<Item=Context::StatementBatch,Error=Err> + 'static,
LocalCandidate: IntoFuture<Item=Context::ParachainCandidate> + 'static
{
let (bft_in_in, bft_in_out) = mpsc::unbounded();
let (bft_out_in, bft_out_out) = mpsc::unbounded();
let agreement = {
let bft_context = BftContext {
context: params.context,
table: params.table.clone(),
timer: params.timer.clone(),
round_timeout_multiplier: params.round_timeout_multiplier,
};
bft::agree(
bft_context,
params.nodes,
params.max_faulty,
bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated),
bft_out_in.sink_map_err(|_| Error::IoTerminated),
)
};
let route_messages_in = {
let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size);
let round_robin_recovered = round_robin
.filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x)));
handle_incoming::HandleIncoming::new(
params.table.clone(),
round_robin_recovered,
bft_in_in,
).map_err(|_| Error::IoTerminated)
};
let route_messages_out = {
let table = params.table.clone();
let periodic_table_statements = propagate_statements
.or_else(|_| ::futures::future::empty()) // halt the stream instead of error.
.map(move |mut batch| { table.fill_batch(&mut batch); batch })
.filter(|b| !b.is_empty())
.map(OutgoingMessage::Table);
let complete_out_stream = bft_out_out
.map_err(|_| Error::IoTerminated)
.map(|bft::ContextCommunication(x)| x)
.map(OutgoingMessage::Bft)
.select(periodic_table_statements);
net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream)
};
let import_local_candidate = {
let table = params.table.clone();
local_candidate
.into_future()
.map(table::Statement::Candidate)
.map(Some)
.or_else(|_| Ok(None))
.map(move |s| if let Some(s) = s {
table.sign_and_import(s);
})
};
let create_proposal_on_interval = {
let table = params.table;
params.timer.interval(params.form_proposal_interval)
.map_err(|_| Error::FaultyTimer)
.for_each(move |_| { table.update_proposal(); Ok(()) })
};
// if these auxiliary futures terminate before the agreement, then
// that is an error.
let auxiliary_futures = route_messages_in.join4(
create_proposal_on_interval,
route_messages_out,
import_local_candidate,
).and_then(|_| Err(Error::IoTerminated));
let future = agreement
.select(auxiliary_futures)
.map(|(committed, _)| committed)
.map_err(|(e, _)| e);
Box::new(future)
}
@@ -1,164 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Round-robin buffer for incoming messages.
//!
//! This takes batches of messages associated with a sender as input,
//! and yields messages in a fair order by sender.
use std::collections::{Bound, BTreeMap, VecDeque};
use futures::prelude::*;
use futures::stream::Fuse;
/// Implementation of the round-robin buffer for incoming messages.
#[derive(Debug)]
pub struct RoundRobinBuffer<V: Ord + Eq, S, M> {
buffer: BTreeMap<V, VecDeque<M>>,
last_processed_from: Option<V>,
stored_messages: usize,
max_messages: usize,
inner: Fuse<S>,
}
impl<V: Ord + Eq + Clone, S: Stream, M> RoundRobinBuffer<V, S, M> {
/// Create a new round-robin buffer which holds up to a maximum
/// amount of messages.
pub fn new(stream: S, buffer_size: usize) -> Self {
RoundRobinBuffer {
buffer: BTreeMap::new(),
last_processed_from: None,
stored_messages: 0,
max_messages: buffer_size,
inner: stream.fuse(),
}
}
}
impl<V: Ord + Eq + Clone, S, M> RoundRobinBuffer<V, S, M> {
fn next_message(&mut self) -> Option<(V, M)> {
if self.stored_messages == 0 {
return None
}
// first pick up from the last authority we processed a message from
let mut next = {
let lower_bound = match self.last_processed_from {
None => Bound::Unbounded,
Some(ref x) => Bound::Excluded(x.clone()),
};
self.buffer.range_mut((lower_bound, Bound::Unbounded))
.filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v)))
.next()
};
// but wrap around to the beginning again if we got nothing.
if next.is_none() {
next = self.buffer.iter_mut()
.filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v)))
.next();
}
if let Some((ref authority, _)) = next {
self.stored_messages -= 1;
self.last_processed_from = Some(authority.clone());
}
next
}
// import messages, discarding when the buffer is full.
fn import_messages(&mut self, sender: V, messages: Vec<M>) {
let space_remaining = self.max_messages - self.stored_messages;
self.stored_messages += ::std::cmp::min(space_remaining, messages.len());
let v = self.buffer.entry(sender).or_insert_with(VecDeque::new);
v.extend(messages.into_iter().take(space_remaining));
}
}
impl<V: Ord + Eq + Clone, S, M> Stream for RoundRobinBuffer<V, S, M>
where S: Stream<Item=(V, Vec<M>)>
{
type Item = (V, M);
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, S::Error> {
loop {
match self.inner.poll()? {
Async::NotReady | Async::Ready(None) => break,
Async::Ready(Some((sender, msgs))) => self.import_messages(sender, msgs),
}
}
let done = self.inner.is_done();
Ok(match self.next_message() {
Some(msg) => Async::Ready(Some(msg)),
None => if done { Async::Ready(None) } else { Async::NotReady },
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream::{self, Stream};
#[derive(Debug, PartialEq, Eq)]
struct UncheckedMessage { data: Vec<u8> }
#[test]
fn is_fair_and_wraps_around() {
let stream = stream::iter_ok(vec![
(1, vec![
UncheckedMessage { data: vec![1, 3, 5] },
UncheckedMessage { data: vec![3, 5, 7] },
UncheckedMessage { data: vec![5, 7, 9] },
]),
(2, vec![
UncheckedMessage { data: vec![2, 4, 6] },
UncheckedMessage { data: vec![4, 6, 8] },
UncheckedMessage { data: vec![6, 8, 10] },
]),
]);
let round_robin = RoundRobinBuffer::new(stream, 100);
let output = round_robin.wait().collect::<Result<Vec<_>, ()>>().unwrap();
assert_eq!(output, vec![
(1, UncheckedMessage { data: vec![1, 3, 5] }),
(2, UncheckedMessage { data: vec![2, 4, 6] }),
(1, UncheckedMessage { data: vec![3, 5, 7] }),
(2, UncheckedMessage { data: vec![4, 6, 8] }),
(1, UncheckedMessage { data: vec![5, 7, 9] }),
(2, UncheckedMessage { data: vec![6, 8, 10] }),
]);
}
#[test]
fn discards_when_full() {
let stream = stream::iter_ok(vec![
(1, (0..200).map(|i| UncheckedMessage { data: vec![i] }).collect())
]);
let round_robin = RoundRobinBuffer::new(stream, 100);
let output = round_robin.wait().collect::<Result<Vec<_>, ()>>().unwrap();
assert_eq!(output.len(), 100);
}
}
@@ -1,385 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Tests and test helpers for the candidate agreement.
const VALIDITY_CHECK_DELAY_MS: u64 = 100;
const AVAILABILITY_CHECK_DELAY_MS: u64 = 100;
const PROPOSAL_FORMATION_TICK_MS: u64 = 50;
const PROPAGATE_STATEMENTS_TICK_MS: u64 = 200;
const TIMER_TICK_DURATION_MS: u64 = 10;
use std::collections::HashMap;
use futures::prelude::*;
use futures::sync::mpsc;
use tokio_timer::Timer;
use super::*;
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)]
struct AuthorityId(usize);
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
struct Digest(Vec<usize>);
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
struct GroupId(usize);
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
struct ParachainCandidate {
group: GroupId,
data: usize,
}
#[derive(PartialEq, Eq, Debug, Clone)]
struct Proposal {
candidates: Vec<ParachainCandidate>,
}
#[derive(PartialEq, Eq, Debug, Clone)]
enum Signature {
Table(AuthorityId, table::Statement<ParachainCandidate, Digest>),
Bft(AuthorityId, bft::Message<Proposal, Digest>),
}
enum Error {
Timer(tokio_timer::TimerError),
NetOut,
NetIn,
}
#[derive(Debug, Clone)]
struct SharedTestContext {
n_authorities: usize,
n_groups: usize,
timer: Timer,
}
#[derive(Debug, Clone)]
struct TestContext {
shared: Arc<SharedTestContext>,
local_id: AuthorityId,
}
impl Context for TestContext {
type AuthorityId = AuthorityId;
type Digest = Digest;
type GroupId = GroupId;
type Signature = Signature;
type Proposal = Proposal;
type ParachainCandidate = ParachainCandidate;
type CheckCandidate = Box<Future<Item=bool,Error=Error>>;
type CheckAvailability = Box<Future<Item=bool,Error=Error>>;
type StatementBatch = VecBatch<
AuthorityId,
table::SignedStatement<ParachainCandidate, Digest, AuthorityId, Signature>
>;
fn candidate_digest(candidate: &ParachainCandidate) -> Digest {
Digest(vec![candidate.group.0, candidate.data])
}
fn proposal_digest(candidate: &Proposal) -> Digest {
Digest(candidate.candidates.iter().fold(Vec::new(), |mut a, c| {
a.extend(Self::candidate_digest(c).0);
a
}))
}
fn candidate_group(candidate: &ParachainCandidate) -> GroupId {
candidate.group.clone()
}
fn round_proposer(&self, round: usize) -> AuthorityId {
AuthorityId(round % self.shared.n_authorities)
}
fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate {
let future = self.shared.timer
.sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS))
.map_err(Error::Timer)
.map(|_| true);
Box::new(future)
}
fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability {
let future = self.shared.timer
.sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS))
.map_err(Error::Timer)
.map(|_| true);
Box::new(future)
}
fn create_proposal(&self, candidates: Vec<&ParachainCandidate>)
-> Option<Proposal>
{
let t = self.shared.n_groups * 2 / 3;
if candidates.len() >= t {
Some(Proposal {
candidates: candidates.iter().map(|x| (&**x).clone()).collect()
})
} else {
None
}
}
fn proposal_valid<F>(&self, proposal: &Proposal, check_candidate: F) -> bool
where F: FnMut(&ParachainCandidate) -> bool
{
if proposal.candidates.len() >= self.shared.n_groups * 2 / 3 {
proposal.candidates.iter().all(check_candidate)
} else {
false
}
}
fn local_id(&self) -> AuthorityId {
self.local_id.clone()
}
fn sign_table_statement(
&self,
statement: &table::Statement<ParachainCandidate, Digest>
) -> Signature {
Signature::Table(self.local_id(), statement.clone())
}
fn sign_bft_message(&self, message: &bft::Message<Proposal, Digest>) -> Signature {
Signature::Bft(self.local_id(), message.clone())
}
}
struct TestRecovery;
impl MessageRecovery<TestContext> for TestRecovery {
type UncheckedMessage = OutgoingMessage<TestContext>;
fn check_message(&self, msg: Self::UncheckedMessage) -> Option<CheckedMessage<TestContext>> {
Some(match msg {
OutgoingMessage::Bft(c) => CheckedMessage::Bft(c),
OutgoingMessage::Table(batch) => CheckedMessage::Table(batch.items),
})
}
}
pub struct Network<T> {
endpoints: Vec<mpsc::UnboundedSender<T>>,
input: mpsc::UnboundedReceiver<(usize, T)>,
}
impl<T: Clone + Send + 'static> Network<T> {
pub fn new(nodes: usize)
-> (Self, Vec<mpsc::UnboundedSender<(usize, T)>>, Vec<mpsc::UnboundedReceiver<T>>)
{
let mut inputs = Vec::with_capacity(nodes);
let mut outputs = Vec::with_capacity(nodes);
let mut endpoints = Vec::with_capacity(nodes);
let (in_tx, in_rx) = mpsc::unbounded();
for _ in 0..nodes {
let (out_tx, out_rx) = mpsc::unbounded();
inputs.push(in_tx.clone());
outputs.push(out_rx);
endpoints.push(out_tx);
}
let network = Network {
endpoints,
input: in_rx,
};
(network, inputs, outputs)
}
pub fn route_on_thread(self) {
::std::thread::spawn(move || { let _ = self.wait(); });
}
}
impl<T: Clone> Future for Network<T> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), Self::Error> {
match try_ready!(self.input.poll()) {
None => Ok(Async::Ready(())),
Some((sender, item)) => {
{
let receiving_endpoints = self.endpoints
.iter()
.enumerate()
.filter(|&(i, _)| i != sender)
.map(|(_, x)| x);
for endpoint in receiving_endpoints {
let _ = endpoint.unbounded_send(item.clone());
}
}
self.poll()
}
}
}
}
#[derive(Debug, Clone)]
pub struct VecBatch<V, T> {
pub max_len: usize,
pub targets: Vec<V>,
pub items: Vec<T>,
}
impl<V, T> ::StatementBatch<V, T> for VecBatch<V, T> {
fn targets(&self) -> &[V] { &self.targets }
fn is_empty(&self) -> bool { self.items.is_empty() }
fn push(&mut self, item: T) -> bool {
if self.items.len() == self.max_len {
false
} else {
self.items.push(item);
true
}
}
}
fn make_group_assignments(n_authorities: usize, n_groups: usize)
-> HashMap<GroupId, GroupInfo<AuthorityId>>
{
let mut map = HashMap::new();
let threshold = (n_authorities / n_groups) / 2;
let make_blank_group = || {
GroupInfo {
validity_guarantors: HashSet::new(),
availability_guarantors: HashSet::new(),
needed_validity: threshold,
needed_availability: threshold,
}
};
// every authority checks validity of his ID modulo n_groups and
// guarantees availability for the group above that.
for a_id in 0..n_authorities {
let primary_group = a_id % n_groups;
let availability_groups = [
(a_id + 1) % n_groups,
a_id.wrapping_sub(1) % n_groups,
];
map.entry(GroupId(primary_group))
.or_insert_with(&make_blank_group)
.validity_guarantors
.insert(AuthorityId(a_id));
for &availability_group in &availability_groups {
map.entry(GroupId(availability_group))
.or_insert_with(&make_blank_group)
.availability_guarantors
.insert(AuthorityId(a_id));
}
}
map
}
fn make_blank_batch<T>(n_authorities: usize) -> VecBatch<AuthorityId, T> {
VecBatch {
max_len: 20,
targets: (0..n_authorities).map(AuthorityId).collect(),
items: Vec::new(),
}
}
#[test]
fn consensus_completes_with_minimum_good() {
let n = 50;
let f = 16;
let n_groups = 10;
let timer = ::tokio_timer::wheel()
.tick_duration(Duration::from_millis(TIMER_TICK_DURATION_MS))
.num_slots(1 << 16)
.build();
let (network, inputs, outputs) = Network::<(AuthorityId, OutgoingMessage<TestContext>)>::new(n - f);
network.route_on_thread();
let shared_test_context = Arc::new(SharedTestContext {
n_authorities: n,
n_groups: n_groups,
timer: timer.clone(),
});
let groups = make_group_assignments(n, n_groups);
let authorities = inputs.into_iter().zip(outputs).enumerate().map(|(raw_id, (input, output))| {
let id = AuthorityId(raw_id);
let context = TestContext {
shared: shared_test_context.clone(),
local_id: id,
};
let shared_table = SharedTable::new(context.clone(), groups.clone());
let params = AgreementParams {
context,
timer: timer.clone(),
table: shared_table,
nodes: n,
max_faulty: f,
round_timeout_multiplier: 4,
message_buffer_size: 100,
form_proposal_interval: Duration::from_millis(PROPOSAL_FORMATION_TICK_MS),
};
let net_out = input
.sink_map_err(|_| Error::NetOut)
.with(move |x| Ok::<_, Error>((id.0, (id, x))) );
let net_in = output
.map_err(|_| Error::NetIn)
.map(move |(v, msg)| (v, vec![msg]));
let propagate_statements = timer
.interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS))
.map(move |()| make_blank_batch(n))
.map_err(Error::Timer);
let local_candidate = if raw_id < n_groups {
let candidate = ParachainCandidate {
group: GroupId(raw_id),
data: raw_id,
};
::futures::future::Either::A(Ok::<_, Error>(candidate).into_future())
} else {
::futures::future::Either::B(::futures::future::empty())
};
agree::<_, _, _, _, _, _, Error>(
params,
net_in,
net_out,
TestRecovery,
propagate_statements,
local_candidate
)
}).collect::<Vec<_>>();
futures::future::join_all(authorities).wait().unwrap();
}
+15
View File
@@ -0,0 +1,15 @@
[package]
name = "polkadot-consensus"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1.17"
parking_lot = "0.4"
tokio-timer = "0.1.2"
ed25519 = { path = "../../substrate/ed25519" }
polkadot-primitives = { path = "../primitives" }
polkadot-statement-table = { path = "../statement-table" }
substrate-bft = { path = "../../substrate/bft" }
substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" }
+243
View File
@@ -0,0 +1,243 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Propagation and agreement of candidates.
//!
//! Authorities are split into groups by parachain, and each authority might come
//! up its own candidate for their parachain. Within groups, authorities pass around
//! their candidates and produce statements of validity.
//!
//! Any candidate that receives majority approval by the authorities in a group
//! may be subject to inclusion, unless any authorities flag that candidate as invalid.
//!
//! Wrongly flagging as invalid should be strongly disincentivized, so that in the
//! equilibrium state it is not expected to happen. Likewise with the submission
//! of invalid blocks.
//!
//! Groups themselves may be compromised by malicious authorities.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use codec::Slicable;
use table::Table;
use table::generic::Statement as GenericStatement;
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{Id as ParaId, CandidateReceipt};
use primitives::block::Block as SubstrateBlock;
use primitives::AuthorityId;
use parking_lot::Mutex;
extern crate futures;
extern crate ed25519;
extern crate parking_lot;
extern crate tokio_timer;
extern crate polkadot_statement_table as table;
extern crate polkadot_primitives;
extern crate substrate_bft as bft;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
/// Information about a specific group.
#[derive(Debug, Clone)]
pub struct GroupInfo {
/// Authorities meant to check validity of candidates.
pub validity_guarantors: HashSet<AuthorityId>,
/// Authorities meant to check availability of candidate data.
pub availability_guarantors: HashSet<AuthorityId>,
/// Number of votes needed for validity.
pub needed_validity: usize,
/// Number of votes needed for availability.
pub needed_availability: usize,
}
struct TableContext {
parent_hash: Hash,
key: Arc<ed25519::Pair>,
groups: HashMap<ParaId, GroupInfo>,
}
impl table::Context for TableContext {
fn is_member_of(&self, authority: &AuthorityId, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority))
}
fn is_availability_guarantor_of(&self, authority: &AuthorityId, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority))
}
fn requisite_votes(&self, group: &ParaId) -> (usize, usize) {
self.groups.get(group).map_or(
(usize::max_value(), usize::max_value()),
|g| (g.needed_validity, g.needed_availability),
)
}
}
impl TableContext {
fn sign_statement(&self, statement: table::Statement) -> table::SignedStatement {
let signature = sign_table_statement(&statement, &self.key, &self.parent_hash);
let local_id = self.key.public().0;
table::SignedStatement {
statement,
signature,
sender: local_id,
}
}
}
/// Sign a table statement against a parent hash.
/// The actual message signed is the encoded statement concatenated with the
/// parent hash.
pub fn sign_table_statement(statement: &table::Statement, key: &ed25519::Pair, parent_hash: &Hash) -> ed25519::Signature {
use polkadot_primitives::parachain::Statement as RawStatement;
let raw = match *statement {
GenericStatement::Candidate(ref c) => RawStatement::Candidate(c.clone()),
GenericStatement::Valid(h) => RawStatement::Valid(h),
GenericStatement::Invalid(h) => RawStatement::Invalid(h),
GenericStatement::Available(h) => RawStatement::Available(h),
};
let mut encoded = raw.encode();
encoded.extend(&parent_hash.0);
key.sign(&encoded)
}
// A shared table object.
struct SharedTableInner {
table: Table<TableContext>,
proposed_digest: Option<Hash>,
}
impl SharedTableInner {
fn import_statement(
&mut self,
context: &TableContext,
statement: ::table::SignedStatement,
received_from: Option<AuthorityId>,
) -> Option<table::Summary> {
self.table.import_statement(context, statement, received_from)
}
}
/// A shared table object.
pub struct SharedTable {
context: Arc<TableContext>,
inner: Arc<Mutex<SharedTableInner>>,
}
impl Clone for SharedTable {
fn clone(&self) -> Self {
SharedTable {
context: self.context.clone(),
inner: self.inner.clone()
}
}
}
impl SharedTable {
/// Create a new shared table.
///
/// Provide the key to sign with, and the parent hash of the relay chain
/// block being built.
pub fn new(groups: HashMap<ParaId, GroupInfo>, key: Arc<ed25519::Pair>, parent_hash: Hash) -> Self {
SharedTable {
context: Arc::new(TableContext { groups, key, parent_hash }),
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
proposed_digest: None,
}))
}
}
/// Import a single statement.
pub fn import_statement(
&self,
statement: table::SignedStatement,
received_from: Option<AuthorityId>,
) -> Option<table::Summary> {
self.inner.lock().import_statement(&*self.context, statement, received_from)
}
/// Sign and import a local statement.
pub fn sign_and_import(
&self,
statement: table::Statement,
) -> Option<table::Summary> {
let proposed_digest = match statement {
GenericStatement::Candidate(ref c) => Some(c.hash()),
_ => None,
};
let signed_statement = self.context.sign_statement(statement);
let mut inner = self.inner.lock();
if proposed_digest.is_some() {
inner.proposed_digest = proposed_digest;
}
inner.import_statement(&*self.context, signed_statement, None)
}
/// Import many statements at once.
///
/// Provide an iterator yielding pairs of (statement, received_from).
pub fn import_statements<I, U>(&self, iterable: I) -> U
where
I: IntoIterator<Item=(table::SignedStatement, Option<AuthorityId>)>,
U: ::std::iter::FromIterator<table::Summary>,
{
let mut inner = self.inner.lock();
iterable.into_iter().filter_map(move |(statement, received_from)| {
inner.import_statement(&*self.context, statement, received_from)
}).collect()
}
/// Check if a proposal is valid.
pub fn proposal_valid(&self, _proposal: &SubstrateBlock) -> bool {
false // TODO
}
/// Execute a closure using a specific candidate.
///
/// Deadlocks if called recursively.
pub fn with_candidate<F, U>(&self, digest: &Hash, f: F) -> U
where F: FnOnce(Option<&CandidateReceipt>) -> U
{
let inner = self.inner.lock();
f(inner.table.get_candidate(digest))
}
/// Get all witnessed misbehavior.
pub fn get_misbehavior(&self) -> HashMap<AuthorityId, table::Misbehavior> {
self.inner.lock().table.get_misbehavior().clone()
}
/// Fill a statement batch.
pub fn fill_batch<B: table::StatementBatch>(&self, batch: &mut B) {
self.inner.lock().table.fill_batch(batch);
}
/// Get the local proposed block's hash.
pub fn proposed_hash(&self) -> Option<Hash> {
self.inner.lock().proposed_digest.clone()
}
}
+120 -1
View File
@@ -20,7 +20,9 @@
use primitives::bytes;
use primitives;
use codec::{Input, Slicable, NonTrivialSlicable};
use rstd::cmp::{PartialOrd, Ord, Ordering};
use rstd::vec::Vec;
use ::Hash;
/// Unique identifier of a parachain.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
@@ -159,6 +161,55 @@ pub struct CandidateReceipt {
pub fees: u64,
}
impl Slicable for CandidateReceipt {
fn encode(&self) -> Vec<u8> {
let mut v = Vec::new();
self.parachain_index.using_encoded(|s| v.extend(s));
self.collator.using_encoded(|s| v.extend(s));
self.head_data.0.using_encoded(|s| v.extend(s));
self.balance_uploads.using_encoded(|s| v.extend(s));
self.egress_queue_roots.using_encoded(|s| v.extend(s));
self.fees.using_encoded(|s| v.extend(s));
v
}
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(CandidateReceipt {
parachain_index: try_opt!(Slicable::decode(input)),
collator: try_opt!(Slicable::decode(input)),
head_data: try_opt!(Slicable::decode(input).map(HeadData)),
balance_uploads: try_opt!(Slicable::decode(input)),
egress_queue_roots: try_opt!(Slicable::decode(input)),
fees: try_opt!(Slicable::decode(input)),
})
}
}
impl CandidateReceipt {
/// Get the blake2_256 hash
#[cfg(feature = "std")]
pub fn hash(&self) -> Hash {
let encoded = self.encode();
primitives::hashing::blake2_256(&encoded).into()
}
}
impl PartialOrd for CandidateReceipt {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for CandidateReceipt {
fn cmp(&self, other: &Self) -> Ordering {
// TODO: compare signatures or something more sane
self.parachain_index.cmp(&other.parachain_index)
.then_with(|| self.head_data.cmp(&other.head_data))
}
}
/// Parachain ingress queue message.
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
@@ -185,7 +236,7 @@ pub struct BlockData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u
pub struct Header(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
/// Parachain head data included in the chain.
#[derive(PartialEq, Eq, Clone)]
#[derive(PartialEq, Eq, Clone, PartialOrd, Ord)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
pub struct HeadData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
@@ -209,6 +260,74 @@ impl Slicable for Activity {
}
}
#[derive(Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Debug))]
#[repr(u8)]
enum StatementKind {
Candidate = 1,
Valid = 2,
Invalid = 3,
Available = 4,
}
/// Statements which can be made about parachain candidates.
#[derive(Clone, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Debug))]
pub enum Statement {
/// Proposal of a parachain candidate.
Candidate(CandidateReceipt),
/// State that a parachain candidate is valid.
Valid(Hash),
/// Vote to commit to a candidate.
Invalid(Hash),
/// Vote to advance round after inactive primary.
Available(Hash),
}
impl Slicable for Statement {
fn encode(&self) -> Vec<u8> {
let mut v = Vec::new();
match *self {
Statement::Candidate(ref candidate) => {
v.push(StatementKind::Candidate as u8);
candidate.using_encoded(|s| v.extend(s));
}
Statement::Valid(ref hash) => {
v.push(StatementKind::Valid as u8);
hash.using_encoded(|s| v.extend(s));
}
Statement::Invalid(ref hash) => {
v.push(StatementKind::Invalid as u8);
hash.using_encoded(|s| v.extend(s));
}
Statement::Available(ref hash) => {
v.push(StatementKind::Available as u8);
hash.using_encoded(|s| v.extend(s));
}
}
v
}
fn decode<I: Input>(value: &mut I) -> Option<Self> {
match u8::decode(value) {
Some(x) if x == StatementKind::Candidate as u8 => {
Slicable::decode(value).map(Statement::Candidate)
}
Some(x) if x == StatementKind::Valid as u8 => {
Slicable::decode(value).map(Statement::Valid)
}
Some(x) if x == StatementKind::Invalid as u8 => {
Slicable::decode(value).map(Statement::Invalid)
}
Some(x) if x == StatementKind::Available as u8 => {
Slicable::decode(value).map(Statement::Available)
}
_ => None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -152,7 +152,6 @@ impl Slicable for Proposal {
}
}
/// Public functions that can be dispatched to.
#[derive(Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
@@ -314,7 +313,7 @@ pub struct UncheckedTransaction {
impl Slicable for UncheckedTransaction {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
// This is a little more complicated than usua since the binary format must be compatible
// This is a little more complicated than usual since the binary format must be compatible
// with substrate's generic `Vec<u8>` type. Basically this just means accepting that there
// will be a prefix of u32, which has the total number of bytes following (we don't need
// to use this).
@@ -0,0 +1,8 @@
[package]
name = "polkadot-statement-table"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
substrate-primitives = { path = "../../substrate/primitives" }
polkadot-primitives = { path = "../primitives" }
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The statement table.
//! The statement table: generic implementation.
//!
//! This stores messages other authorities issue about candidates.
//!
@@ -32,7 +32,21 @@ use std::collections::hash_map::{HashMap, Entry};
use std::hash::Hash;
use std::fmt::Debug;
use super::StatementBatch;
/// A batch of statements to send out.
pub trait StatementBatch<V, T> {
/// Get the target authorities of these statements.
fn targets(&self) -> &[V];
/// If the batch is empty.
fn is_empty(&self) -> bool;
/// Push a statement onto the batch. Returns false when the batch is full.
///
/// This is meant to do work like incrementally serializing the statements
/// into a vector of bytes while making sure the length is below a certain
/// amount.
fn push(&mut self, statement: T) -> bool;
}
/// Context for the statement table.
pub trait Context {
@@ -380,7 +394,7 @@ impl<C: Context> Table<C> {
&self.detected_misbehavior
}
/// Fill a statement batch and note messages seen by the targets.
/// Fill a statement batch and note messages as seen by the targets.
pub fn fill_batch<B>(&mut self, batch: &mut B)
where B: StatementBatch<
C::AuthorityId,
@@ -709,9 +723,28 @@ impl<C: Context> Table<C> {
#[cfg(test)]
mod tests {
use super::*;
use ::tests::VecBatch;
use std::collections::HashMap;
#[derive(Debug, Clone)]
struct VecBatch<V, T> {
pub max_len: usize,
pub targets: Vec<V>,
pub items: Vec<T>,
}
impl<V, T> ::generic::StatementBatch<V, T> for VecBatch<V, T> {
fn targets(&self) -> &[V] { &self.targets }
fn is_empty(&self) -> bool { self.items.is_empty() }
fn push(&mut self, item: T) -> bool {
if self.items.len() == self.max_len {
false
} else {
self.items.push(item);
true
}
}
}
fn create<C: Context>() -> Table<C> {
Table {
authority_data: HashMap::default(),
@@ -0,0 +1,108 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The statement table.
//!
//! This stores messages other authorities issue about candidates.
//!
//! These messages are used to create a proposal submitted to a BFT consensus process.
//!
//! Proposals are formed of sets of candidates which have the requisite number of
//! validity and availability votes.
//!
//! Each parachain is associated with two sets of authorities: those which can
//! propose and attest to validity of candidates, and those who can only attest
//! to availability.
extern crate substrate_primitives;
extern crate polkadot_primitives as primitives;
pub mod generic;
pub use generic::Table;
use primitives::parachain::{Id, CandidateReceipt};
use primitives::{SessionKey, Hash, Signature};
/// Statements about candidates on the network.
pub type Statement = generic::Statement<CandidateReceipt, Hash>;
/// Signed statements about candidates.
pub type SignedStatement = generic::SignedStatement<CandidateReceipt, Hash, SessionKey, Signature>;
/// Kinds of misbehavior, along with proof.
pub type Misbehavior = generic::Misbehavior<CandidateReceipt, Hash, SessionKey, Signature>;
/// A summary of import of a statement.
pub type Summary = generic::Summary<Hash, Id>;
/// Context necessary to construct a table.
pub trait Context {
/// Whether a authority is a member of a group.
/// Members are meant to submit candidates and vote on validity.
fn is_member_of(&self, authority: &SessionKey, group: &Id) -> bool;
/// Whether a authority is an availability guarantor of a group.
/// Guarantors are meant to vote on availability for candidates submitted
/// in a group.
fn is_availability_guarantor_of(
&self,
authority: &SessionKey,
group: &Id,
) -> bool;
// requisite number of votes for validity and availability respectively from a group.
fn requisite_votes(&self, group: &Id) -> (usize, usize);
}
impl<C: Context> generic::Context for C {
type AuthorityId = SessionKey;
type Digest = Hash;
type GroupId = Id;
type Signature = Signature;
type Candidate = CandidateReceipt;
fn candidate_digest(candidate: &CandidateReceipt) -> Hash {
candidate.hash()
}
fn candidate_group(candidate: &CandidateReceipt) -> Id {
candidate.parachain_index.clone()
}
fn is_member_of(&self, authority: &SessionKey, group: &Id) -> bool {
Context::is_member_of(self, authority, group)
}
fn is_availability_guarantor_of(&self, authority: &SessionKey, group: &Id) -> bool {
Context::is_availability_guarantor_of(self, authority, group)
}
fn requisite_votes(&self, group: &Id) -> (usize, usize) {
Context::requisite_votes(self, group)
}
}
/// A batch of statements to send out.
pub trait StatementBatch {
/// Get the target authorities of these statements.
fn targets(&self) -> &[SessionKey];
/// If the batch is empty.
fn is_empty(&self) -> bool;
/// Push a statement onto the batch. Returns false when the batch is full.
///
/// This is meant to do work like incrementally serializing the statements
/// into a vector of bytes while making sure the length is below a certain
/// amount.
fn push(&mut self, statement: SignedStatement) -> bool;
}
impl<T: StatementBatch> generic::StatementBatch<SessionKey, SignedStatement> for T {
fn targets(&self) -> &[SessionKey] { StatementBatch::targets(self ) }
fn is_empty(&self) -> bool { StatementBatch::is_empty(self) }
fn push(&mut self, statement: SignedStatement) -> bool {
StatementBatch::push(self, statement)
}
}
+20
View File
@@ -0,0 +1,20 @@
[package]
name = "substrate-bft"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1.17"
substrate-client = { path = "../client" }
substrate-codec = { path = "../codec" }
substrate-primitives = { path = "../primitives" }
substrate-state-machine = { path = "../state-machine" }
ed25519 = { path = "../ed25519" }
tokio-timer = "0.1.2"
parking_lot = "0.4"
error-chain = "0.11"
[dev-dependencies]
substrate-keyring = { path = "../keyring" }
substrate-executor = { path = "../executor" }
tokio-core = "0.1.12"
+57
View File
@@ -0,0 +1,57 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Error types in the BFT service.
error_chain! {
errors {
/// Missing state at block with given Id.
StateUnavailable(b: ::client::BlockId) {
description("State missing at given block."),
display("State unavailable at block {:?}", b),
}
/// I/O terminated unexpectedly
IoTerminated {
description("I/O terminated unexpectedly."),
display("I/O terminated unexpectedly."),
}
/// Unable to schedule wakeup.
FaultyTimer {
description("Faulty timer: unable to schedule wakeup"),
display("Faulty timer: unable to schedule wakeup"),
}
/// Unable to propose a block.
CannotPropose {
description("Unable to create block proposal."),
display("Unable to create block proposal."),
}
/// Error dispatching the agreement future onto the executor.
Executor(e: ::futures::future::ExecuteErrorKind) {
description("Unable to dispatch agreement future"),
display("Unable to dispatch agreement future: {:?}", e),
}
}
}
impl From<::generic::InputStreamConcluded> for Error {
fn from(_: ::generic::InputStreamConcluded) -> Error {
ErrorKind::IoTerminated.into()
}
}
@@ -1,18 +1,18 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Message accumulator for each round of BFT consensus.
@@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::hash::Hash;
use super::{Message, LocalizedMessage};
use generic::{Message, LocalizedMessage};
/// Justification for some state at a given round.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -1,25 +1,21 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! BFT Agreement based on a rotating proposer in different rounds.
mod accumulator;
#[cfg(test)]
mod tests;
//! Very general implementation.
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
@@ -31,6 +27,11 @@ use self::accumulator::State;
pub use self::accumulator::{Accumulator, Justification, PrepareJustification, UncheckedJustification};
mod accumulator;
#[cfg(test)]
mod tests;
/// Messages over the proposal.
/// Each message carries an associated round number.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -46,7 +47,8 @@ pub enum Message<C, D> {
}
impl<C, D> Message<C, D> {
fn round_number(&self) -> usize {
/// Extract the round number.
pub fn round_number(&self) -> usize {
match *self {
Message::Propose(round, _) => round,
Message::Prepare(round, _) => round,
@@ -119,18 +121,14 @@ pub enum Communication<C, D, V, S> {
Auxiliary(PrepareJustification<D, S>),
}
/// Type alias for a localized message using only type parameters from `Context`.
// TODO: actual type alias when it's no longer a warning.
pub struct ContextCommunication<C: Context + ?Sized>(pub Communication<C::Candidate, C::Digest, C::AuthorityId, C::Signature>);
/// Hack to get around type alias warning.
pub trait TypeResolve {
/// Communication type.
type Communication;
}
impl<C: Context + ?Sized> Clone for ContextCommunication<C>
where
LocalizedMessage<C::Candidate, C::Digest, C::AuthorityId, C::Signature>: Clone,
PrepareJustification<C::Digest, C::Signature>: Clone,
{
fn clone(&self) -> Self {
ContextCommunication(self.0.clone())
}
impl<C: Context> TypeResolve for C {
type Communication = Communication<C::Candidate, C::Digest, C::AuthorityId, C::Signature>;
}
#[derive(Debug)]
@@ -326,7 +324,11 @@ impl<C: Context> Strategy<C> {
// rounds if necessary.
//
// only call within the context of a `Task`.
fn poll<E>(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
fn poll<E>(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
where
C::RoundTimeout: Future<Error=E>,
@@ -359,7 +361,11 @@ impl<C: Context> Strategy<C> {
// perform one round of polling: attempt to broadcast messages and change the state.
// if the round or internal round-state changes, this should be called again.
fn poll_once<E>(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
fn poll_once<E>(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
where
C::RoundTimeout: Future<Error=E>,
@@ -412,7 +418,11 @@ impl<C: Context> Strategy<C> {
Ok(Async::NotReady)
}
fn propose(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
fn propose(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Result<(), <C::CreateProposal as Future>::Error>
{
if let LocalState::Start = self.local_state {
@@ -461,7 +471,7 @@ impl<C: Context> Strategy<C> {
// broadcast the justification along with the proposal if we are locked.
if let Some(ref locked) = self.locked {
sending.push(
ContextCommunication(Communication::Auxiliary(locked.justification.clone()))
Communication::Auxiliary(locked.justification.clone())
);
}
@@ -472,7 +482,11 @@ impl<C: Context> Strategy<C> {
Ok(())
}
fn prepare(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>) {
fn prepare(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
) {
// prepare only upon start or having proposed.
match self.local_state {
LocalState::Start | LocalState::Proposed => {},
@@ -511,7 +525,11 @@ impl<C: Context> Strategy<C> {
}
}
fn commit(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>) {
fn commit(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
) {
// commit only if we haven't voted to advance or committed already
match self.local_state {
LocalState::Committed | LocalState::VoteAdvance => return,
@@ -538,7 +556,11 @@ impl<C: Context> Strategy<C> {
}
}
fn vote_advance(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
fn vote_advance(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Result<(), <C::RoundTimeout as Future>::Error>
{
// we can vote for advancement under all circumstances unless we have already.
@@ -606,11 +628,11 @@ impl<C: Context> Strategy<C> {
&mut self,
message: Message<C::Candidate, C::Digest>,
context: &C,
sending: &mut Sending<ContextCommunication<C>>
sending: &mut Sending<<C as TypeResolve>::Communication>
) {
let signed_message = context.sign_local(message);
self.import_message(signed_message.clone());
sending.push(ContextCommunication(Communication::Consensus(signed_message)));
sending.push(Communication::Consensus(signed_message));
}
}
@@ -621,7 +643,7 @@ pub struct Agreement<C: Context, I, O> {
input: I,
output: O,
concluded: Option<Committed<C::Candidate, C::Digest, C::Signature>>,
sending: Sending<ContextCommunication<C>>,
sending: Sending<<C as TypeResolve>::Communication>,
strategy: Strategy<C>,
}
@@ -630,8 +652,8 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
C: Context,
C::RoundTimeout: Future<Error=E>,
C::CreateProposal: Future<Error=E>,
I: Stream<Item=ContextCommunication<C>,Error=E>,
O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
I: Stream<Item=<C as TypeResolve>::Communication,Error=E>,
O: Sink<SinkItem=<C as TypeResolve>::Communication,SinkError=E>,
E: From<InputStreamConcluded>,
{
type Item = Committed<C::Candidate, C::Digest, C::Signature>;
@@ -656,7 +678,7 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
Async::NotReady => break,
};
match message.0 {
match message {
Communication::Consensus(message) => self.strategy.import_message(message),
Communication::Auxiliary(lock_proof)
=> self.strategy.import_lock_proof(&self.context, lock_proof),
@@ -705,8 +727,8 @@ pub fn agree<C: Context, I, O, E>(context: C, nodes: usize, max_faulty: usize, i
C: Context,
C::RoundTimeout: Future<Error=E>,
C::CreateProposal: Future<Error=E>,
I: Stream<Item=ContextCommunication<C>,Error=E>,
O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
I: Stream<Item=<C as TypeResolve>::Communication,Error=E>,
O: Sink<SinkItem=<C as TypeResolve>::Communication,SinkError=E>,
E: From<InputStreamConcluded>,
{
let strategy = Strategy::create(&context, nodes, max_faulty);
@@ -1,32 +1,90 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Tests for the candidate agreement strategy.
use super::*;
use tests::Network;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::prelude::*;
use futures::sync::oneshot;
use futures::sync::{oneshot, mpsc};
use futures::future::FutureResult;
struct Network<T> {
endpoints: Vec<mpsc::UnboundedSender<T>>,
input: mpsc::UnboundedReceiver<(usize, T)>,
}
impl<T: Clone + Send + 'static> Network<T> {
fn new(nodes: usize)
-> (Self, Vec<mpsc::UnboundedSender<(usize, T)>>, Vec<mpsc::UnboundedReceiver<T>>)
{
let mut inputs = Vec::with_capacity(nodes);
let mut outputs = Vec::with_capacity(nodes);
let mut endpoints = Vec::with_capacity(nodes);
let (in_tx, in_rx) = mpsc::unbounded();
for _ in 0..nodes {
let (out_tx, out_rx) = mpsc::unbounded();
inputs.push(in_tx.clone());
outputs.push(out_rx);
endpoints.push(out_tx);
}
let network = Network {
endpoints,
input: in_rx,
};
(network, inputs, outputs)
}
fn route_on_thread(self) {
::std::thread::spawn(move || { let _ = self.wait(); });
}
}
impl<T: Clone> Future for Network<T> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), Self::Error> {
match try_ready!(self.input.poll()) {
None => Ok(Async::Ready(())),
Some((sender, item)) => {
{
let receiving_endpoints = self.endpoints
.iter()
.enumerate()
.filter(|&(i, _)| i != sender)
.map(|(_, x)| x);
for endpoint in receiving_endpoints {
let _ = endpoint.unbounded_send(item.clone());
}
}
self.poll()
}
}
}
}
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct Candidate(usize);
+473
View File
@@ -0,0 +1,473 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! BFT Agreement based on a rotating proposer in different rounds.
pub mod error;
pub mod generic;
extern crate substrate_codec as codec;
extern crate substrate_client as client;
extern crate substrate_primitives as primitives;
extern crate substrate_state_machine as state_machine;
extern crate ed25519;
extern crate tokio_timer;
extern crate parking_lot;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate error_chain;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use client::{BlockId, Client};
use client::backend::Backend;
use codec::Slicable;
use ed25519::Signature;
use primitives::block::{Block, Header, HeaderHash};
use primitives::AuthorityId;
use state_machine::CodeExecutor;
use futures::{stream, task, Async, Sink, Future, IntoFuture};
use futures::future::Executor;
use futures::sync::oneshot;
use tokio_timer::Timer;
use parking_lot::Mutex;
pub use generic::InputStreamConcluded;
pub use error::{Error, ErrorKind};
/// Messages over the proposal.
/// Each message carries an associated round number.
pub type Message = generic::Message<Block, HeaderHash>;
/// Localized message type.
pub type LocalizedMessage = generic::LocalizedMessage<
Block,
HeaderHash,
AuthorityId,
Signature
>;
/// Justification of some hash.
pub type Justification = generic::Justification<HeaderHash, Signature>;
/// Justification of a prepare message.
pub type PrepareJustification = generic::PrepareJustification<HeaderHash, Signature>;
/// Result of a committed round of BFT
pub type Committed = generic::Committed<Block, HeaderHash, Signature>;
/// Communication between BFT participants.
pub type Communication = generic::Communication<Block, HeaderHash, AuthorityId, Signature>;
/// Logic for a proposer.
///
/// This will encapsulate creation and evaluation of proposals at a specific
/// block.
pub trait Proposer: Sized {
type CreateProposal: IntoFuture<Item=Block,Error=Error>;
/// Initialize the proposal logic on top of a specific header.
// TODO: provide state context explicitly?
fn init(parent_header: &Header, sign_with: Arc<ed25519::Pair>) -> Self;
/// Create a proposal.
fn propose(&self) -> Self::CreateProposal;
/// Evaluate proposal. True means valid.
// TODO: change this to a future.
fn evaluate(&self, proposal: &Block) -> bool;
}
/// Block import trait.
pub trait BlockImport {
/// Import a block alongside its corresponding justification.
fn import_block(&self, block: Block, justification: Justification);
}
/// Trait for getting the authorities at a given block.
pub trait Authorities {
/// Get the authorities at the given block.
fn authorities(&self, at: &BlockId) -> Result<Vec<AuthorityId>, Error>;
}
impl<B, E> BlockImport for Client<B, E>
where
B: Backend,
E: CodeExecutor,
client::error::Error: From<<B::State as state_machine::backend::Backend>::Error>
{
fn import_block(&self, block: Block, _justification: Justification) {
// TODO: use justification.
let _ = self.import_block(block.header, Some(block.transactions));
}
}
impl<B, E> Authorities for Client<B, E>
where
B: Backend,
E: CodeExecutor,
client::error::Error: From<<B::State as state_machine::backend::Backend>::Error>
{
fn authorities(&self, at: &BlockId) -> Result<Vec<AuthorityId>, Error> {
self.authorities_at(at).map_err(|_| ErrorKind::StateUnavailable(*at).into())
}
}
/// Instance of BFT agreement.
struct BftInstance<P> {
key: Arc<ed25519::Pair>,
authorities: Vec<AuthorityId>,
parent_hash: HeaderHash,
timer: Timer,
round_timeout_multiplier: u64,
proposer: P,
}
impl<P: Proposer> generic::Context for BftInstance<P> {
type AuthorityId = AuthorityId;
type Digest = HeaderHash;
type Signature = Signature;
type Candidate = Block;
type RoundTimeout = Box<Future<Item=(),Error=Error> + Send>;
type CreateProposal = <P::CreateProposal as IntoFuture>::Future;
fn local_id(&self) -> AuthorityId {
self.key.public().0
}
fn proposal(&self) -> Self::CreateProposal {
self.proposer.propose().into_future()
}
fn candidate_digest(&self, proposal: &Block) -> HeaderHash {
proposal.header.hash()
}
fn sign_local(&self, message: Message) -> LocalizedMessage {
use primitives::bft::{Message as PrimitiveMessage, Action as PrimitiveAction};
let action = match message.clone() {
::generic::Message::Propose(r, p) => PrimitiveAction::Propose(r as u32, p),
::generic::Message::Prepare(r, h) => PrimitiveAction::Prepare(r as u32, h),
::generic::Message::Commit(r, h) => PrimitiveAction::Commit(r as u32, h),
::generic::Message::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32),
};
let primitive = PrimitiveMessage {
parent: self.parent_hash,
action,
};
let to_sign = Slicable::encode(&primitive);
let signature = self.key.sign(&to_sign);
LocalizedMessage {
message,
signature,
sender: self.key.public().0
}
}
fn round_proposer(&self, round: usize) -> AuthorityId {
use primitives::hashing::blake2_256;
// repeat blake2_256 on parent hash round + 1 times.
// use as index into authorities vec.
// TODO: parent hash is really insecure as a randomness beacon as
// the prior can easily influence the block hash.
let hashed = (0..round + 1).fold(self.parent_hash.0, |a, _| {
blake2_256(&a[..])
});
let index = u32::decode(&mut &hashed[..])
.expect("there are more than 4 bytes in a 32 byte hash; qed");
self.authorities[(index as usize) % self.authorities.len()]
}
fn candidate_valid(&self, proposal: &Block) -> bool {
self.proposer.evaluate(proposal)
}
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
use std::time::Duration;
let round = ::std::cmp::min(63, round) as u32;
let timeout = 1u64.checked_shl(round)
.unwrap_or_else(u64::max_value)
.saturating_mul(self.round_timeout_multiplier);
Box::new(self.timer.sleep(Duration::from_secs(timeout))
.map_err(|_| ErrorKind::FaultyTimer.into()))
}
}
type Input = stream::Empty<Communication, Error>;
// "black hole" output sink.
struct Output;
impl Sink for Output {
type SinkItem = Communication;
type SinkError = Error;
fn start_send(&mut self, _item: Communication) -> ::futures::StartSend<Communication, Error> {
Ok(::futures::AsyncSink::Ready)
}
fn poll_complete(&mut self) -> ::futures::Poll<(), Error> {
Ok(Async::Ready(()))
}
}
/// A future that resolves either when canceled (witnessing a block from the network at same height)
/// or when agreement completes.
pub struct BftFuture<P: Proposer, I> {
inner: generic::Agreement<BftInstance<P>, Input, Output>,
cancel: Arc<AtomicBool>,
send_task: Option<oneshot::Sender<task::Task>>,
import: Arc<I>,
}
impl<P: Proposer, I: BlockImport> Future for BftFuture<P, I> {
type Item = ();
type Error = ();
fn poll(&mut self) -> ::futures::Poll<(), ()> {
// send the task to the bft service so this can be cancelled.
if let Some(sender) = self.send_task.take() {
let _ = sender.send(task::current());
}
// service has canceled the future. bail
if self.cancel.load(Ordering::Acquire) {
return Ok(Async::Ready(()))
}
// TODO: handle this error, at least by logging.
let committed = try_ready!(self.inner.poll().map_err(|_| ()));
// If we didn't see the proposal (very unlikely),
// we will get the block from the network later.
if let Some(justified_block) = committed.candidate {
self.import.import_block(justified_block, committed.justification)
}
Ok(Async::Ready(()))
}
}
struct AgreementHandle {
cancel: Arc<AtomicBool>,
task: Option<oneshot::Receiver<task::Task>>,
}
impl Drop for AgreementHandle {
fn drop(&mut self) {
let task = match self.task.take() {
Some(t) => t,
None => return,
};
// if this fails, the task is definitely not live anyway.
if let Ok(task) = task.wait() {
self.cancel.store(true, Ordering::Release);
task.notify();
}
}
}
/// The BftService kicks off the agreement process on top of any blocks it
/// is notified of.
pub struct BftService<P, E, I> {
client: Arc<I>,
executor: E,
live_agreements: Mutex<HashMap<HeaderHash, AgreementHandle>>,
timer: Timer,
round_timeout_multiplier: u64,
key: Arc<ed25519::Pair>, // TODO: key changing over time.
_marker: ::std::marker::PhantomData<P>,
}
impl<P, E, I> BftService<P, E, I>
where
P: Proposer,
E: Executor<BftFuture<P, I>>,
I: BlockImport + Authorities,
{
/// Signal that a valid block with the given header has been imported.
///
/// This will begin the consensus process to build a block on top of it.
/// If the executor fails to run the future, an error will be returned.
pub fn build_upon(&self, header: &Header) -> Result<(), Error> {
let hash = header.hash();
let mut _preempted_consensus = None;
let proposer = P::init(header, self.key.clone());
// TODO: check key is one of the authorities.
let authorities = self.client.authorities(&BlockId::Hash(hash))?;
let n = authorities.len();
let max_faulty = n.saturating_sub(1) / 3;
let bft_instance = BftInstance {
proposer,
parent_hash: hash,
round_timeout_multiplier: self.round_timeout_multiplier,
timer: self.timer.clone(),
key: self.key.clone(),
authorities: authorities,
};
let agreement = generic::agree(
bft_instance,
n,
max_faulty,
stream::empty(),
Output,
);
let cancel = Arc::new(AtomicBool::new(false));
let (tx, rx) = oneshot::channel();
self.executor.execute(BftFuture {
inner: agreement,
cancel: cancel.clone(),
send_task: Some(tx),
import: self.client.clone(),
}).map_err(|e| e.kind()).map_err(ErrorKind::Executor)?;
{
let mut live = self.live_agreements.lock();
live.insert(hash, AgreementHandle {
task: Some(rx),
cancel,
});
// cancel any agreements attempted to build upon this block's parent
// as clearly agreement has already been reached.
_preempted_consensus = live.remove(&header.parent_hash);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use primitives::block;
use self::tokio_core::reactor::{Core, Handle};
use self::keyring::Keyring;
extern crate substrate_keyring as keyring;
extern crate tokio_core;
struct FakeClient {
authorities: Vec<AuthorityId>,
imported_heights: Mutex<HashSet<block::Number>>
}
impl BlockImport for FakeClient {
fn import_block(&self, block: Block, _justification: Justification) {
assert!(self.imported_heights.lock().insert(block.header.number))
}
}
impl Authorities for FakeClient {
fn authorities(&self, _at: &BlockId) -> Result<Vec<AuthorityId>, Error> {
Ok(self.authorities.clone())
}
}
struct DummyProposer(block::Number);
impl Proposer for DummyProposer {
type CreateProposal = Result<Block, Error>;
fn init(parent_header: &Header, _sign_with: Arc<ed25519::Pair>) -> Self {
DummyProposer(parent_header.number + 1)
}
fn propose(&self) -> Result<Block, Error> {
Ok(Block {
header: Header::from_block_number(self.0),
transactions: Default::default()
})
}
fn evaluate(&self, proposal: &Block) -> bool {
proposal.header.number == self.0
}
}
fn make_service(client: FakeClient, handle: Handle)
-> BftService<DummyProposer, Handle, FakeClient>
{
BftService {
client: Arc::new(client),
executor: handle,
live_agreements: Mutex::new(HashMap::new()),
timer: Timer::default(),
round_timeout_multiplier: 4,
key: Arc::new(Keyring::One.into()),
_marker: Default::default(),
}
}
#[test]
fn future_gets_preempted() {
let client = FakeClient {
authorities: vec![
Keyring::One.to_raw_public(),
Keyring::Two.to_raw_public(),
Keyring::Alice.to_raw_public(),
Keyring::Eve.to_raw_public(),
],
imported_heights: Mutex::new(HashSet::new()),
};
let mut core = Core::new().unwrap();
let service = make_service(client, core.handle());
let first = Header::from_block_number(2);
let first_hash = first.hash();
let mut second = Header::from_block_number(3);
second.parent_hash = first_hash;
let second_hash = second.hash();
service.build_upon(&first).unwrap();
assert!(service.live_agreements.lock().contains_key(&first_hash));
// turn the core so the future gets polled and sends its task to the
// service. otherwise it deadlocks.
core.turn(Some(::std::time::Duration::from_millis(100)));
service.build_upon(&second).unwrap();
assert!(!service.live_agreements.lock().contains_key(&first_hash));
assert!(service.live_agreements.lock().contains_key(&second_hash));
core.turn(Some(::std::time::Duration::from_millis(100)));
}
}
+3 -1
View File
@@ -16,5 +16,7 @@ substrate-primitives = { path = "../primitives" }
substrate-runtime-io = { path = "../runtime-io" }
substrate-runtime-support = { path = "../runtime-support" }
substrate-state-machine = { path = "../state-machine" }
substrate-test-runtime = { path = "../test-runtime" }
substrate-keyring = { path = "../../substrate/keyring" }
[dev-dependencies]
substrate-test-runtime = { path = "../test-runtime" }
+6
View File
@@ -202,6 +202,11 @@ impl<B, E> Client<B, E> where
block_builder::BlockBuilder::at_block(parent, &self)
}
/// Author a new block, filling it with valid transactions from our transaction pool.
pub fn propose_block_at(&self, parent: &BlockId) -> block::Block {
unimplemented!()
}
/// Queue a block for import.
pub fn import_block(&self, header: block::Header, body: Option<block::Body>) -> error::Result<ImportResult> {
// TODO: import lock
@@ -223,6 +228,7 @@ impl<B, E> Client<B, E> where
)?;
let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1;
trace!("Imported {}, (#{}), best={}", block::HeaderHash::from(header.blake2_256()), header.number, is_new_best);
transaction.set_block_data(header, body, is_new_best)?;
transaction.set_storage(overlay.drain())?;
self.backend.commit_operation(transaction)?;
-33
View File
@@ -197,39 +197,6 @@ impl Backend {
blockchain: Blockchain::new(),
}
}
/// Generate and import a sequence of blocks. A user supplied function is allowed to modify each block header. Useful for testing.
pub fn generate_blocks<F>(&self, count: usize, edit_header: F) where F: Fn(&mut block::Header) {
use backend::{Backend, BlockImportOperation};
let info = blockchain::Backend::info(&self.blockchain).expect("In-memory backend never fails");
let mut best_num = info.best_number;
let mut best_hash = info.best_hash;
let state_root = blockchain::Backend::header(&self.blockchain, BlockId::Hash(best_hash))
.expect("In-memory backend never fails")
.expect("Best header always exists in the blockchain")
.state_root;
for _ in 0 .. count {
best_num = best_num + 1;
let mut header = block::Header {
parent_hash: best_hash,
number: best_num,
state_root: state_root,
transaction_root: Default::default(),
digest: Default::default(),
};
edit_header(&mut header);
let mut tx = self.begin_operation(BlockId::Hash(best_hash)).expect("In-memory backend does not fail");
best_hash = header_hash(&header);
tx.set_block_data(header, Some(vec![]), true).expect("In-memory backend does not fail");
self.commit_operation(tx).expect("In-memory backend does not fail");
}
}
/// Generate and import a sequence of blocks. Useful for testing.
pub fn push_blocks(&self, count: usize) {
self.generate_blocks(count, |_| {})
}
}
impl backend::Backend for Backend {
+9 -1
View File
@@ -53,6 +53,7 @@ pub trait Slicable: Sized {
}
/// Trait to mark that a type is not trivially (essentially "in place") serialisable.
// TODO: under specialization, remove this and simply specialize in place serializable types.
pub trait NonTrivialSlicable: Slicable {}
impl<T: EndianSensitive> Slicable for T {
@@ -213,6 +214,8 @@ macro_rules! tuple_impl {
self.0.using_encoded(f)
}
}
impl<$one: NonTrivialSlicable> NonTrivialSlicable for ($one,) { }
};
($first:ident, $($rest:ident,)+) => {
impl<$first: Slicable, $($rest: Slicable),+>
@@ -248,6 +251,11 @@ macro_rules! tuple_impl {
}
}
impl<$first: Slicable, $($rest: Slicable),+>
NonTrivialSlicable
for ($first, $($rest),+)
{ }
tuple_impl!($($rest,)+);
}
}
@@ -256,7 +264,7 @@ macro_rules! tuple_impl {
mod inner_tuple_impl {
use rstd::vec::Vec;
use super::{Input, Slicable};
use super::{Input, Slicable, NonTrivialSlicable};
tuple_impl!(A, B, C, D, E, F, G, H, I, J, K,);
}
+1 -1
View File
@@ -41,7 +41,7 @@ pub fn verify(sig: &[u8], message: &[u8], public: &[u8]) -> bool {
/// A public key.
#[derive(PartialEq, Clone, Debug)]
pub struct Public ([u8; 32]);
pub struct Public(pub [u8; 32]);
/// A key pair.
pub struct Pair(signature::Ed25519KeyPair);
+4
View File
@@ -22,7 +22,11 @@ substrate-primitives = { path = "../../substrate/primitives" }
substrate-client = { path = "../../substrate/client" }
substrate-state-machine = { path = "../../substrate/state-machine" }
substrate-serializer = { path = "../../substrate/serializer" }
substrate-runtime-support = { path = "../../substrate/runtime-support" }
[dev-dependencies]
substrate-test-runtime = { path = "../test-runtime" }
substrate-executor = { path = "../../substrate/executor" }
substrate-keyring = { path = "../../substrate/keyring" }
substrate-codec = { path = "../../substrate/codec" }
env_logger = "0.4"
+10 -8
View File
@@ -27,6 +27,7 @@ extern crate substrate_primitives as primitives;
extern crate substrate_state_machine as state_machine;
extern crate substrate_serializer as ser;
extern crate substrate_client as client;
extern crate substrate_runtime_support as runtime_support;
extern crate serde;
extern crate serde_json;
#[macro_use] extern crate serde_derive;
@@ -34,6 +35,12 @@ extern crate serde_json;
#[macro_use] extern crate bitflags;
#[macro_use] extern crate error_chain;
#[cfg(test)] extern crate env_logger;
#[cfg(test)] extern crate substrate_test_runtime as test_runtime;
#[cfg(test)] extern crate substrate_keyring as keyring;
#[cfg(test)] #[macro_use] extern crate substrate_executor as executor;
#[cfg(test)] extern crate substrate_codec as codec;
mod service;
mod sync;
mod protocol;
@@ -44,13 +51,7 @@ mod config;
mod chain;
mod blocks;
#[cfg(test)]
mod test;
#[cfg(test)]
extern crate substrate_executor;
#[cfg(test)]
extern crate env_logger;
#[cfg(test)] mod test;
pub use service::Service;
pub use protocol::{ProtocolStatus};
@@ -59,5 +60,6 @@ pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, Ne
// TODO: move it elsewhere
fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash {
primitives::hashing::blake2_256(&ser::encode(header)).into()
use runtime_support::Hashable;
header.blake2_256().into()
}
+56 -13
View File
@@ -19,13 +19,21 @@ mod sync;
use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc;
use parking_lot::RwLock;
use client::{self, BlockId};
use primitives::block;
use substrate_executor as executor;
use client::{self, BlockId, genesis};
use client::block_builder::BlockBuilder;
use primitives;
use executor;
use io::SyncIo;
use protocol::Protocol;
use config::ProtocolConfig;
use network::{PeerId, SessionInfo, Error as NetworkError};
use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis};
use runtime_support::Hashable;
use test_runtime;
use keyring::Keyring;
use codec::Slicable;
native_executor_instance!(Executor, test_runtime::api::dispatch, include_bytes!("../../../test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm"));
pub struct TestIo<'p> {
pub queue: &'p RwLock<VecDeque<TestPacket>>,
@@ -92,7 +100,7 @@ pub struct TestPacket {
}
pub struct Peer {
pub chain: Arc<client::Client<client::in_mem::Backend, executor::WasmExecutor>>,
chain: Arc<client::Client<client::in_mem::Backend, executor::NativeExecutor<Executor>>>,
pub sync: Protocol,
pub queue: RwLock<VecDeque<TestPacket>>,
}
@@ -149,6 +157,36 @@ impl Peer {
fn flush(&self) {
}
fn generate_blocks<F>(&self, count: usize, mut edit_block: F) where F: FnMut(&mut BlockBuilder<client::in_mem::Backend, executor::NativeExecutor<Executor>>) {
for _ in 0 .. count {
let mut builder = self.chain.new_block().unwrap();
edit_block(&mut builder);
let block = builder.bake().unwrap();
trace!("Generating {}, (#{})", primitives::block::HeaderHash::from(block.header.blake2_256()), block.header.number);
self.chain.import_block(block.header, Some(block.transactions)).unwrap();
}
}
fn push_blocks(&self, count: usize, with_tx: bool) {
let mut nonce = 0;
if with_tx {
self.generate_blocks(count, |builder| {
let tx = test_runtime::Transaction {
from: Keyring::Alice.to_raw_public(),
to: Keyring::Alice.to_raw_public(),
amount: 1,
nonce: nonce,
};
let signature = Keyring::from_raw_public(tx.from.clone()).unwrap().sign(&tx.encode());
let tx = primitives::block::Transaction::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap();
builder.push(tx).unwrap();
nonce = nonce + 1;
});
} else {
self.generate_blocks(count, |_| ());
}
}
}
pub struct TestNet {
@@ -158,6 +196,19 @@ pub struct TestNet {
}
impl TestNet {
fn genesis_config() -> GenesisConfig {
GenesisConfig::new_simple(vec![
Keyring::Alice.to_raw_public(),
], 1000)
}
fn prepare_genesis() -> (primitives::block::Header, Vec<(Vec<u8>, Vec<u8>)>) {
let mut storage = Self::genesis_config().genesis_map();
let block = genesis::construct_genesis_block(&storage);
storage.extend(additional_storage_with_genesis(&block));
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}
pub fn new(n: usize) -> Self {
Self::new_with_config(n, ProtocolConfig::default())
}
@@ -168,17 +219,9 @@ impl TestNet {
started: false,
disconnect_events: Vec::new(),
};
let test_genesis_block = block::Header {
parent_hash: 0.into(),
number: 0,
state_root: 0.into(),
transaction_root: Default::default(),
digest: Default::default(),
};
for _ in 0..n {
let chain = Arc::new(client::new_in_mem(executor::WasmExecutor,
|| (test_genesis_block.clone(), vec![])).unwrap());
let chain = Arc::new(client::new_in_mem(Executor::new(), Self::prepare_genesis).unwrap());
let sync = Protocol::new(config.clone(), chain.clone()).unwrap();
net.peers.push(Arc::new(Peer {
sync: sync,
+16 -16
View File
@@ -22,8 +22,8 @@ use super::*;
fn sync_from_two_peers_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(1).chain.backend().push_blocks(100);
net.peer(2).chain.backend().push_blocks(100);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.sync();
assert!(net.peer(0).chain.backend().blockchain().equals_to(net.peer(1).chain.backend().blockchain()));
let status = net.peer(0).sync.status();
@@ -34,9 +34,9 @@ fn sync_from_two_peers_works() {
fn sync_from_two_peers_with_ancestry_search_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(0).chain.backend().generate_blocks(10, |header| header.state_root = 42.into());
net.peer(1).chain.backend().push_blocks(100);
net.peer(2).chain.backend().push_blocks(100);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).chain.backend().blockchain().canon_equals_to(net.peer(1).chain.backend().blockchain()));
@@ -45,7 +45,7 @@ fn sync_from_two_peers_with_ancestry_search_works() {
#[test]
fn sync_long_chain_works() {
let mut net = TestNet::new(2);
net.peer(1).chain.backend().push_blocks(5000);
net.peer(1).push_blocks(500, false);
net.sync_steps(3);
assert_eq!(net.peer(0).sync.status().sync.state, SyncState::Downloading);
net.sync();
@@ -56,8 +56,8 @@ fn sync_long_chain_works() {
fn sync_no_common_longer_chain_fails() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(0).chain.backend().generate_blocks(200, |header| header.state_root = 42.into());
net.peer(1).chain.backend().push_blocks(200);
net.peer(0).push_blocks(20, true);
net.peer(1).push_blocks(20, false);
net.sync();
assert!(!net.peer(0).chain.backend().blockchain().canon_equals_to(net.peer(1).chain.backend().blockchain()));
}
@@ -66,16 +66,16 @@ fn sync_no_common_longer_chain_fails() {
fn sync_after_fork_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(0).chain.backend().push_blocks(30);
net.peer(1).chain.backend().push_blocks(30);
net.peer(2).chain.backend().push_blocks(30);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
net.peer(2).push_blocks(30, false);
net.peer(0).chain.backend().generate_blocks(10, |header| header.state_root = 42.into()); // fork
net.peer(1).chain.backend().push_blocks(20);
net.peer(2).chain.backend().push_blocks(20);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(20, false);
net.peer(2).push_blocks(20, false);
net.peer(1).chain.backend().generate_blocks(10, |header| header.state_root = 42.into()); // second fork between 1 and 2
net.peer(2).chain.backend().push_blocks(1);
net.peer(1).push_blocks(10, true);
net.peer(2).push_blocks(1, false);
// peer 1 has the best chain
let peer1_chain = net.peer(1).chain.backend().blockchain().clone();
+122
View File
@@ -0,0 +1,122 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Message formats for the BFT consensus layer.
use block::{Block, HeaderHash};
use codec::{Slicable, Input};
use rstd::vec::Vec;
#[derive(Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Debug))]
#[repr(u8)]
enum ActionKind {
Propose = 1,
Prepare = 2,
Commit = 3,
AdvanceRound = 4,
}
/// Actions which can be taken during the BFT process.
#[derive(Clone, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Debug))]
pub enum Action {
/// Proposal of a block candidate.
Propose(u32, Block),
/// Preparation to commit for a candidate.
Prepare(u32, HeaderHash),
/// Vote to commit to a candidate.
Commit(u32, HeaderHash),
/// Vote to advance round after inactive primary.
AdvanceRound(u32),
}
impl Slicable for Action {
fn encode(&self) -> Vec<u8> {
let mut v = Vec::new();
match *self {
Action::Propose(ref round, ref block) => {
v.push(ActionKind::Propose as u8);
round.using_encoded(|s| v.extend(s));
block.using_encoded(|s| v.extend(s));
}
Action::Prepare(ref round, ref hash) => {
v.push(ActionKind::Prepare as u8);
round.using_encoded(|s| v.extend(s));
hash.using_encoded(|s| v.extend(s));
}
Action::Commit(ref round, ref hash) => {
v.push(ActionKind::Commit as u8);
round.using_encoded(|s| v.extend(s));
hash.using_encoded(|s| v.extend(s));
}
Action::AdvanceRound(ref round) => {
v.push(ActionKind::AdvanceRound as u8);
round.using_encoded(|s| v.extend(s));
}
}
v
}
fn decode<I: Input>(value: &mut I) -> Option<Self> {
match u8::decode(value) {
Some(x) if x == ActionKind::Propose as u8 => {
let (round, block) = try_opt!(Slicable::decode(value));
Some(Action::Propose(round, block))
}
Some(x) if x == ActionKind::Prepare as u8 => {
let (round, hash) = try_opt!(Slicable::decode(value));
Some(Action::Prepare(round, hash))
}
Some(x) if x == ActionKind::Commit as u8 => {
let (round, hash) = try_opt!(Slicable::decode(value));
Some(Action::Commit(round, hash))
}
Some(x) if x == ActionKind::AdvanceRound as u8 => {
Slicable::decode(value).map(Action::AdvanceRound)
}
_ => None,
}
}
}
/// Messages exchanged between participants in the BFT consensus.
#[derive(Clone, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Debug))]
pub struct Message {
/// The parent header hash this action is relative to.
pub parent: HeaderHash,
/// The action being broadcasted.
pub action: Action,
}
impl Slicable for Message {
fn encode(&self) -> Vec<u8> {
let mut v = self.parent.encode();
self.action.using_encoded(|s| v.extend(s));
v
}
fn decode<I: Input>(value: &mut I) -> Option<Self> {
Some(Message {
parent: try_opt!(Slicable::decode(value)),
action: try_opt!(Slicable::decode(value)),
})
}
}
@@ -145,6 +145,12 @@ impl Header {
digest: Default::default(),
}
}
/// Get the blake2-256 hash of this header.
#[cfg(feature = "std")]
pub fn hash(&self) -> HeaderHash {
::hashing::blake2_256(Slicable::encode(self).as_slice()).into()
}
}
impl Slicable for Header {
@@ -177,6 +183,30 @@ mod tests {
use codec::Slicable;
use substrate_serializer as ser;
#[test]
fn test_header_encoding() {
let header = Header {
parent_hash: 5.into(),
number: 67,
state_root: 3.into(),
transaction_root: 6.into(),
digest: Digest { logs: vec![Log(vec![1]), Log(vec![2])] },
};
assert_eq!(header.encode(), vec![
// parent_hash
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5,
// number
67, 0, 0, 0, 0, 0, 0, 0,
// state_root
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3,
// transaction_root
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6,
// digest (length, log1, log2)
2, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 2
]);
}
#[test]
fn test_header_serialization() {
let header = Header {
@@ -202,4 +232,27 @@ mod tests {
let v = header.encode();
assert_eq!(Header::decode(&mut &v[..]).unwrap(), header);
}
#[test]
fn test_block_encoding() {
let block = Block {
header: Header::from_block_number(12),
transactions: vec![Transaction(vec!(4))],
};
assert_eq!(block.encode(), vec![
// parent_hash
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
// number
12, 0, 0, 0, 0, 0, 0, 0,
// state_root
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
// transaction_root
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
// digest
0, 0, 0, 0,
// transactions (length, tx...)
1, 0, 0, 0, 1, 0, 0, 0, 4
]);
}
}
+4 -3
View File
@@ -80,10 +80,11 @@ pub use hashing::{blake2_256, twox_128, twox_256};
#[cfg(feature = "std")]
pub mod hexdisplay;
pub mod storage;
pub mod hash;
pub mod uint;
pub mod bft;
pub mod block;
pub mod hash;
pub mod storage;
pub mod uint;
#[cfg(test)]
mod tests;
+1
View File
@@ -15,3 +15,4 @@ substrate-executor = { path = "../executor" }
[dev-dependencies]
assert_matches = "1.1"
substrate-executor = { path = "../executor" }
substrate-runtime-support = { path = "../runtime-support" }
+2 -1
View File
@@ -16,6 +16,7 @@
use substrate_executor as executor;
use client;
use runtime_support::Hashable;
use super::*;
#[test]
@@ -31,7 +32,7 @@ fn should_return_header() {
let client = client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap();
assert_matches!(
ChainApi::header(&client, "af65e54217fb213853703d57b80fc5b2bb834bf923046294d7a49bff62f0a8b2".into()),
ChainApi::header(&client, test_genesis_block.blake2_256().into()),
Ok(Some(ref x)) if x == &block::Header {
parent_hash: 0.into(),
number: 0,
+2
View File
@@ -33,6 +33,8 @@ extern crate substrate_executor;
#[cfg(test)]
#[macro_use]
extern crate assert_matches;
#[cfg(test)]
extern crate substrate_runtime_support as runtime_support;
pub mod chain;
pub mod state;
+3 -2
View File
@@ -17,6 +17,7 @@
use super::*;
use substrate_executor as executor;
use self::error::{Error, ErrorKind};
use runtime_support::Hashable;
use client;
#[test]
@@ -30,7 +31,7 @@ fn should_return_storage() {
};
let client = client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap();
let genesis_hash = "af65e54217fb213853703d57b80fc5b2bb834bf923046294d7a49bff62f0a8b2".into();
let genesis_hash = test_genesis_block.blake2_256().into();
assert_matches!(
StateApi::storage(&client, StorageKey(vec![10]), genesis_hash),
@@ -51,7 +52,7 @@ fn should_call_contract() {
};
let client = client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap();
let genesis_hash = "af65e54217fb213853703d57b80fc5b2bb834bf923046294d7a49bff62f0a8b2".into();
let genesis_hash = test_genesis_block.blake2_256().into();
assert_matches!(
StateApi::call(&client, "balanceOf".into(), vec![1,2,3], genesis_hash),
@@ -54,3 +54,30 @@ impl Slicable for Transaction {
}
impl ::codec::NonTrivialSlicable for Transaction {}
#[cfg(test)]
mod tests {
use super::*;
use keyring::Keyring;
#[test]
fn test_tx_encoding() {
let tx = Transaction {
from: Keyring::Alice.to_raw_public(),
to: Keyring::Bob.to_raw_public(),
amount: 69,
nonce: 33,
};
assert_eq!(tx.encode(), vec![
// from
209, 114, 167, 76, 218, 76, 134, 89, 18, 195, 43, 160, 168, 10, 87, 174, 105, 171, 174, 65, 14, 92, 203, 89, 222, 232, 78, 47, 68, 50, 219, 79,
// to
215, 86, 142, 95, 10, 126, 218, 103, 168, 38, 145, 255, 55, 154, 196, 187, 164, 249, 201, 184, 89, 254, 119, 155, 93, 70, 54, 59, 97, 173, 45, 185,
// amount
69, 0, 0, 0, 0, 0, 0, 0,
// nonce
33, 0, 0, 0, 0, 0, 0, 0
]);
}
}
@@ -61,3 +61,37 @@ impl Slicable for UncheckedTransaction {
}
impl ::codec::NonTrivialSlicable for UncheckedTransaction {}
#[cfg(test)]
mod tests {
use super::*;
use keyring::Keyring;
use ::Transaction;
#[test]
fn test_unchecked_encoding() {
let tx = Transaction {
from: Keyring::Alice.to_raw_public(),
to: Keyring::Bob.to_raw_public(),
amount: 69,
nonce: 34,
};
let signature = Keyring::from_raw_public(tx.from).unwrap().sign(&tx.encode());
let signed = UncheckedTransaction { tx, signature };
assert_eq!(signed.encode(), vec![
// length
144, 0, 0, 0,
// from
209, 114, 167, 76, 218, 76, 134, 89, 18, 195, 43, 160, 168, 10, 87, 174, 105, 171, 174, 65, 14, 92, 203, 89, 222, 232, 78, 47, 68, 50, 219, 79,
// to
215, 86, 142, 95, 10, 126, 218, 103, 168, 38, 145, 255, 55, 154, 196, 187, 164, 249, 201, 184, 89, 254, 119, 155, 93, 70, 54, 59, 97, 173, 45, 185,
// amount
69, 0, 0, 0, 0, 0, 0, 0,
// nonce
34, 0, 0, 0, 0, 0, 0, 0,
// signature
207, 69, 156, 55, 7, 227, 202, 3, 114, 111, 43, 46, 227, 38, 39, 122, 245, 69, 195, 117, 190, 154, 89, 76, 134, 91, 251, 230, 31, 221, 1, 194, 144, 34, 33, 58, 220, 154, 205, 135, 224, 52, 248, 198, 12, 17, 96, 53, 110, 160, 194, 10, 9, 60, 40, 133, 57, 112, 151, 200, 105, 198, 245, 10
]);
}
}