Merge pull request #34 from paritytech/rh-candidate-agreement-glue

Candidate agreement "glue" code
This commit is contained in:
Gav Wood
2018-01-23 14:42:17 +01:00
committed by GitHub
10 changed files with 1816 additions and 320 deletions
+12 -1
View File
@@ -617,7 +617,8 @@ name = "polkadot-candidate-agreement"
version = "0.1.0"
dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-primitives 0.1.0",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1043,6 +1044,15 @@ dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-timer"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "triehash"
version = "0.1.0"
@@ -1238,6 +1248,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "514aae203178929dbf03318ad7c683126672d4d96eccb77b29603d33c9e25743"
"checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389"
"checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162"
"checksum tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6131e780037787ff1b3f8aad9da83bca02438b72277850dd6ad0d455e0e20efc"
"checksum triehash 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9291c7f0fae44858b5e087dd462afb382354120003778f1695b44aab98c7abd7"
"checksum uint 0.1.0 (git+https://github.com/paritytech/primitives.git)" = "<none>"
"checksum unicase 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2e01da42520092d0cd2d6ac3ae69eb21a22ad43ff195676b86f8c37f487d6b80"
+3 -2
View File
@@ -4,5 +4,6 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1"
polkadot-primitives = { path = "../primitives" }
futures = "0.1.17"
parking_lot = "0.4"
tokio-timer = "0.1.2"
@@ -117,37 +117,37 @@ struct VoteCounts {
/// Accumulates messages for a given round of BFT consensus.
///
/// This isn't tied to the "view" of a single validator. It
/// This isn't tied to the "view" of a single authority. It
/// keeps accurate track of the state of the BFT consensus based
/// on all messages imported.
#[derive(Debug)]
pub struct Accumulator<Candidate, Digest, ValidatorId, Signature>
pub struct Accumulator<Candidate, Digest, AuthorityId, Signature>
where
Candidate: Eq + Clone,
Digest: Hash + Eq + Clone,
ValidatorId: Hash + Eq,
AuthorityId: Hash + Eq,
Signature: Eq + Clone,
{
round_number: usize,
threshold: usize,
round_proposer: ValidatorId,
round_proposer: AuthorityId,
proposal: Option<Candidate>,
prepares: HashMap<ValidatorId, (Digest, Signature)>,
commits: HashMap<ValidatorId, (Digest, Signature)>,
prepares: HashMap<AuthorityId, (Digest, Signature)>,
commits: HashMap<AuthorityId, (Digest, Signature)>,
vote_counts: HashMap<Digest, VoteCounts>,
advance_round: HashSet<ValidatorId>,
advance_round: HashSet<AuthorityId>,
state: State<Candidate, Digest, Signature>,
}
impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, ValidatorId, Signature>
impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, AuthorityId, Signature>
where
Candidate: Eq + Clone,
Digest: Hash + Eq + Clone,
ValidatorId: Hash + Eq,
AuthorityId: Hash + Eq,
Signature: Eq + Clone,
{
/// Create a new state accumulator.
pub fn new(round_number: usize, threshold: usize, round_proposer: ValidatorId) -> Self {
pub fn new(round_number: usize, threshold: usize, round_proposer: AuthorityId) -> Self {
Accumulator {
round_number,
threshold,
@@ -171,11 +171,6 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
self.round_number.clone()
}
/// Get the round proposer.
pub fn round_proposer(&self) -> &ValidatorId {
&self.round_proposer
}
pub fn proposal(&self) -> Option<&Candidate> {
self.proposal.as_ref()
}
@@ -189,7 +184,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
/// and authorization should have already been checked.
pub fn import_message(
&mut self,
message: LocalizedMessage<Candidate, Digest, ValidatorId, Signature>,
message: LocalizedMessage<Candidate, Digest, AuthorityId, Signature>,
)
{
// message from different round.
@@ -210,7 +205,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
fn import_proposal(
&mut self,
proposal: Candidate,
sender: ValidatorId,
sender: AuthorityId,
) {
if sender != self.round_proposer || self.proposal.is_some() { return }
@@ -221,7 +216,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
fn import_prepare(
&mut self,
digest: Digest,
sender: ValidatorId,
sender: AuthorityId,
signature: Signature,
) {
// ignore any subsequent prepares by the same sender.
@@ -264,7 +259,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
fn import_commit(
&mut self,
digest: Digest,
sender: ValidatorId,
sender: AuthorityId,
signature: Signature,
) {
// ignore any subsequent commits by the same sender.
@@ -304,7 +299,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
fn import_advance_round(
&mut self,
sender: ValidatorId,
sender: AuthorityId,
) {
self.advance_round.insert(sender);
@@ -332,7 +327,7 @@ mod tests {
pub struct Digest(usize);
#[derive(Hash, PartialEq, Eq, Debug)]
pub struct ValidatorId(usize);
pub struct AuthorityId(usize);
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Signature(usize, usize);
@@ -347,7 +342,7 @@ mod tests {
let check_message = |r, d: &Digest, s: &Signature| {
if r == 2 && d.0 == 600 && s.0 == 600 {
Some(ValidatorId(s.1))
Some(AuthorityId(s.1))
} else {
None
}
@@ -370,11 +365,11 @@ mod tests {
#[test]
fn accepts_proposal_from_proposer_only() {
let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, ValidatorId(8));
let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(5),
sender: AuthorityId(5),
signature: Signature(999, 5),
message: Message::Propose(1, Candidate(999)),
});
@@ -382,7 +377,7 @@ mod tests {
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(8),
sender: AuthorityId(8),
signature: Signature(999, 8),
message: Message::Propose(1, Candidate(999)),
});
@@ -392,11 +387,11 @@ mod tests {
#[test]
fn reaches_prepare_phase() {
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(8),
sender: AuthorityId(8),
signature: Signature(999, 8),
message: Message::Propose(1, Candidate(999)),
});
@@ -405,7 +400,7 @@ mod tests {
for i in 0..6 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(999, i),
message: Message::Prepare(1, Digest(999)),
});
@@ -414,7 +409,7 @@ mod tests {
}
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(7),
sender: AuthorityId(7),
signature: Signature(999, 7),
message: Message::Prepare(1, Digest(999)),
});
@@ -427,11 +422,11 @@ mod tests {
#[test]
fn prepare_to_commit() {
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(8),
sender: AuthorityId(8),
signature: Signature(999, 8),
message: Message::Propose(1, Candidate(999)),
});
@@ -440,7 +435,7 @@ mod tests {
for i in 0..6 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(999, i),
message: Message::Prepare(1, Digest(999)),
});
@@ -449,7 +444,7 @@ mod tests {
}
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(7),
sender: AuthorityId(7),
signature: Signature(999, 7),
message: Message::Prepare(1, Digest(999)),
});
@@ -461,7 +456,7 @@ mod tests {
for i in 0..6 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(999, i),
message: Message::Commit(1, Digest(999)),
});
@@ -473,7 +468,7 @@ mod tests {
}
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(7),
sender: AuthorityId(7),
signature: Signature(999, 7),
message: Message::Commit(1, Digest(999)),
});
@@ -486,11 +481,11 @@ mod tests {
#[test]
fn prepare_to_advance() {
let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(8),
sender: AuthorityId(8),
signature: Signature(999, 8),
message: Message::Propose(1, Candidate(999)),
});
@@ -499,7 +494,7 @@ mod tests {
for i in 0..7 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(999, i),
message: Message::Prepare(1, Digest(999)),
});
@@ -512,7 +507,7 @@ mod tests {
for i in 0..6 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(999, i),
message: Message::AdvanceRound(1),
});
@@ -524,7 +519,7 @@ mod tests {
}
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(7),
sender: AuthorityId(7),
signature: Signature(999, 7),
message: Message::AdvanceRound(1),
});
@@ -537,12 +532,12 @@ mod tests {
#[test]
fn conclude_different_than_proposed() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, ValidatorId(8));
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(999, i),
message: Message::Prepare(1, Digest(999)),
});
@@ -555,7 +550,7 @@ mod tests {
for i in 0..7 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(999, i),
message: Message::Commit(1, Digest(999)),
});
@@ -569,12 +564,12 @@ mod tests {
#[test]
fn begin_to_advance() {
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, ValidatorId(8));
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(1, i),
message: Message::AdvanceRound(1),
});
@@ -588,12 +583,12 @@ mod tests {
#[test]
fn conclude_without_prepare() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, ValidatorId(8));
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);
for i in 0..7 {
accumulator.import_message(LocalizedMessage {
sender: ValidatorId(i),
sender: AuthorityId(i),
signature: Signature(999, i),
message: Message::Commit(1, Digest(999)),
});
+29 -22
View File
@@ -76,30 +76,30 @@ pub trait Context {
type Candidate: Debug + Eq + Clone;
/// Candidate digest.
type Digest: Debug + Hash + Eq + Clone;
/// Validator ID.
type ValidatorId: Debug + Hash + Eq + Clone;
/// Authority ID.
type AuthorityId: Debug + Hash + Eq + Clone;
/// Signature.
type Signature: Debug + Eq + Clone;
/// A future that resolves when a round timeout is concluded.
type RoundTimeout: Future<Item=()>;
/// A future that resolves when a proposal is ready.
type Proposal: Future<Item=Self::Candidate>;
type CreateProposal: Future<Item=Self::Candidate>;
/// Get the local validator ID.
fn local_id(&self) -> Self::ValidatorId;
/// Get the local authority ID.
fn local_id(&self) -> Self::AuthorityId;
/// Get the best proposal.
fn proposal(&self) -> Self::Proposal;
fn proposal(&self) -> Self::CreateProposal;
/// Get the digest of a candidate.
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
/// Sign a message using the local validator ID.
/// Sign a message using the local authority ID.
fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>)
-> LocalizedMessage<Self::Candidate, Self::Digest, Self::ValidatorId, Self::Signature>;
-> LocalizedMessage<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>;
/// Get the proposer for a given round of consensus.
fn round_proposer(&self, round: usize) -> Self::ValidatorId;
fn round_proposer(&self, round: usize) -> Self::AuthorityId;
/// Whether the candidate is valid.
fn candidate_valid(&self, candidate: &Self::Candidate) -> bool;
@@ -121,11 +121,11 @@ pub enum Communication<C, D, V, S> {
/// Type alias for a localized message using only type parameters from `Context`.
// TODO: actual type alias when it's no longer a warning.
pub struct ContextCommunication<C: Context + ?Sized>(pub Communication<C::Candidate, C::Digest, C::ValidatorId, C::Signature>);
pub struct ContextCommunication<C: Context + ?Sized>(pub Communication<C::Candidate, C::Digest, C::AuthorityId, C::Signature>);
impl<C: Context + ?Sized> Clone for ContextCommunication<C>
where
LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>: Clone,
LocalizedMessage<C::Candidate, C::Digest, C::AuthorityId, C::Signature>: Clone,
PrepareJustification<C::Digest, C::Signature>: Clone,
{
fn clone(&self) -> Self {
@@ -242,19 +242,19 @@ enum LocalState {
// - a higher threshold-prepare is broadcast to us. in this case we can
// advance to the round of the threshold-prepare. this is an indication
// that we have experienced severe asynchrony/clock drift with the remainder
// of the other validators, and it is unlikely that we can assist in
// of the other authorities, and it is unlikely that we can assist in
// consensus meaningfully. nevertheless we make an attempt.
struct Strategy<C: Context> {
nodes: usize,
max_faulty: usize,
fetching_proposal: Option<C::Proposal>,
fetching_proposal: Option<C::CreateProposal>,
round_timeout: future::Fuse<C::RoundTimeout>,
local_state: LocalState,
locked: Option<Locked<C::Digest, C::Signature>>,
notable_candidates: HashMap<C::Digest, C::Candidate>,
current_accumulator: Accumulator<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
future_accumulator: Accumulator<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
local_id: C::ValidatorId,
current_accumulator: Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
future_accumulator: Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
local_id: C::AuthorityId,
}
impl<C: Context> Strategy<C> {
@@ -290,7 +290,7 @@ impl<C: Context> Strategy<C> {
fn import_message(
&mut self,
msg: LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>
msg: LocalizedMessage<C::Candidate, C::Digest, C::AuthorityId, C::Signature>
) {
let round_number = msg.message.round_number();
@@ -330,7 +330,7 @@ impl<C: Context> Strategy<C> {
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
where
C::RoundTimeout: Future<Error=E>,
C::Proposal: Future<Error=E>,
C::CreateProposal: Future<Error=E>,
{
let mut last_watermark = (
self.current_accumulator.round_number(),
@@ -363,7 +363,7 @@ impl<C: Context> Strategy<C> {
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
where
C::RoundTimeout: Future<Error=E>,
C::Proposal: Future<Error=E>,
C::CreateProposal: Future<Error=E>,
{
self.propose(context, sending)?;
self.prepare(context, sending);
@@ -413,7 +413,7 @@ impl<C: Context> Strategy<C> {
}
fn propose(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
-> Result<(), <C::Proposal as Future>::Error>
-> Result<(), <C::CreateProposal as Future>::Error>
{
if let LocalState::Start = self.local_state {
let mut propose = false;
@@ -629,7 +629,7 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
where
C: Context,
C::RoundTimeout: Future<Error=E>,
C::Proposal: Future<Error=E>,
C::CreateProposal: Future<Error=E>,
I: Stream<Item=ContextCommunication<C>,Error=E>,
O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
E: From<InputStreamConcluded>,
@@ -699,8 +699,15 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
/// conclude without having witnessed the conclusion.
/// In general, this future should be pre-empted by the import of a justification
/// set for this block height.
pub fn agree<C: Context, I, O>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
pub fn agree<C: Context, I, O, E>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
-> Agreement<C, I, O>
where
C: Context,
C::RoundTimeout: Future<Error=E>,
C::CreateProposal: Future<Error=E>,
I: Stream<Item=ContextCommunication<C>,Error=E>,
O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
E: From<InputStreamConcluded>,
{
let strategy = Strategy::create(&context, nodes, max_faulty);
Agreement {
+18 -80
View File
@@ -18,11 +18,13 @@
use super::*;
use tests::Network;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::prelude::*;
use futures::sync::{oneshot, mpsc};
use futures::sync::oneshot;
use futures::future::FutureResult;
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
@@ -32,10 +34,10 @@ struct Candidate(usize);
struct Digest(usize);
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct ValidatorId(usize);
struct AuthorityId(usize);
#[derive(Debug, PartialEq, Eq, Clone)]
struct Signature(Message<Candidate, Digest>, ValidatorId);
struct Signature(Message<Candidate, Digest>, AuthorityId);
struct SharedContext {
node_count: usize,
@@ -87,13 +89,13 @@ impl SharedContext {
self.current_round += 1;
}
fn round_proposer(&self, round: usize) -> ValidatorId {
ValidatorId(round % self.node_count)
fn round_proposer(&self, round: usize) -> AuthorityId {
AuthorityId(round % self.node_count)
}
}
struct TestContext {
local_id: ValidatorId,
local_id: AuthorityId,
proposal: Mutex<usize>,
shared: Arc<Mutex<SharedContext>>,
}
@@ -101,16 +103,16 @@ struct TestContext {
impl Context for TestContext {
type Candidate = Candidate;
type Digest = Digest;
type ValidatorId = ValidatorId;
type AuthorityId = AuthorityId;
type Signature = Signature;
type RoundTimeout = Box<Future<Item=(), Error=Error>>;
type Proposal = FutureResult<Candidate, Error>;
type CreateProposal = FutureResult<Candidate, Error>;
fn local_id(&self) -> ValidatorId {
fn local_id(&self) -> AuthorityId {
self.local_id.clone()
}
fn proposal(&self) -> Self::Proposal {
fn proposal(&self) -> Self::CreateProposal {
let proposal = {
let mut p = self.proposal.lock().unwrap();
let x = *p;
@@ -126,7 +128,7 @@ impl Context for TestContext {
}
fn sign_local(&self, message: Message<Candidate, Digest>)
-> LocalizedMessage<Candidate, Digest, ValidatorId, Signature>
-> LocalizedMessage<Candidate, Digest, AuthorityId, Signature>
{
let signature = Signature(message.clone(), self.local_id.clone());
LocalizedMessage {
@@ -136,7 +138,7 @@ impl Context for TestContext {
}
}
fn round_proposer(&self, round: usize) -> ValidatorId {
fn round_proposer(&self, round: usize) -> AuthorityId {
self.shared.lock().unwrap().round_proposer(round)
}
@@ -149,70 +151,6 @@ impl Context for TestContext {
}
}
type Comm = ContextCommunication<TestContext>;
struct Network {
endpoints: Vec<mpsc::UnboundedSender<Comm>>,
input: mpsc::UnboundedReceiver<(usize, Comm)>,
}
impl Network {
fn new(nodes: usize)
-> (Network, Vec<mpsc::UnboundedSender<(usize, Comm)>>, Vec<mpsc::UnboundedReceiver<Comm>>)
{
let mut inputs = Vec::with_capacity(nodes);
let mut outputs = Vec::with_capacity(nodes);
let mut endpoints = Vec::with_capacity(nodes);
let (in_tx, in_rx) = mpsc::unbounded();
for _ in 0..nodes {
let (out_tx, out_rx) = mpsc::unbounded();
inputs.push(in_tx.clone());
outputs.push(out_rx);
endpoints.push(out_tx);
}
let network = Network {
endpoints,
input: in_rx,
};
(network, inputs, outputs)
}
fn route_on_thread(self) {
::std::thread::spawn(move || { let _ = self.wait(); });
}
}
impl Future for Network {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
match self.input.poll() {
Err(_) => Err(Error),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::Ready(())),
Ok(Async::Ready(Some((sender, item)))) => {
{
let receiving_endpoints = self.endpoints
.iter()
.enumerate()
.filter(|&(i, _)| i != sender)
.map(|(_, x)| x);
for endpoint in receiving_endpoints {
let _ = endpoint.unbounded_send(item.clone());
}
}
self.poll()
}
}
}
}
fn timeout_in(t: Duration) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel();
::std::thread::spawn(move || {
@@ -240,7 +178,7 @@ fn consensus_completes_with_minimum_good() {
.enumerate()
.map(|(i, (tx, rx))| {
let ctx = TestContext {
local_id: ValidatorId(i),
local_id: AuthorityId(i),
proposal: Mutex::new(i),
shared: shared_context.clone(),
};
@@ -296,7 +234,7 @@ fn consensus_does_not_complete_without_enough_nodes() {
.enumerate()
.map(|(i, (tx, rx))| {
let ctx = TestContext {
local_id: ValidatorId(i),
local_id: AuthorityId(i),
proposal: Mutex::new(i),
shared: shared_context.clone(),
};
@@ -335,7 +273,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
round_number: locked_round,
digest: locked_digest.clone(),
signatures: (0..7)
.map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), ValidatorId(i)))
.map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), AuthorityId(i)))
.collect()
}.check(7, |_, _, s| Some(s.1.clone())).unwrap();
@@ -352,7 +290,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
.enumerate()
.map(|(i, (tx, rx))| {
let ctx = TestContext {
local_id: ValidatorId(i),
local_id: AuthorityId(i),
proposal: Mutex::new(i),
shared: shared_context.clone(),
};
@@ -0,0 +1,214 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A stream that handles incoming messages to the BFT agreement module and statement
//! table. It forwards as necessary, and dispatches requests for determining availability
//! and validity of candidates as necessary.
use std::collections::HashSet;
use futures::prelude::*;
use futures::stream::{Fuse, FuturesUnordered};
use futures::sync::mpsc;
use table::{self, Statement, Context as TableContext};
use super::{Context, CheckedMessage, SharedTable, TypeResolve};
/// Outcome of an asynchronous check performed on a candidate.
enum CheckResult {
	/// The candidate's external data was found to be available.
	Available,
	/// The candidate's external data could not be confirmed available.
	Unavailable,
	/// The candidate was checked and found valid.
	Valid,
	/// The candidate was checked and found invalid.
	Invalid,
}
/// An in-flight check, tagged with the digest (`D`) of the candidate it
/// concerns. `A` is an availability-check future, `V` a validity-check future.
enum Checking<D, A, V> {
	/// Availability check in progress for the candidate with this digest.
	Availability(D, A),
	/// Validity check in progress for the candidate with this digest.
	Validity(D, V),
}
impl<D, A, V, E> Future for Checking<D, A, V>
	where
		D: Clone,
		A: Future<Item=bool,Error=E>,
		V: Future<Item=bool,Error=E>,
{
	type Item = (D, CheckResult);
	type Error = E;

	/// Poll the wrapped check. Resolves to the candidate digest paired with
	/// the appropriate `CheckResult` variant once the inner future is ready.
	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
		let resolved = match *self {
			Checking::Availability(ref digest, ref mut inner) => {
				// `try_ready!` propagates NotReady and errors from the inner future.
				let available = try_ready!(inner.poll());
				let outcome = if available {
					CheckResult::Available
				} else {
					CheckResult::Unavailable
				};
				(digest.clone(), outcome)
			}
			Checking::Validity(ref digest, ref mut inner) => {
				let valid = try_ready!(inner.poll());
				let outcome = if valid {
					CheckResult::Valid
				} else {
					CheckResult::Invalid
				};
				(digest.clone(), outcome)
			}
		};

		Ok(Async::Ready(resolved))
	}
}
/// Handles incoming messages to the BFT service and statement table.
///
/// Also triggers requests for determining validity and availability of other
/// parachain candidates.
pub struct HandleIncoming<C: Context, I> {
	// Shared statement table; statements are imported here and our own
	// statements are signed and imported via it.
	table: SharedTable<C>,
	// Incoming message stream, fused so it is safe to poll after exhaustion.
	messages_in: Fuse<I>,
	// Channel over which BFT agreement messages are forwarded.
	bft_out: mpsc::UnboundedSender<<C as TypeResolve>::BftCommunication>,
	// Our own authority ID, used for group-membership decisions.
	local_id: C::AuthorityId,
	// All availability/validity checks currently in flight, each tagged
	// with the digest of the candidate it concerns.
	requesting_about: FuturesUnordered<Checking<
		C::Digest,
		<C::CheckAvailability as IntoFuture>::Future,
		<C::CheckCandidate as IntoFuture>::Future,
	>>,
	// Digests for which a validity check has already been started
	// (prevents duplicate checks).
	checked_validity: HashSet<C::Digest>,
	// Digests for which an availability check has already been started.
	checked_availability: HashSet<C::Digest>,
}
impl<C: Context, I> HandleIncoming<C, I> {
	/// Turn a completed check result into a signed table statement and
	/// import it into the shared table. An `Unavailable` result produces
	/// no statement, since unavailability is not provable.
	fn sign_and_import_statement(&self, digest: C::Digest, result: CheckResult) {
		let statement = match result {
			CheckResult::Valid => Statement::Valid(digest),
			CheckResult::Invalid => Statement::Invalid(digest),
			CheckResult::Available => Statement::Available(digest),
			CheckResult::Unavailable => return, // no such statement and not provable.
		};

		// TODO: trigger broadcast to peers immediately?
		self.table.sign_and_import(statement);
	}

	/// Route one checked incoming message: BFT messages are forwarded over
	/// `bft_out`; table messages are imported into the shared table, and
	/// any resulting summaries may trigger new checks via
	/// `dispatch_on_summary`.
	fn import_message(&mut self, origin: C::AuthorityId, message: CheckedMessage<C>) {
		match message {
			// Send failure is deliberately ignored: a disconnected BFT
			// receiver is not this handler's concern.
			CheckedMessage::Bft(msg) => { let _ = self.bft_out.unbounded_send(msg); }
			CheckedMessage::Table(table_messages) => {
				// import all table messages and check for any that we
				// need to produce statements for.
				let msg_iter = table_messages
					.into_iter()
					.map(|m| (m, Some(origin.clone())));
				let summaries: Vec<_> = self.table.import_statements(msg_iter);

				for summary in summaries {
					self.dispatch_on_summary(summary)
				}
			}
		}
	}

	// on new candidates in our group, begin checking validity.
	// on new candidates in our availability sphere, begin checking availability.
	fn dispatch_on_summary(&mut self, summary: table::Summary<C::Digest, C::GroupId>) {
		let is_validity_member =
			self.table.context().is_member_of(&self.local_id, &summary.group_id);
		let is_availability_member =
			self.table.context().is_availability_guarantor_of(&self.local_id, &summary.group_id);

		let digest = &summary.candidate;

		// TODO: consider a strategy based on the number of candidate votes as well.
		// NOTE: the `insert` calls below have side effects; the `&&` chains
		// rely on short-circuit evaluation so the "already checked" sets are
		// only updated when we are actually a member of the relevant group.
		// The `proposed_digest` comparison skips checking a candidate we
		// proposed ourselves.
		let checking_validity =
			is_validity_member &&
			self.checked_validity.insert(digest.clone()) &&
			self.table.proposed_digest() != Some(digest.clone());

		let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone());

		if checking_validity || checking_availability {
			let context = &*self.table.context();
			let requesting_about = &mut self.requesting_about;

			// Look the candidate up in the table and queue the relevant
			// check futures onto `requesting_about`.
			self.table.with_candidate(digest, |c| match c {
				None => {} // TODO: handle table inconsistency somehow?
				Some(candidate) => {
					if checking_validity {
						let future = context.check_validity(candidate).into_future();
						let checking = Checking::Validity(digest.clone(), future);
						requesting_about.push(checking);
					}

					if checking_availability {
						let future = context.check_availability(candidate).into_future();
						let checking = Checking::Availability(digest.clone(), future);
						requesting_about.push(checking);
					}
				}
			})
		}
	}
}
impl<C, I, E> HandleIncoming<C, I>
	where
		C: Context,
		I: Stream<Item=(C::AuthorityId, CheckedMessage<C>),Error=E>,
		C::CheckAvailability: IntoFuture<Error=E>,
		C::CheckCandidate: IntoFuture<Error=E>,
{
	/// Construct a new incoming-message handler around the shared table,
	/// the stream of checked incoming messages, and the BFT output channel.
	pub fn new(
		table: SharedTable<C>,
		messages_in: I,
		bft_out: mpsc::UnboundedSender<<C as TypeResolve>::BftCommunication>,
	) -> Self {
		// Cache our authority ID up front; it drives every later
		// group-membership decision.
		let local_id = table.context().local_id();
		// Fuse the input so the stream is safe to poll after it ends.
		let messages_in = messages_in.fuse();

		HandleIncoming {
			table,
			messages_in,
			bft_out,
			local_id,
			requesting_about: FuturesUnordered::new(),
			checked_validity: HashSet::new(),
			checked_availability: HashSet::new(),
		}
	}
}
impl<C, I, E> Future for HandleIncoming<C, I>
	where
		C: Context,
		I: Stream<Item=(C::AuthorityId, CheckedMessage<C>),Error=E>,
		C::CheckAvailability: IntoFuture<Error=E>,
		C::CheckCandidate: IntoFuture<Error=E>,
{
	type Item = ();
	type Error = E;

	/// Drive the handler: first drain any finished availability/validity
	/// checks into signed statements, then pull incoming messages.
	/// Resolves with `Ok(())` once the input stream ends and no checks
	/// remain in flight.
	fn poll(&mut self) -> Poll<(), E> {
		loop {
			// FuturesUnordered is safe to poll after it has completed.
			while let Async::Ready(Some((d, r))) = self.requesting_about.poll()? {
				self.sign_and_import_statement(d, r);
			}

			// `try_ready!` returns NotReady/Err here; `None` means the
			// (fused) input stream is exhausted.
			match try_ready!(self.messages_in.poll()) {
				None => if self.requesting_about.is_empty() {
					return Ok(Async::Ready(()))
				} else {
					// Input is done but checks are still pending; the
					// inner futures' wakeups will re-poll us.
					return Ok(Async::NotReady)
				},
				Some((origin, msg)) => self.import_message(origin, msg),
			}
		}
	}
}
+597 -8
View File
@@ -16,21 +16,610 @@
//! Propagation and agreement of candidates.
//!
//! Validators are split into groups by parachain, and each validator might come
//! up its own candidate for their parachain. Within groups, validators pass around
//! Authorities are split into groups by parachain, and each authority might come
//! up with its own candidate for their parachain. Within groups, authorities pass around
//! their candidates and produce statements of validity.
//!
//! Any candidate that receives majority approval by the validators in a group
//! may be subject to inclusion, unless any validators flag that candidate as invalid.
//! Any candidate that receives majority approval by the authorities in a group
//! may be subject to inclusion, unless any authorities flag that candidate as invalid.
//!
//! Wrongly flagging as invalid should be strongly disincentivized, so that in the
//! equilibrium state it is not expected to happen. Likewise with the submission
//! of invalid blocks.
//!
//! Groups themselves may be compromised by malicious validators.
//! Groups themselves may be compromised by malicious authorities.
#[macro_use]
extern crate futures;
extern crate polkadot_primitives as primitives;
extern crate parking_lot;
extern crate tokio_timer;
pub mod bft;
pub mod table;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use futures::prelude::*;
use futures::sync::{mpsc, oneshot};
use parking_lot::Mutex;
use tokio_timer::Timer;
use table::Table;
mod bft;
mod handle_incoming;
mod round_robin;
mod table;
#[cfg(test)]
pub mod tests;
/// Context necessary for agreement.
pub trait Context: Send + Clone {
	/// An authority ID.
	type AuthorityId: Debug + Hash + Eq + Clone + Ord;
	/// The digest (hash or other unique attribute) of a candidate.
	type Digest: Debug + Hash + Eq + Clone;
	/// The group ID type.
	type GroupId: Debug + Hash + Ord + Eq + Clone;
	/// A signature type.
	type Signature: Debug + Eq + Clone;
	/// Candidate type. In practice this will be a candidate receipt.
	type ParachainCandidate: Debug + Ord + Eq + Clone;
	/// The actual block proposal type. This is what is agreed upon, and
	/// is composed of multiple candidates.
	type Proposal: Debug + Eq + Clone;

	/// A future that resolves when a candidate is checked for validity.
	///
	/// In Polkadot, this will involve fetching the corresponding block data,
	/// producing the necessary ingress, and running the parachain validity function.
	type CheckCandidate: IntoFuture<Item=bool>;

	/// A future that resolves when availability of a candidate's external
	/// data is checked.
	type CheckAvailability: IntoFuture<Item=bool>;

	/// The statement batch type.
	type StatementBatch: StatementBatch<
		Self::AuthorityId,
		table::SignedStatement<Self::ParachainCandidate, Self::Digest, Self::AuthorityId, Self::Signature>,
	>;

	/// Get the digest of a candidate.
	fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest;

	/// Get the digest of a proposal.
	fn proposal_digest(proposal: &Self::Proposal) -> Self::Digest;

	/// Get the group of a candidate.
	fn candidate_group(candidate: &Self::ParachainCandidate) -> Self::GroupId;

	/// Get the primary for a given round.
	fn round_proposer(&self, round: usize) -> Self::AuthorityId;

	/// Check a candidate for validity.
	fn check_validity(&self, candidate: &Self::ParachainCandidate) -> Self::CheckCandidate;

	/// Check availability of candidate data.
	fn check_availability(&self, candidate: &Self::ParachainCandidate) -> Self::CheckAvailability;

	/// Attempt to combine a set of parachain candidates into a proposal.
	///
	/// This may arbitrarily return `None`, but the intent is for `Some`
	/// to only be returned when candidates from enough groups are known.
	///
	/// "enough" may be subjective as well.
	fn create_proposal(&self, candidates: Vec<&Self::ParachainCandidate>)
		-> Option<Self::Proposal>;

	/// Check validity of a proposal. This should call out to the `check_candidate`
	/// function for all parachain candidates contained within it, as well as
	/// checking other validity constraints of the proposal.
	fn proposal_valid<F>(&self, proposal: &Self::Proposal, check_candidate: F) -> bool
		where F: FnMut(&Self::ParachainCandidate) -> bool;

	/// Get the local authority ID.
	fn local_id(&self) -> Self::AuthorityId;

	/// Sign a table validity statement with the local key.
	fn sign_table_statement(
		&self,
		statement: &table::Statement<Self::ParachainCandidate, Self::Digest>
	) -> Self::Signature;

	/// Sign a BFT agreement message.
	// NOTE(review): anonymous trait-method parameter (pre-2018 style) kept
	// as-is; renaming it would be an interface-adjacent change.
	fn sign_bft_message(&self, &bft::Message<Self::Proposal, Self::Digest>) -> Self::Signature;
}
/// Helper for type resolution for contexts until type aliases apply bounds.
///
/// Lets code name the fully-substituted table/BFT types for a given
/// `Context` without repeating the long generic parameter lists.
pub trait TypeResolve {
    /// A table statement carrying a sender ID and signature.
    type SignedTableStatement;
    /// A BFT-level communication message for this context.
    type BftCommunication;
    /// The result of a concluded BFT agreement.
    type BftCommitted;
    /// Misbehavior reported by the statement table.
    type Misbehavior;
}

// Blanket impl: every `Context` automatically gets its resolved types.
impl<C: Context> TypeResolve for C {
    type SignedTableStatement = table::SignedStatement<C::ParachainCandidate, C::Digest, C::AuthorityId, C::Signature>;
    type BftCommunication = bft::Communication<C::Proposal, C::Digest, C::AuthorityId, C::Signature>;
    type BftCommitted = bft::Committed<C::Proposal,C::Digest,C::Signature>;
    type Misbehavior = table::Misbehavior<C::ParachainCandidate, C::Digest, C::AuthorityId, C::Signature>;
}
/// Information about a specific group: its assigned authorities and the
/// vote thresholds that apply to its candidates.
#[derive(Debug, Clone)]
pub struct GroupInfo<V: Hash + Eq> {
    /// Authorities meant to check validity of candidates.
    pub validity_guarantors: HashSet<V>,
    /// Authorities meant to check availability of candidate data.
    pub availability_guarantors: HashSet<V>,
    /// Number of votes needed for validity.
    pub needed_validity: usize,
    /// Number of votes needed for availability.
    pub needed_availability: usize,
}
// Pairs the user-supplied `Context` with the group assignments; this is
// the type that implements `table::Context` for the statement table.
struct TableContext<C: Context> {
    context: C,
    groups: HashMap<C::GroupId, GroupInfo<C::AuthorityId>>,
}

// Deref to the wrapped context so its methods can be called directly.
impl<C: Context> ::std::ops::Deref for TableContext<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.context
    }
}
impl<C: Context> table::Context for TableContext<C> {
    type AuthorityId = C::AuthorityId;
    type Digest = C::Digest;
    type GroupId = C::GroupId;
    type Signature = C::Signature;
    type Candidate = C::ParachainCandidate;

    // Digesting and grouping are delegated straight to the wrapped context.
    fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest {
        C::candidate_digest(candidate)
    }

    fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId {
        C::candidate_group(candidate)
    }

    // Membership checks against an unknown group always fail.
    fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool {
        match self.groups.get(group) {
            Some(info) => info.validity_guarantors.contains(authority),
            None => false,
        }
    }

    fn is_availability_guarantor_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool {
        match self.groups.get(group) {
            Some(info) => info.availability_guarantors.contains(authority),
            None => false,
        }
    }

    // An unknown group yields unreachable thresholds so nothing in it can
    // ever be considered includable.
    fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize) {
        match self.groups.get(group) {
            Some(info) => (info.needed_validity, info.needed_availability),
            None => (usize::max_value(), usize::max_value()),
        }
    }
}
// A shared table object.
struct SharedTableInner<C: Context> {
    // the statement table itself.
    table: Table<TableContext<C>>,
    // digest of the candidate this node signed and imported locally, if any.
    proposed_digest: Option<C::Digest>,
    // listeners waiting to receive a proposal once one can be formed.
    awaiting_proposal: Vec<oneshot::Sender<C::Proposal>>,
}
impl<C: Context> SharedTableInner<C> {
    // Forward a signed statement to the underlying table, yielding an
    // import summary when the table accepts it.
    fn import_statement(
        &mut self,
        context: &TableContext<C>,
        statement: <C as TypeResolve>::SignedTableStatement,
        received_from: Option<C::AuthorityId>
    ) -> Option<table::Summary<C::Digest, C::GroupId>> {
        self.table.import_statement(context, statement, received_from)
    }

    // Attempt to form a proposal; on success, send it to every waiting
    // listener. Does nothing when nobody is waiting.
    fn update_proposal(&mut self, context: &TableContext<C>) {
        if self.awaiting_proposal.is_empty() {
            return;
        }

        let candidates = self.table.proposed_candidates(context);
        let proposal = match context.context.create_proposal(candidates) {
            Some(p) => p,
            None => return,
        };

        for sender in self.awaiting_proposal.drain(..) {
            let _ = sender.send(proposal.clone());
        }
    }

    // Register a listener for the next proposal; fires immediately if a
    // proposal can already be formed.
    fn get_proposal(&mut self, context: &TableContext<C>) -> oneshot::Receiver<C::Proposal> {
        let (tx, rx) = oneshot::channel();
        self.awaiting_proposal.push(tx);
        self.update_proposal(context);
        rx
    }

    // A proposal is valid when the context agrees, with each contained
    // candidate checked for includability against the table.
    fn proposal_valid(&mut self, context: &TableContext<C>, proposal: &C::Proposal) -> bool {
        let table = &mut self.table;
        context.context.proposal_valid(proposal, |contained| {
            let digest = C::candidate_digest(contained);
            table.candidate_includable(&digest, context)
        })
    }
}
/// A shared table object.
///
/// Cheaply cloneable: clones share the same underlying table and context
/// behind `Arc`s, with the mutable state guarded by a mutex.
pub struct SharedTable<C: Context> {
    context: Arc<TableContext<C>>,
    inner: Arc<Mutex<SharedTableInner<C>>>,
}
impl<C: Context> Clone for SharedTable<C> {
fn clone(&self) -> Self {
SharedTable {
context: self.context.clone(),
inner: self.inner.clone()
}
}
}
impl<C: Context> SharedTable<C> {
    /// Create a new shared table from a context and group assignments.
    pub fn new(context: C, groups: HashMap<C::GroupId, GroupInfo<C::AuthorityId>>) -> Self {
        let table_context = TableContext { context, groups };
        let inner = SharedTableInner {
            table: Table::default(),
            awaiting_proposal: Vec::new(),
            proposed_digest: None,
        };

        SharedTable {
            context: Arc::new(table_context),
            inner: Arc::new(Mutex::new(inner)),
        }
    }

    /// Import a single signed statement.
    pub fn import_statement(
        &self,
        statement: <C as TypeResolve>::SignedTableStatement,
        received_from: Option<C::AuthorityId>,
    ) -> Option<table::Summary<C::Digest, C::GroupId>> {
        let mut inner = self.inner.lock();
        inner.import_statement(&*self.context, statement, received_from)
    }

    /// Sign a local statement and import it into the table.
    ///
    /// When the statement proposes a candidate, its digest is recorded as
    /// this node's proposed digest.
    pub fn sign_and_import(
        &self,
        statement: table::Statement<C::ParachainCandidate, C::Digest>,
    ) -> Option<table::Summary<C::Digest, C::GroupId>> {
        let proposed_digest = if let table::Statement::Candidate(ref c) = statement {
            Some(C::candidate_digest(c))
        } else {
            None
        };

        let signed_statement = table::SignedStatement {
            signature: self.context.sign_table_statement(&statement),
            sender: self.context.local_id(),
            statement,
        };

        let mut inner = self.inner.lock();
        if proposed_digest.is_some() {
            inner.proposed_digest = proposed_digest;
        }

        inner.import_statement(&*self.context, signed_statement, None)
    }

    /// Import many statements at once.
    ///
    /// Provide an iterator yielding pairs of (statement, received_from);
    /// summaries of successful imports are collected into `U`.
    pub fn import_statements<I, U>(&self, iterable: I) -> U
        where
            I: IntoIterator<Item=(<C as TypeResolve>::SignedTableStatement, Option<C::AuthorityId>)>,
            U: ::std::iter::FromIterator<table::Summary<C::Digest, C::GroupId>>,
    {
        let mut inner = self.inner.lock();
        iterable.into_iter().filter_map(move |(statement, received_from)| {
            inner.import_statement(&*self.context, statement, received_from)
        }).collect()
    }

    /// Update the proposal sealing.
    pub fn update_proposal(&self) {
        let mut inner = self.inner.lock();
        inner.update_proposal(&*self.context)
    }

    /// Register interest in receiving a proposal when ready.
    /// If one is ready immediately, it will be provided.
    pub fn get_proposal(&self) -> oneshot::Receiver<C::Proposal> {
        let mut inner = self.inner.lock();
        inner.get_proposal(&*self.context)
    }

    /// Check if a proposal is valid.
    pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool {
        let mut inner = self.inner.lock();
        inner.proposal_valid(&*self.context, proposal)
    }

    /// Execute a closure using a specific candidate.
    ///
    /// Deadlocks if called recursively.
    pub fn with_candidate<F, U>(&self, digest: &C::Digest, f: F) -> U
        where F: FnOnce(Option<&C::ParachainCandidate>) -> U
    {
        let inner = self.inner.lock();
        f(inner.table.get_candidate(digest))
    }

    /// Get all witnessed misbehavior, keyed by the offending authority.
    pub fn get_misbehavior(&self) -> HashMap<C::AuthorityId, <C as TypeResolve>::Misbehavior> {
        self.inner.lock().table.get_misbehavior().clone()
    }

    /// Fill a statement batch with statements to propagate.
    pub fn fill_batch(&self, batch: &mut C::StatementBatch) {
        self.inner.lock().table.fill_batch(batch);
    }

    /// Get the local proposed candidate digest, if any.
    pub fn proposed_digest(&self) -> Option<C::Digest> {
        self.inner.lock().proposed_digest.clone()
    }

    // Get a handle to the table context.
    fn context(&self) -> &TableContext<C> {
        &*self.context
    }
}
/// Errors that can occur during agreement.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Error {
    /// An I/O stream or sink the agreement relied upon has terminated.
    IoTerminated,
    /// The timer failed while producing a round timeout.
    FaultyTimer,
    /// A proposal was required but could not be produced.
    CannotPropose,
}

impl From<bft::InputStreamConcluded> for Error {
    fn from(_: bft::InputStreamConcluded) -> Error {
        // A concluded BFT input stream means our message I/O has ended.
        Error::IoTerminated
    }
}
/// Context owned by the BFT future necessary to execute the logic.
pub struct BftContext<C: Context> {
    // the user-supplied agreement context.
    context: C,
    // shared table used to create and validate proposals.
    table: SharedTable<C>,
    // timer used for round timeouts.
    timer: Timer,
    // seconds multiplied by 2^round to obtain the round timeout.
    round_timeout_multiplier: u64,
}
impl<C: Context> bft::Context for BftContext<C>
    where C::Proposal: 'static,
{
    type AuthorityId = C::AuthorityId;
    type Digest = C::Digest;
    type Signature = C::Signature;
    type Candidate = C::Proposal;
    type RoundTimeout = Box<Future<Item=(),Error=Error>>;
    type CreateProposal = Box<Future<Item=Self::Candidate,Error=Error>>;

    fn local_id(&self) -> Self::AuthorityId {
        self.context.local_id()
    }

    // Proposals come from the shared table; a dropped sender becomes
    // `CannotPropose`.
    fn proposal(&self) -> Self::CreateProposal {
        Box::new(self.table.get_proposal().map_err(|_| Error::CannotPropose))
    }

    fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest {
        C::proposal_digest(candidate)
    }

    fn sign_local(&self, message: bft::Message<Self::Candidate, Self::Digest>)
        -> bft::LocalizedMessage<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>
    {
        let sender = self.local_id();
        let signature = self.context.sign_bft_message(&message);
        bft::LocalizedMessage {
            message,
            sender,
            signature,
        }
    }

    fn round_proposer(&self, round: usize) -> Self::AuthorityId {
        self.context.round_proposer(round)
    }

    fn candidate_valid(&self, proposal: &Self::Candidate) -> bool {
        self.table.proposal_valid(proposal)
    }

    // Exponential back-off: timeout = multiplier * 2^round seconds. The
    // shift is clamped to 63 so `checked_shl` cannot fail, and the
    // multiplication saturates instead of overflowing.
    fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
        let round = ::std::cmp::min(63, round) as u32;
        let timeout = 1u64.checked_shl(round)
            .unwrap_or_else(u64::max_value)
            .saturating_mul(self.round_timeout_multiplier);

        Box::new(self.timer.sleep(Duration::from_secs(timeout))
            .map_err(|_| Error::FaultyTimer))
    }
}
/// Parameters necessary for agreement.
pub struct AgreementParams<C: Context> {
    /// The context itself.
    pub context: C,
    /// For scheduling timeouts.
    pub timer: Timer,
    /// The statement table.
    pub table: SharedTable<C>,
    /// The number of nodes.
    pub nodes: usize,
    /// The maximum number of faulty nodes.
    pub max_faulty: usize,
    /// The round timeout multiplier: 2^round_number is multiplied by this
    /// to obtain the round timeout in seconds.
    pub round_timeout_multiplier: u64,
    /// The maximum amount of messages to queue.
    pub message_buffer_size: usize,
    /// Interval to attempt forming proposals over.
    pub form_proposal_interval: Duration,
}
/// Recovery for messages received from the network.
pub trait MessageRecovery<C: Context> {
    /// The unchecked message type. This implies that work hasn't been done
    /// to decode the payload and check and authenticate a signature.
    type UncheckedMessage;

    /// Attempt to transform an unchecked message into a checked one.
    /// Messages that fail decoding or authentication yield `None`.
    fn check_message(&self, Self::UncheckedMessage) -> Option<CheckedMessage<C>>;
}
/// A batch of statements to send out.
pub trait StatementBatch<V, T> {
    /// Get the target authorities of these statements.
    fn targets(&self) -> &[V];

    /// Whether the batch is empty.
    fn is_empty(&self) -> bool;

    /// Push a statement onto the batch. Returns false when the batch is full.
    ///
    /// This is meant to do work like incrementally serializing the statements
    /// into a vector of bytes while making sure the length is below a certain
    /// amount.
    fn push(&mut self, statement: T) -> bool;
}
/// Recovered and fully checked messages.
pub enum CheckedMessage<C: Context> {
    /// Messages meant for the BFT agreement logic.
    Bft(<C as TypeResolve>::BftCommunication),
    /// Statements circulating about the table.
    Table(Vec<<C as TypeResolve>::SignedTableStatement>),
}

/// Outgoing messages to the network.
#[derive(Debug, Clone)]
pub enum OutgoingMessage<C: Context> {
    /// Messages meant for BFT agreement peers.
    Bft(<C as TypeResolve>::BftCommunication),
    /// Batches of table statements.
    Table(C::StatementBatch),
}
/// Create an agreement future, and I/O streams.
///
/// Wires the BFT state machine to the network: incoming messages are
/// round-robin buffered and dispatched to either the statement table or the
/// BFT logic; outgoing BFT messages and periodic statement batches are sent
/// to `net_out`. The returned future resolves with the committed result.
// TODO: kill 'static bounds and use impl Future.
pub fn agree<
    Context,
    NetIn,
    NetOut,
    Recovery,
    PropagateStatements,
    LocalCandidate,
    Err,
>(
    params: AgreementParams<Context>,
    net_in: NetIn,
    net_out: NetOut,
    recovery: Recovery,
    propagate_statements: PropagateStatements,
    local_candidate: LocalCandidate,
)
    -> Box<Future<Item=<Context as TypeResolve>::BftCommitted,Error=Error>>
    where
        Context: ::Context + 'static,
        Context::CheckCandidate: IntoFuture<Error=Err>,
        Context::CheckAvailability: IntoFuture<Error=Err>,
        NetIn: Stream<Item=(Context::AuthorityId, Vec<Recovery::UncheckedMessage>),Error=Err> + 'static,
        NetOut: Sink<SinkItem=OutgoingMessage<Context>> + 'static,
        Recovery: MessageRecovery<Context> + 'static,
        PropagateStatements: Stream<Item=Context::StatementBatch,Error=Err> + 'static,
        LocalCandidate: IntoFuture<Item=Context::ParachainCandidate> + 'static
{
    // Channels linking the BFT state machine to the routing futures below.
    let (bft_in_in, bft_in_out) = mpsc::unbounded();
    let (bft_out_in, bft_out_out) = mpsc::unbounded();

    // The core BFT agreement future; consumes `bft_in_out`, emits on `bft_out_in`.
    let agreement = {
        let bft_context = BftContext {
            context: params.context,
            table: params.table.clone(),
            timer: params.timer.clone(),
            round_timeout_multiplier: params.round_timeout_multiplier,
        };

        bft::agree(
            bft_context,
            params.nodes,
            params.max_faulty,
            bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated),
            bft_out_in.sink_map_err(|_| Error::IoTerminated),
        )
    };

    // Incoming path: buffer messages fairly by sender, authenticate them,
    // then dispatch table statements to the table and BFT messages onward.
    let route_messages_in = {
        let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size);

        let round_robin_recovered = round_robin
            .filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x)));

        handle_incoming::HandleIncoming::new(
            params.table.clone(),
            round_robin_recovered,
            bft_in_in,
        ).map_err(|_| Error::IoTerminated)
    };

    // Outgoing path: interleave BFT messages with periodically-filled
    // statement batches and push them all into `net_out`.
    let route_messages_out = {
        let table = params.table.clone();
        let periodic_table_statements = propagate_statements
            .or_else(|_| ::futures::future::empty()) // halt the stream instead of error.
            .map(move |mut batch| { table.fill_batch(&mut batch); batch })
            .filter(|b| !b.is_empty())
            .map(OutgoingMessage::Table);

        let complete_out_stream = bft_out_out
            .map_err(|_| Error::IoTerminated)
            .map(|bft::ContextCommunication(x)| x)
            .map(OutgoingMessage::Bft)
            .select(periodic_table_statements);

        net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream)
    };

    // Sign and import our own candidate when (and if) it materializes;
    // failure to produce one is not an error.
    let import_local_candidate = {
        let table = params.table.clone();

        local_candidate
            .into_future()
            .map(table::Statement::Candidate)
            .map(Some)
            .or_else(|_| Ok(None))
            .map(move |s| if let Some(s) = s {
                table.sign_and_import(s);
            })
    };

    // Periodically try to form a proposal from the table contents.
    let create_proposal_on_interval = {
        let table = params.table;
        params.timer.interval(params.form_proposal_interval)
            .map_err(|_| Error::FaultyTimer)
            .for_each(move |_| { table.update_proposal(); Ok(()) })
    };

    // if these auxiliary futures terminate before the agreement, then
    // that is an error.
    let auxiliary_futures = route_messages_in.join4(
        create_proposal_on_interval,
        route_messages_out,
        import_local_candidate,
    ).and_then(|_| Err(Error::IoTerminated));

    let future = agreement
        .select(auxiliary_futures)
        .map(|(committed, _)| committed)
        .map_err(|(e, _)| e);

    Box::new(future)
}
@@ -0,0 +1,164 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Round-robin buffer for incoming messages.
//!
//! This takes batches of messages associated with a sender as input,
//! and yields messages in a fair order by sender.
use std::collections::{Bound, BTreeMap, VecDeque};
use futures::prelude::*;
use futures::stream::Fuse;
/// Implementation of the round-robin buffer for incoming messages.
#[derive(Debug)]
pub struct RoundRobinBuffer<V: Ord + Eq, S, M> {
    // pending messages grouped by sender, in sender order.
    buffer: BTreeMap<V, VecDeque<M>>,
    // sender whose message was yielded last; iteration resumes just after it.
    last_processed_from: Option<V>,
    // total number of messages currently buffered across all senders.
    stored_messages: usize,
    // capacity; messages arriving beyond this are discarded.
    max_messages: usize,
    // underlying stream, fused so polling after exhaustion is safe.
    inner: Fuse<S>,
}
impl<V: Ord + Eq + Clone, S: Stream, M> RoundRobinBuffer<V, S, M> {
    /// Create a new round-robin buffer which holds up to a maximum
    /// amount of messages.
    pub fn new(stream: S, buffer_size: usize) -> Self {
        let inner = stream.fuse();
        RoundRobinBuffer {
            inner,
            buffer: BTreeMap::new(),
            stored_messages: 0,
            max_messages: buffer_size,
            last_processed_from: None,
        }
    }
}
impl<V: Ord + Eq + Clone, S, M> RoundRobinBuffer<V, S, M> {
    // Pop the next buffered message, cycling through senders fairly.
    fn next_message(&mut self) -> Option<(V, M)> {
        if self.stored_messages == 0 {
            return None
        }

        // first pick up from the sender strictly after the last one we
        // processed a message from (exclusive lower bound).
        let mut next = {
            let lower_bound = match self.last_processed_from {
                None => Bound::Unbounded,
                Some(ref x) => Bound::Excluded(x.clone()),
            };
            self.buffer.range_mut((lower_bound, Bound::Unbounded))
                .filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v)))
                .next()
        };

        // but wrap around to the beginning again if we got nothing.
        if next.is_none() {
            next = self.buffer.iter_mut()
                .filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v)))
                .next();
        }

        if let Some((ref authority, _)) = next {
            self.stored_messages -= 1;
            self.last_processed_from = Some(authority.clone());
        }

        next
    }

    // import messages, discarding when the buffer is full.
    fn import_messages(&mut self, sender: V, messages: Vec<M>) {
        // only count the messages that actually fit under `max_messages`;
        // the rest are dropped by `take` below.
        let space_remaining = self.max_messages - self.stored_messages;
        self.stored_messages += ::std::cmp::min(space_remaining, messages.len());

        let v = self.buffer.entry(sender).or_insert_with(VecDeque::new);
        v.extend(messages.into_iter().take(space_remaining));
    }
}
impl<V: Ord + Eq + Clone, S, M> Stream for RoundRobinBuffer<V, S, M>
    where S: Stream<Item=(V, Vec<M>)>
{
    type Item = (V, M);
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, S::Error> {
        // First drain the underlying stream into the buffer until it is
        // exhausted or not ready.
        loop {
            match self.inner.poll()? {
                Async::NotReady | Async::Ready(None) => break,
                Async::Ready(Some((sender, msgs))) => self.import_messages(sender, msgs),
            }
        }

        // Then yield buffered messages in round-robin order. The stream
        // only ends once the inner stream is done AND the buffer is empty.
        let done = self.inner.is_done();
        Ok(match self.next_message() {
            Some(msg) => Async::Ready(Some(msg)),
            None => if done { Async::Ready(None) } else { Async::NotReady },
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::{self, Stream};

    #[derive(Debug, PartialEq, Eq)]
    struct UncheckedMessage { data: Vec<u8> }

    // Messages from two senders must come out strictly interleaved,
    // one message per sender per turn.
    #[test]
    fn is_fair_and_wraps_around() {
        let stream = stream::iter_ok(vec![
            (1, vec![
                UncheckedMessage { data: vec![1, 3, 5] },
                UncheckedMessage { data: vec![3, 5, 7] },
                UncheckedMessage { data: vec![5, 7, 9] },
            ]),
            (2, vec![
                UncheckedMessage { data: vec![2, 4, 6] },
                UncheckedMessage { data: vec![4, 6, 8] },
                UncheckedMessage { data: vec![6, 8, 10] },
            ]),
        ]);

        let round_robin = RoundRobinBuffer::new(stream, 100);

        let output = round_robin.wait().collect::<Result<Vec<_>, ()>>().unwrap();
        assert_eq!(output, vec![
            (1, UncheckedMessage { data: vec![1, 3, 5] }),
            (2, UncheckedMessage { data: vec![2, 4, 6] }),

            (1, UncheckedMessage { data: vec![3, 5, 7] }),
            (2, UncheckedMessage { data: vec![4, 6, 8] }),

            (1, UncheckedMessage { data: vec![5, 7, 9] }),
            (2, UncheckedMessage { data: vec![6, 8, 10] }),
        ]);
    }

    // 200 messages into a buffer capped at 100: the overflow is dropped.
    #[test]
    fn discards_when_full() {
        let stream = stream::iter_ok(vec![
            (1, (0..200).map(|i| UncheckedMessage { data: vec![i] }).collect())
        ]);

        let round_robin = RoundRobinBuffer::new(stream, 100);

        let output = round_robin.wait().collect::<Result<Vec<_>, ()>>().unwrap();
        assert_eq!(output.len(), 100);
    }
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,385 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Tests and test helpers for the candidate agreement.
// Simulated latency of a candidate validity check.
const VALIDITY_CHECK_DELAY_MS: u64 = 100;
// Simulated latency of a candidate availability check.
const AVAILABILITY_CHECK_DELAY_MS: u64 = 100;
// How often each node attempts to form a proposal from the table.
const PROPOSAL_FORMATION_TICK_MS: u64 = 50;
// How often blank statement batches are produced for propagation.
const PROPAGATE_STATEMENTS_TICK_MS: u64 = 200;
// Resolution of the test timer wheel.
const TIMER_TICK_DURATION_MS: u64 = 10;
use std::collections::HashMap;
use futures::prelude::*;
use futures::sync::mpsc;
use tokio_timer::Timer;
use super::*;
// Authority identifier: just an index.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)]
struct AuthorityId(usize);

// A "digest": the raw constituent values rather than a real hash.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
struct Digest(Vec<usize>);

// Group identifier: just an index.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
struct GroupId(usize);

// A parachain candidate: its group plus an opaque data payload.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
struct ParachainCandidate {
    group: GroupId,
    data: usize,
}

// A proposal is simply the collection of agreed-upon candidates.
#[derive(PartialEq, Eq, Debug, Clone)]
struct Proposal {
    candidates: Vec<ParachainCandidate>,
}

// Fake signature: embeds the signer and the payload that was "signed",
// so no real cryptography is needed in tests.
#[derive(PartialEq, Eq, Debug, Clone)]
enum Signature {
    Table(AuthorityId, table::Statement<ParachainCandidate, Digest>),
    Bft(AuthorityId, bft::Message<Proposal, Digest>),
}

// Errors produced by the test harness I/O and timers.
enum Error {
    Timer(tokio_timer::TimerError),
    NetOut,
    NetIn,
}
// State shared between all simulated authorities.
#[derive(Debug, Clone)]
struct SharedTestContext {
    n_authorities: usize,
    n_groups: usize,
    timer: Timer,
}

// Per-authority context: the shared state plus this authority's own ID.
#[derive(Debug, Clone)]
struct TestContext {
    shared: Arc<SharedTestContext>,
    local_id: AuthorityId,
}
impl Context for TestContext {
    type AuthorityId = AuthorityId;
    type Digest = Digest;
    type GroupId = GroupId;
    type Signature = Signature;
    type Proposal = Proposal;
    type ParachainCandidate = ParachainCandidate;
    type CheckCandidate = Box<Future<Item=bool,Error=Error>>;
    type CheckAvailability = Box<Future<Item=bool,Error=Error>>;
    type StatementBatch = VecBatch<
        AuthorityId,
        table::SignedStatement<ParachainCandidate, Digest, AuthorityId, Signature>
    >;

    // Candidate digest: its group index followed by its data value.
    fn candidate_digest(candidate: &ParachainCandidate) -> Digest {
        Digest(vec![candidate.group.0, candidate.data])
    }

    // Proposal digest: concatenation of all contained candidate digests.
    fn proposal_digest(candidate: &Proposal) -> Digest {
        Digest(candidate.candidates.iter().fold(Vec::new(), |mut a, c| {
            a.extend(Self::candidate_digest(c).0);
            a
        }))
    }

    fn candidate_group(candidate: &ParachainCandidate) -> GroupId {
        candidate.group.clone()
    }

    // Round-robin proposer selection over all authorities.
    fn round_proposer(&self, round: usize) -> AuthorityId {
        AuthorityId(round % self.shared.n_authorities)
    }

    // Every candidate is "valid"; only a check latency is simulated.
    fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate {
        let future = self.shared.timer
            .sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS))
            .map_err(Error::Timer)
            .map(|_| true);

        Box::new(future)
    }

    // Every candidate's data is "available"; only a latency is simulated.
    fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability {
        let future = self.shared.timer
            .sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS))
            .map_err(Error::Timer)
            .map(|_| true);

        Box::new(future)
    }

    // Form a proposal once candidates from at least 2/3 of groups are known.
    fn create_proposal(&self, candidates: Vec<&ParachainCandidate>)
        -> Option<Proposal>
    {
        let t = self.shared.n_groups * 2 / 3;
        if candidates.len() >= t {
            Some(Proposal {
                candidates: candidates.iter().map(|x| (&**x).clone()).collect()
            })
        } else {
            None
        }
    }

    // Valid when at least 2/3 of groups are represented and every
    // contained candidate passes the supplied check.
    fn proposal_valid<F>(&self, proposal: &Proposal, check_candidate: F) -> bool
        where F: FnMut(&ParachainCandidate) -> bool
    {
        if proposal.candidates.len() >= self.shared.n_groups * 2 / 3 {
            proposal.candidates.iter().all(check_candidate)
        } else {
            false
        }
    }

    fn local_id(&self) -> AuthorityId {
        self.local_id.clone()
    }

    // "Signatures" just wrap the signer ID with the signed payload.
    fn sign_table_statement(
        &self,
        statement: &table::Statement<ParachainCandidate, Digest>
    ) -> Signature {
        Signature::Table(self.local_id(), statement.clone())
    }

    fn sign_bft_message(&self, message: &bft::Message<Proposal, Digest>) -> Signature {
        Signature::Bft(self.local_id(), message.clone())
    }
}
// Message recovery that performs no real verification: outgoing messages
// are converted directly into their checked form.
struct TestRecovery;

impl MessageRecovery<TestContext> for TestRecovery {
    type UncheckedMessage = OutgoingMessage<TestContext>;

    fn check_message(&self, msg: Self::UncheckedMessage) -> Option<CheckedMessage<TestContext>> {
        Some(match msg {
            OutgoingMessage::Bft(c) => CheckedMessage::Bft(c),
            OutgoingMessage::Table(batch) => CheckedMessage::Table(batch.items),
        })
    }
}
// A simulated broadcast network: every item submitted on the input channel
// is delivered to all endpoints except the sender's own.
pub struct Network<T> {
    // per-node delivery channels; index corresponds to node index.
    endpoints: Vec<mpsc::UnboundedSender<T>>,
    // merged input of (sender index, item) pairs from all nodes.
    input: mpsc::UnboundedReceiver<(usize, T)>,
}

impl<T: Clone + Send + 'static> Network<T> {
    /// Create a network of `nodes` participants.
    ///
    /// Returns the network future itself, one input sender per node (for
    /// submitting `(sender_index, item)` pairs), and one output receiver
    /// per node (for receiving the other nodes' broadcasts).
    pub fn new(nodes: usize)
        -> (Self, Vec<mpsc::UnboundedSender<(usize, T)>>, Vec<mpsc::UnboundedReceiver<T>>)
    {
        let mut inputs = Vec::with_capacity(nodes);
        let mut outputs = Vec::with_capacity(nodes);
        let mut endpoints = Vec::with_capacity(nodes);

        // all nodes share one input channel; each gets its own output.
        let (in_tx, in_rx) = mpsc::unbounded();
        for _ in 0..nodes {
            let (out_tx, out_rx) = mpsc::unbounded();
            inputs.push(in_tx.clone());
            outputs.push(out_rx);
            endpoints.push(out_tx);
        }

        let network = Network {
            endpoints,
            input: in_rx,
        };

        (network, inputs, outputs)
    }

    /// Drive the routing future to completion on a background thread.
    pub fn route_on_thread(self) {
        ::std::thread::spawn(move || { let _ = self.wait(); });
    }
}
impl<T: Clone> Future for Network<T> {
    type Item = ();
    type Error = ();

    // Routes queued items until the input channel is exhausted (`Ready(())`)
    // or empty for now (`NotReady`, via `try_ready!`).
    fn poll(&mut self) -> Poll<(), Self::Error> {
        // Previously this recursed into `self.poll()` once per routed
        // message, so a long backlog could overflow the stack. A loop
        // performs the same work iteratively.
        loop {
            match try_ready!(self.input.poll()) {
                None => return Ok(Async::Ready(())),
                Some((sender, item)) => {
                    // broadcast to everyone except the original sender;
                    // send errors (disconnected receivers) are ignored.
                    let receiving_endpoints = self.endpoints
                        .iter()
                        .enumerate()
                        .filter(|&(i, _)| i != sender)
                        .map(|(_, x)| x);

                    for endpoint in receiving_endpoints {
                        let _ = endpoint.unbounded_send(item.clone());
                    }
                }
            }
        }
    }
}
// A simple statement batch backed by a vector, capped at `max_len` items.
#[derive(Debug, Clone)]
pub struct VecBatch<V, T> {
    pub max_len: usize,
    pub targets: Vec<V>,
    pub items: Vec<T>,
}
impl<V, T> ::StatementBatch<V, T> for VecBatch<V, T> {
    fn targets(&self) -> &[V] {
        &self.targets
    }

    fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    // Accept the item unless the batch has reached `max_len`.
    fn push(&mut self, item: T) -> bool {
        if self.items.len() == self.max_len {
            return false;
        }
        self.items.push(item);
        true
    }
}
// Assign authorities to groups for the test network.
//
// Every authority checks validity for the group matching its ID modulo
// `n_groups`, and guarantees availability for the neighbouring groups
// (one above and one below, wrapping around).
fn make_group_assignments(n_authorities: usize, n_groups: usize)
    -> HashMap<GroupId, GroupInfo<AuthorityId>>
{
    let mut map = HashMap::new();
    let threshold = (n_authorities / n_groups) / 2;
    let make_blank_group = || {
        GroupInfo {
            validity_guarantors: HashSet::new(),
            availability_guarantors: HashSet::new(),
            needed_validity: threshold,
            needed_availability: threshold,
        }
    };

    for a_id in 0..n_authorities {
        let primary_group = a_id % n_groups;
        // Fix: the "previous group" index must wrap within `n_groups`.
        // The former `a_id.wrapping_sub(1) % n_groups` gave an arbitrary
        // group for `a_id == 0` (usize::MAX % n_groups), not the last one.
        let availability_groups = [
            (a_id + 1) % n_groups,
            (a_id + n_groups - 1) % n_groups,
        ];

        map.entry(GroupId(primary_group))
            .or_insert_with(&make_blank_group)
            .validity_guarantors
            .insert(AuthorityId(a_id));

        for &availability_group in &availability_groups {
            map.entry(GroupId(availability_group))
                .or_insert_with(&make_blank_group)
                .availability_guarantors
                .insert(AuthorityId(a_id));
        }
    }

    map
}
// Build an empty batch addressed to all `n_authorities` authorities.
fn make_blank_batch<T>(n_authorities: usize) -> VecBatch<AuthorityId, T> {
    let targets = (0..n_authorities).map(AuthorityId).collect();
    VecBatch {
        max_len: 20,
        targets,
        items: Vec::new(),
    }
}
// End-to-end: with 50 authorities of which 16 may be faulty, run only the
// minimum n - f = 34 good nodes and check they all reach agreement.
#[test]
fn consensus_completes_with_minimum_good() {
    let n = 50;
    let f = 16;
    let n_groups = 10;

    let timer = ::tokio_timer::wheel()
        .tick_duration(Duration::from_millis(TIMER_TICK_DURATION_MS))
        .num_slots(1 << 16)
        .build();

    // Simulated broadcast network connecting the good nodes.
    let (network, inputs, outputs) = Network::<(AuthorityId, OutgoingMessage<TestContext>)>::new(n - f);
    network.route_on_thread();

    let shared_test_context = Arc::new(SharedTestContext {
        n_authorities: n,
        n_groups: n_groups,
        timer: timer.clone(),
    });

    let groups = make_group_assignments(n, n_groups);

    // Build one agreement future per good authority.
    let authorities = inputs.into_iter().zip(outputs).enumerate().map(|(raw_id, (input, output))| {
        let id = AuthorityId(raw_id);
        let context = TestContext {
            shared: shared_test_context.clone(),
            local_id: id,
        };
        let shared_table = SharedTable::new(context.clone(), groups.clone());

        let params = AgreementParams {
            context,
            timer: timer.clone(),
            table: shared_table,
            nodes: n,
            max_faulty: f,
            round_timeout_multiplier: 4,
            message_buffer_size: 100,
            form_proposal_interval: Duration::from_millis(PROPOSAL_FORMATION_TICK_MS),
        };

        // Tag outgoing messages with the sender's index for the router.
        let net_out = input
            .sink_map_err(|_| Error::NetOut)
            .with(move |x| Ok::<_, Error>((id.0, (id, x))) );

        let net_in = output
            .map_err(|_| Error::NetIn)
            .map(move |(v, msg)| (v, vec![msg]));

        // Periodically emit a blank batch for the table to fill.
        let propagate_statements = timer
            .interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS))
            .map(move |()| make_blank_batch(n))
            .map_err(Error::Timer);

        // The first `n_groups` authorities each produce one parachain
        // candidate; the rest produce none (an empty future).
        let local_candidate = if raw_id < n_groups {
            let candidate = ParachainCandidate {
                group: GroupId(raw_id),
                data: raw_id,
            };

            ::futures::future::Either::A(Ok::<_, Error>(candidate).into_future())
        } else {
            ::futures::future::Either::B(::futures::future::empty())
        };

        agree::<_, _, _, _, _, _, Error>(
            params,
            net_in,
            net_out,
            TestRecovery,
            propagate_statements,
            local_candidate
        )
    }).collect::<Vec<_>>();

    // Every good node's agreement future must complete successfully.
    futures::future::join_all(authorities).wait().unwrap();
}