From 2fbc256b66e2a50998f8b7ae017b947c16e8bdac Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 8 Jan 2018 16:36:35 +0100 Subject: [PATCH] instantiate the agreement future --- substrate/candidate-agreement/src/lib.rs | 193 +++++++++++++++------ substrate/candidate-agreement/src/table.rs | 8 +- 2 files changed, 144 insertions(+), 57 deletions(-) diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index 2553fd080c..4a75f5b36d 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -39,6 +39,7 @@ use std::hash::Hash; use std::sync::Arc; use futures::prelude::*; +use futures::sync::{mpsc, oneshot}; use parking_lot::Mutex; use tokio_timer::Timer; @@ -179,37 +180,11 @@ impl table::Context for TableContext { } } -struct BftContext { - context: C, - table_context: TableContext, - table: Arc>>>, - timer: Timer, - round_timeout_multiplier: u64, -} - -/// Parameters necessary for agreement. -pub struct AgreementParams { - /// The context itself. - pub context: C, - /// For scheduling timeouts. - pub timer: Timer, - /// Group assignments. - pub groups: HashMap>, - /// The local candidate proposal. - // TODO: replace with future. - pub local_proposal: Option, - /// The number of nodes. - pub nodes: usize, - /// The maximum number of faulty nodes. - pub max_faulty: usize, - /// The round timeout multiplier: 2^round_number is multiplied by this. - pub round_timeout_multiplier: u64, -} - // A shared table object. struct SharedTableInner { context: TableContext, table: Table>, + awaiting_proposal: Vec>, } impl SharedTableInner { @@ -220,6 +195,31 @@ impl SharedTableInner { ) -> Option> { self.table.import_statement(&self.context, statement, received_from) } + + fn update_proposal(&mut self) { + if self.awaiting_proposal.is_empty() { return } + let proposal_candidates = self.table.proposed_candidates(&self.context); + if let Some(proposal) = self.context.context.create_proposal(proposal_candidates) { + for sender in self.awaiting_proposal.drain(..) { + let _ = sender.send(proposal.clone()); + } + } + } + + fn get_proposal(&mut self) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.awaiting_proposal.push(tx); + self.update_proposal(); + rx + } + + fn proposal_valid(&mut self, proposal: &C::Proposal) -> bool { + self.context.context.proposal_valid(proposal, |contained_candidate| { + // check that the candidate is valid (has enough votes) + let digest = C::candidate_digest(contained_candidate); + self.table.candidate_includable(&digest, &self.context) + }) + } } /// A shared table object. @@ -240,12 +240,13 @@ impl SharedTable { inner: Arc::new(Mutex::new(SharedTableInner { table: Table::default(), context: TableContext { context, groups }, + awaiting_proposal: Vec::new(), })) } } /// Import a single statement. - fn import_statement( + pub fn import_statement( &self, statement: ::SignedTableStatement, received_from: Option, @@ -256,33 +257,74 @@ impl SharedTable { /// Import many statements at once. /// /// Provide an iterator yielding pairs of (statement, received_from). - fn import_statements<'a, I: 'a>(&'a self, iterable: I) - -> Box> + 'a> - where I: IntoIterator::SignedTableStatement, Option)> + pub fn import_statements(&self, iterable: I) -> U + where + I: IntoIterator::SignedTableStatement, Option)>, + U: ::std::iter::FromIterator>, { let mut inner = self.inner.lock(); - let iter = iterable.into_iter().filter_map(move |(statement, received_from)| { - inner.import_statement(statement, received_from) - }); - Box::new(iter) + iterable.into_iter().filter_map(move |(statement, received_from)| { + inner.import_statement(statement, received_from) + }).collect() + } + + /// Update the proposal sealing. + pub fn update_proposal(&self) { + self.inner.lock().update_proposal() + } + + /// Register interest in receiving a proposal when ready. + /// If one is ready immediately, it will be provided. + pub fn get_proposal(&self) -> oneshot::Receiver { + self.inner.lock().get_proposal() + } + + /// Check if a proposal is valid. + pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool { + self.inner.lock().proposal_valid(proposal) } } -impl bft::Context for BftContext { +/// Errors that can occur during agreement. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum Error { + IoTerminated, + FaultyTimer, + CannotPropose, +} + +impl From for Error { + fn from(_: bft::InputStreamConcluded) -> Error { + Error::IoTerminated + } +} + +/// Context owned by the BFT future necessary to execute the logic. +pub struct BftContext { + context: C, + table: SharedTable, + timer: Timer, + round_timeout_multiplier: u64, +} + +impl bft::Context for BftContext + where + C::Proposal: 'static, +{ type ValidatorId = C::ValidatorId; type Digest = C::Digest; type Signature = C::Signature; type Candidate = C::Proposal; - type RoundTimeout = tokio_timer::Sleep; - type CreateProposal = futures::future::Empty; + type RoundTimeout = Box>; + type CreateProposal = Box>; fn local_id(&self) -> Self::ValidatorId { self.context.local_id() } fn proposal(&self) -> Self::CreateProposal { - futures::future::empty() + Box::new(self.table.get_proposal().map_err(|_| Error::CannotPropose)) } fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest { @@ -306,17 +348,7 @@ impl bft::Context for BftContext { } fn candidate_valid(&self, proposal: &Self::Candidate) -> bool { - let mut table = self.table.lock(); - - self.context.proposal_valid(proposal, |contained_candidate| { - // check that the candidate is valid (has enough votes) - let digest = C::candidate_digest(contained_candidate); - table.candidate_includable(&digest, &self.table_context) - }); - - // also check that _enough_ candidates were included (perhaps according - // to a curve over time). - true + self.table.proposal_valid(proposal) } fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { @@ -325,11 +357,64 @@ impl bft::Context for BftContext { .unwrap_or_else(u64::max_value) .saturating_mul(self.round_timeout_multiplier); - self.timer.sleep(::std::time::Duration::from_secs(timeout)) + Box::new(self.timer.sleep(::std::time::Duration::from_secs(timeout)) + .map_err(|_| Error::FaultyTimer)) } } -/// Reach agreement with other validators on a new block. -pub fn agree(_params: AgreementParams) { - unimplemented!() +/// Parameters necessary for agreement. +pub struct AgreementParams { + /// The context itself. + pub context: C, + /// For scheduling timeouts. + pub timer: Timer, + /// The statement table. + pub table: SharedTable, + /// The number of nodes. + pub nodes: usize, + /// The maximum number of faulty nodes. + pub max_faulty: usize, + /// The round timeout multiplier: 2^round_number is multiplied by this. + pub round_timeout_multiplier: u64, +} + +/// Future and I/O to reach agreement. +pub struct Agreement { + /// The future holding the actual BFT logic. + pub bft: Box, + Error=Error, + >>, + /// The input sink. + pub input: mpsc::UnboundedSender>, + /// The output stream. + pub output: mpsc::UnboundedReceiver>, +} + +/// Create an agreement future, and I/O streams. +pub fn agree(params: AgreementParams) + -> Agreement> +{ + let (in_in, in_out) = mpsc::unbounded(); + let (out_in, out_out) = mpsc::unbounded(); + + let bft_context = BftContext { + context: params.context, + table: params.table, + timer: params.timer, + round_timeout_multiplier: params.round_timeout_multiplier, + }; + + let agreement = bft::agree( + bft_context, + params.nodes, + params.max_faulty, + in_out.map_err(|_| Error::IoTerminated), + out_in.sink_map_err(|_| Error::IoTerminated), + ); + Agreement { + bft: Box::new(agreement), + input: in_in, + output: out_out, + } } diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs index 456be773c6..d8b2898aef 100644 --- a/substrate/candidate-agreement/src/table.rs +++ b/substrate/candidate-agreement/src/table.rs @@ -280,7 +280,9 @@ impl Table { /// /// This will be at most one per group, consisting of the /// best candidate for each group with requisite votes for inclusion. - pub fn proposed_candidates(&self, context: &C) -> Vec { + /// + /// The vector is sorted in ascending order by group id. + pub fn proposed_candidates<'a>(&'a self, context: &C) -> Vec<&'a C::Candidate> { use std::collections::BTreeMap; use std::collections::btree_map::Entry as BTreeEntry; @@ -294,7 +296,7 @@ impl Table { match best_candidates.entry(group_id.clone()) { BTreeEntry::Occupied(mut occ) => { let candidate_ref = occ.get_mut(); - if *candidate_ref < candidate { + if *candidate_ref > candidate { *candidate_ref = candidate; } } @@ -302,7 +304,7 @@ impl Table { } } - best_candidates.values().map(|v| C::Candidate::clone(v)).collect::>() + best_candidates.values().cloned().collect::>() } /// Whether a candidate can be included.