instantiate the agreement future

This commit is contained in:
Robert Habermeier
2018-01-08 16:36:35 +01:00
parent 9ff2fa550f
commit 2fbc256b66
2 changed files with 144 additions and 57 deletions
+139 -54
View File
@@ -39,6 +39,7 @@ use std::hash::Hash;
use std::sync::Arc;
use futures::prelude::*;
use futures::sync::{mpsc, oneshot};
use parking_lot::Mutex;
use tokio_timer::Timer;
@@ -179,37 +180,11 @@ impl<C: Context> table::Context for TableContext<C> {
}
}
struct BftContext<C: Context> {
context: C,
table_context: TableContext<C>,
table: Arc<Mutex<Table<TableContext<C>>>>,
timer: Timer,
round_timeout_multiplier: u64,
}
/// Parameters necessary for agreement.
pub struct AgreementParams<C: Context> {
/// The context itself.
pub context: C,
/// For scheduling timeouts.
pub timer: Timer,
/// Group assignments.
pub groups: HashMap<C::GroupId, GroupInfo<C::ValidatorId>>,
/// The local candidate proposal.
// TODO: replace with future.
pub local_proposal: Option<C::ParachainCandidate>,
/// The number of nodes.
pub nodes: usize,
/// The maximum number of faulty nodes.
pub max_faulty: usize,
/// The round timeout multiplier: 2^round_number is multiplied by this.
pub round_timeout_multiplier: u64,
}
// A shared table object.
struct SharedTableInner<C: Context + Clone> {
context: TableContext<C>,
table: Table<TableContext<C>>,
awaiting_proposal: Vec<oneshot::Sender<C::Proposal>>,
}
impl<C: Context + Clone> SharedTableInner<C> {
@@ -220,6 +195,31 @@ impl<C: Context + Clone> SharedTableInner<C> {
) -> Option<table::Summary<C::Digest, C::GroupId>> {
self.table.import_statement(&self.context, statement, received_from)
}
fn update_proposal(&mut self) {
if self.awaiting_proposal.is_empty() { return }
let proposal_candidates = self.table.proposed_candidates(&self.context);
if let Some(proposal) = self.context.context.create_proposal(proposal_candidates) {
for sender in self.awaiting_proposal.drain(..) {
let _ = sender.send(proposal.clone());
}
}
}
fn get_proposal(&mut self) -> oneshot::Receiver<C::Proposal> {
let (tx, rx) = oneshot::channel();
self.awaiting_proposal.push(tx);
self.update_proposal();
rx
}
fn proposal_valid(&mut self, proposal: &C::Proposal) -> bool {
self.context.context.proposal_valid(proposal, |contained_candidate| {
// check that the candidate is valid (has enough votes)
let digest = C::candidate_digest(contained_candidate);
self.table.candidate_includable(&digest, &self.context)
})
}
}
/// A shared table object.
@@ -240,12 +240,13 @@ impl<C: Context> SharedTable<C> {
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
context: TableContext { context, groups },
awaiting_proposal: Vec::new(),
}))
}
}
/// Import a single statement.
fn import_statement(
pub fn import_statement(
&self,
statement: <C as TypeResolve>::SignedTableStatement,
received_from: Option<C::ValidatorId>,
@@ -256,33 +257,74 @@ impl<C: Context> SharedTable<C> {
/// Import many statements at once.
///
/// Provide an iterator yielding pairs of (statement, received_from).
fn import_statements<'a, I: 'a>(&'a self, iterable: I)
-> Box<Iterator<Item=table::Summary<C::Digest, C::GroupId>> + 'a>
where I: IntoIterator<Item=(<C as TypeResolve>::SignedTableStatement, Option<C::ValidatorId>)>
pub fn import_statements<I, U>(&self, iterable: I) -> U
where
I: IntoIterator<Item=(<C as TypeResolve>::SignedTableStatement, Option<C::ValidatorId>)>,
U: ::std::iter::FromIterator<table::Summary<C::Digest, C::GroupId>>,
{
let mut inner = self.inner.lock();
let iter = iterable.into_iter().filter_map(move |(statement, received_from)| {
inner.import_statement(statement, received_from)
});
Box::new(iter)
iterable.into_iter().filter_map(move |(statement, received_from)| {
inner.import_statement(statement, received_from)
}).collect()
}
/// Update the proposal sealing.
pub fn update_proposal(&self) {
self.inner.lock().update_proposal()
}
/// Register interest in receiving a proposal when ready.
/// If one is ready immediately, it will be provided.
pub fn get_proposal(&self) -> oneshot::Receiver<C::Proposal> {
self.inner.lock().get_proposal()
}
/// Check if a proposal is valid.
pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool {
self.inner.lock().proposal_valid(proposal)
}
}
impl<C: Context> bft::Context for BftContext<C> {
/// Errors that can occur during agreement.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Error {
IoTerminated,
FaultyTimer,
CannotPropose,
}
impl From<bft::InputStreamConcluded> for Error {
fn from(_: bft::InputStreamConcluded) -> Error {
Error::IoTerminated
}
}
/// Context owned by the BFT future necessary to execute the logic.
pub struct BftContext<C: Context> {
context: C,
table: SharedTable<C>,
timer: Timer,
round_timeout_multiplier: u64,
}
impl<C: Context> bft::Context for BftContext<C>
where
C::Proposal: 'static,
{
type ValidatorId = C::ValidatorId;
type Digest = C::Digest;
type Signature = C::Signature;
type Candidate = C::Proposal;
type RoundTimeout = tokio_timer::Sleep;
type CreateProposal = futures::future::Empty<Self::Candidate, ()>;
type RoundTimeout = Box<Future<Item=(),Error=Error>>;
type CreateProposal = Box<Future<Item=Self::Candidate,Error=Error>>;
fn local_id(&self) -> Self::ValidatorId {
self.context.local_id()
}
fn proposal(&self) -> Self::CreateProposal {
futures::future::empty()
Box::new(self.table.get_proposal().map_err(|_| Error::CannotPropose))
}
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest {
@@ -306,17 +348,7 @@ impl<C: Context> bft::Context for BftContext<C> {
}
fn candidate_valid(&self, proposal: &Self::Candidate) -> bool {
let mut table = self.table.lock();
self.context.proposal_valid(proposal, |contained_candidate| {
// check that the candidate is valid (has enough votes)
let digest = C::candidate_digest(contained_candidate);
table.candidate_includable(&digest, &self.table_context)
});
// also check that _enough_ candidates were included (perhaps according
// to a curve over time).
true
self.table.proposal_valid(proposal)
}
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
@@ -325,11 +357,64 @@ impl<C: Context> bft::Context for BftContext<C> {
.unwrap_or_else(u64::max_value)
.saturating_mul(self.round_timeout_multiplier);
self.timer.sleep(::std::time::Duration::from_secs(timeout))
Box::new(self.timer.sleep(::std::time::Duration::from_secs(timeout))
.map_err(|_| Error::FaultyTimer))
}
}
/// Reach agreement with other validators on a new block.
pub fn agree<C: Context, I, O>(_params: AgreementParams<C>) {
unimplemented!()
/// Parameters necessary for agreement.
pub struct AgreementParams<C: Context> {
/// The context itself.
pub context: C,
/// For scheduling timeouts.
pub timer: Timer,
/// The statement table.
pub table: SharedTable<C>,
/// The number of nodes.
pub nodes: usize,
/// The maximum number of faulty nodes.
pub max_faulty: usize,
/// The round timeout multiplier: 2^round_number is multiplied by this.
pub round_timeout_multiplier: u64,
}
/// Future and I/O to reach agreement.
pub struct Agreement<C: bft::Context + 'static> {
/// The future holding the actual BFT logic.
pub bft: Box<Future<
Item=bft::Committed<C::Candidate, C::Digest, C::Signature>,
Error=Error,
>>,
/// The input sink.
pub input: mpsc::UnboundedSender<bft::ContextCommunication<C>>,
/// The output stream.
pub output: mpsc::UnboundedReceiver<bft::ContextCommunication<C>>,
}
/// Create an agreement future, and I/O streams.
pub fn agree<C: Context + 'static>(params: AgreementParams<C>)
-> Agreement<BftContext<C>>
{
let (in_in, in_out) = mpsc::unbounded();
let (out_in, out_out) = mpsc::unbounded();
let bft_context = BftContext {
context: params.context,
table: params.table,
timer: params.timer,
round_timeout_multiplier: params.round_timeout_multiplier,
};
let agreement = bft::agree(
bft_context,
params.nodes,
params.max_faulty,
in_out.map_err(|_| Error::IoTerminated),
out_in.sink_map_err(|_| Error::IoTerminated),
);
Agreement {
bft: Box::new(agreement),
input: in_in,
output: out_out,
}
}
+5 -3
View File
@@ -280,7 +280,9 @@ impl<C: Context> Table<C> {
///
/// This will be at most one per group, consisting of the
/// best candidate for each group with requisite votes for inclusion.
pub fn proposed_candidates(&self, context: &C) -> Vec<C::Candidate> {
///
/// The vector is sorted in ascending order by group id.
pub fn proposed_candidates<'a>(&'a self, context: &C) -> Vec<&'a C::Candidate> {
use std::collections::BTreeMap;
use std::collections::btree_map::Entry as BTreeEntry;
@@ -294,7 +296,7 @@ impl<C: Context> Table<C> {
match best_candidates.entry(group_id.clone()) {
BTreeEntry::Occupied(mut occ) => {
let candidate_ref = occ.get_mut();
if *candidate_ref < candidate {
if *candidate_ref > candidate {
*candidate_ref = candidate;
}
}
@@ -302,7 +304,7 @@ impl<C: Context> Table<C> {
}
}
best_candidates.values().map(|v| C::Candidate::clone(v)).collect::<Vec<_>>()
best_candidates.values().cloned().collect::<Vec<_>>()
}
/// Whether a candidate can be included.