From 9ff2fa550f69d980e1d42e03f22f556dba484942 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 8 Jan 2018 15:34:21 +0100 Subject: [PATCH] beginnings of shared table --- substrate/candidate-agreement/src/lib.rs | 186 ++++++++++++++++++--- substrate/candidate-agreement/src/table.rs | 8 + 2 files changed, 170 insertions(+), 24 deletions(-) diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index 6d106ab781..2553fd080c 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -33,18 +33,20 @@ extern crate futures; extern crate parking_lot; extern crate tokio_timer; -pub mod bft; -pub mod table; - use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::hash::Hash; +use std::sync::Arc; use futures::prelude::*; +use parking_lot::Mutex; use tokio_timer::Timer; use table::Table; +pub mod bft; +pub mod table; + /// Context necessary for agreement. pub trait Context: Send + Clone { /// A validator ID @@ -74,6 +76,9 @@ pub trait Context: Send + Clone { /// Get the digest of a candidate. fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest; + /// Get the digest of a proposal. + fn proposal_digest(proposal: &Self::Proposal) -> Self::Digest; + /// Get the group of a candidate. fn candidate_group(candidate: &Self::ParachainCandidate) -> Self::GroupId; @@ -95,9 +100,11 @@ pub trait Context: Send + Clone { fn create_proposal(&self, candidates: Vec<&Self::ParachainCandidate>) -> Option; - /// Check validity of a proposal. This may also be somewhat subjective - /// based on a monotonic-decreasing curve. - fn proposal_valid(&self, proposal: &Self::Proposal) -> bool; + /// Check validity of a proposal. This should call out to the `check_candidate` + /// function for all parachain candidates contained within it, as well as + /// checking other validity constraints of the proposal. + fn proposal_valid(&self, proposal: &Self::Proposal, check_candidate: F) -> bool + where F: FnMut(&Self::ParachainCandidate) -> bool; /// Get the local validator ID. fn local_id(&self) -> Self::ValidatorId; @@ -112,6 +119,17 @@ pub trait Context: Send + Clone { fn sign_bft_message(&self, &bft::Message) -> Self::Signature; } +/// Helper for type resolution for contexts until type aliases apply bounds. +pub trait TypeResolve { + type SignedTableStatement; + type BftCommunication; +} + +impl TypeResolve for C { + type SignedTableStatement = table::SignedStatement; + type BftCommunication = bft::Communication; +} + /// Information about a specific group. #[derive(Debug, Clone)] pub struct GroupInfo { @@ -161,6 +179,14 @@ impl table::Context for TableContext { } } +struct BftContext { + context: C, + table_context: TableContext, + table: Arc>>>, + timer: Timer, + round_timeout_multiplier: u64, +} + /// Parameters necessary for agreement. pub struct AgreementParams { /// The context itself. @@ -172,26 +198,138 @@ pub struct AgreementParams { /// The local candidate proposal. // TODO: replace with future. pub local_proposal: Option, + /// The number of nodes. + pub nodes: usize, + /// The maximum number of faulty nodes. + pub max_faulty: usize, + /// The round timeout multiplier: 2^round_number is multiplied by this. + pub round_timeout_multiplier: u64, } -pub fn agree(params: AgreementParams) { - let context = params.context; - let local_id = context.local_id(); - let mut table = Table::>::default(); +// A shared table object. +struct SharedTableInner { + context: TableContext, + table: Table>, +} - let table_context = TableContext { - context: context.clone(), - groups: params.groups, - }; - - if let Some(candidate) = params.local_proposal { - let statement = table::Statement::Candidate(candidate); - let signed_statement = table::SignedStatement { - signature: context.sign_table_statement(&statement), - sender: local_id.clone(), - statement: statement, - }; - - table.import_statement(&table_context, signed_statement, None); +impl SharedTableInner { + fn import_statement( + &mut self, + statement: ::SignedTableStatement, + received_from: Option + ) -> Option> { + self.table.import_statement(&self.context, statement, received_from) } } + +/// A shared table object. +pub struct SharedTable { + inner: Arc>>, +} + +impl Clone for SharedTable { + fn clone(&self) -> Self { + SharedTable { inner: self.inner.clone() } + } +} + +impl SharedTable { + /// Create a new shared table. + pub fn new(context: C, groups: HashMap>) -> Self { + SharedTable { + inner: Arc::new(Mutex::new(SharedTableInner { + table: Table::default(), + context: TableContext { context, groups }, + })) + } + } + + /// Import a single statement. + fn import_statement( + &self, + statement: ::SignedTableStatement, + received_from: Option, + ) -> Option> { + self.inner.lock().import_statement(statement, received_from) + } + + /// Import many statements at once. + /// + /// Provide an iterator yielding pairs of (statement, received_from). + fn import_statements<'a, I: 'a>(&'a self, iterable: I) + -> Box> + 'a> + where I: IntoIterator::SignedTableStatement, Option)> + { + let mut inner = self.inner.lock(); + let iter = iterable.into_iter().filter_map(move |(statement, received_from)| { + inner.import_statement(statement, received_from) + }); + + Box::new(iter) + } +} + +impl bft::Context for BftContext { + type ValidatorId = C::ValidatorId; + type Digest = C::Digest; + type Signature = C::Signature; + type Candidate = C::Proposal; + type RoundTimeout = tokio_timer::Sleep; + type CreateProposal = futures::future::Empty; + + fn local_id(&self) -> Self::ValidatorId { + self.context.local_id() + } + + fn proposal(&self) -> Self::CreateProposal { + futures::future::empty() + } + + fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest { + C::proposal_digest(candidate) + } + + fn sign_local(&self, message: bft::Message) + -> bft::LocalizedMessage + { + let sender = self.local_id(); + let signature = self.context.sign_bft_message(&message); + bft::LocalizedMessage { + message, + sender, + signature, + } + } + + fn round_proposer(&self, round: usize) -> Self::ValidatorId { + self.context.round_proposer(round) + } + + fn candidate_valid(&self, proposal: &Self::Candidate) -> bool { + let mut table = self.table.lock(); + + self.context.proposal_valid(proposal, |contained_candidate| { + // check that the candidate is valid (has enough votes) + let digest = C::candidate_digest(contained_candidate); + table.candidate_includable(&digest, &self.table_context) + }); + + // also check that _enough_ candidates were included (perhaps according + // to a curve over time). + true + } + + fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { + let round = ::std::cmp::max(63, round) as u32; + let timeout = 1u64.checked_shl(round) + .unwrap_or_else(u64::max_value) + .saturating_mul(self.round_timeout_multiplier); + + self.timer.sleep(::std::time::Duration::from_secs(timeout)) + } +} + +/// Reach agreement with other validators on a new block. +pub fn agree(_params: AgreementParams) { + unimplemented!() +} diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs index 864d189d6d..456be773c6 100644 --- a/substrate/candidate-agreement/src/table.rs +++ b/substrate/candidate-agreement/src/table.rs @@ -305,6 +305,14 @@ impl Table { best_candidates.values().map(|v| C::Candidate::clone(v)).collect::>() } + /// Whether a candidate can be included. + pub fn candidate_includable(&self, digest: &C::Digest, context: &C) -> bool { + self.candidate_votes.get(digest).map_or(false, |data| { + let (v_threshold, a_threshold) = context.requisite_votes(&data.group_id); + data.can_be_included(v_threshold, a_threshold) + }) + } + /// Get an iterator of all candidates with a given group. // TODO: impl iterator pub fn candidates_in_group<'a>(&'a self, group_id: C::GroupId)