beginnings of shared table

This commit is contained in:
Robert Habermeier
2018-01-08 15:34:21 +01:00
parent acca871d20
commit 9ff2fa550f
2 changed files with 170 additions and 24 deletions
+162 -24
View File
@@ -33,18 +33,20 @@ extern crate futures;
extern crate parking_lot;
extern crate tokio_timer;
pub mod bft;
pub mod table;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use futures::prelude::*;
use parking_lot::Mutex;
use tokio_timer::Timer;
use table::Table;
pub mod bft;
pub mod table;
/// Context necessary for agreement.
pub trait Context: Send + Clone {
/// A validator ID
@@ -74,6 +76,9 @@ pub trait Context: Send + Clone {
/// Get the digest of a candidate.
fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest;
/// Get the digest of a proposal.
fn proposal_digest(proposal: &Self::Proposal) -> Self::Digest;
/// Get the group of a candidate.
fn candidate_group(candidate: &Self::ParachainCandidate) -> Self::GroupId;
@@ -95,9 +100,11 @@ pub trait Context: Send + Clone {
fn create_proposal(&self, candidates: Vec<&Self::ParachainCandidate>)
-> Option<Self::Proposal>;
/// Check validity of a proposal. This may also be somewhat subjective
/// based on a monotonic-decreasing curve.
fn proposal_valid(&self, proposal: &Self::Proposal) -> bool;
/// Check validity of a proposal. This should call out to the `check_candidate`
/// function for all parachain candidates contained within it, as well as
/// checking other validity constraints of the proposal.
fn proposal_valid<F>(&self, proposal: &Self::Proposal, check_candidate: F) -> bool
where F: FnMut(&Self::ParachainCandidate) -> bool;
/// Get the local validator ID.
fn local_id(&self) -> Self::ValidatorId;
@@ -112,6 +119,17 @@ pub trait Context: Send + Clone {
fn sign_bft_message(&self, &bft::Message<Self::Proposal, Self::Digest>) -> Self::Signature;
}
/// Helper for type resolution for contexts until type aliases apply bounds.
pub trait TypeResolve {
type SignedTableStatement;
type BftCommunication;
}
impl<C: Context> TypeResolve for C {
type SignedTableStatement = table::SignedStatement<C::ParachainCandidate, C::Digest, C::ValidatorId, C::Signature>;
type BftCommunication = bft::Communication<C::Proposal, C::Digest, C::ValidatorId, C::Signature>;
}
/// Information about a specific group.
#[derive(Debug, Clone)]
pub struct GroupInfo<V: Hash + Eq> {
@@ -161,6 +179,14 @@ impl<C: Context> table::Context for TableContext<C> {
}
}
struct BftContext<C: Context> {
context: C,
table_context: TableContext<C>,
table: Arc<Mutex<Table<TableContext<C>>>>,
timer: Timer,
round_timeout_multiplier: u64,
}
/// Parameters necessary for agreement.
pub struct AgreementParams<C: Context> {
/// The context itself.
@@ -172,26 +198,138 @@ pub struct AgreementParams<C: Context> {
/// The local candidate proposal.
// TODO: replace with future.
pub local_proposal: Option<C::ParachainCandidate>,
/// The number of nodes.
pub nodes: usize,
/// The maximum number of faulty nodes.
pub max_faulty: usize,
/// The round timeout multiplier: 2^round_number is multiplied by this.
pub round_timeout_multiplier: u64,
}
pub fn agree<C: Context + Clone>(params: AgreementParams<C>) {
let context = params.context;
let local_id = context.local_id();
let mut table = Table::<TableContext<C>>::default();
// A shared table object.
struct SharedTableInner<C: Context + Clone> {
context: TableContext<C>,
table: Table<TableContext<C>>,
}
let table_context = TableContext {
context: context.clone(),
groups: params.groups,
};
if let Some(candidate) = params.local_proposal {
let statement = table::Statement::Candidate(candidate);
let signed_statement = table::SignedStatement {
signature: context.sign_table_statement(&statement),
sender: local_id.clone(),
statement: statement,
};
table.import_statement(&table_context, signed_statement, None);
impl<C: Context + Clone> SharedTableInner<C> {
fn import_statement(
&mut self,
statement: <C as TypeResolve>::SignedTableStatement,
received_from: Option<C::ValidatorId>
) -> Option<table::Summary<C::Digest, C::GroupId>> {
self.table.import_statement(&self.context, statement, received_from)
}
}
/// A shared table object.
pub struct SharedTable<C: Context> {
inner: Arc<Mutex<SharedTableInner<C>>>,
}
impl<C: Context> Clone for SharedTable<C> {
fn clone(&self) -> Self {
SharedTable { inner: self.inner.clone() }
}
}
impl<C: Context> SharedTable<C> {
/// Create a new shared table.
pub fn new(context: C, groups: HashMap<C::GroupId, GroupInfo<C::ValidatorId>>) -> Self {
SharedTable {
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
context: TableContext { context, groups },
}))
}
}
/// Import a single statement.
fn import_statement(
&self,
statement: <C as TypeResolve>::SignedTableStatement,
received_from: Option<C::ValidatorId>,
) -> Option<table::Summary<C::Digest, C::GroupId>> {
self.inner.lock().import_statement(statement, received_from)
}
/// Import many statements at once.
///
/// Provide an iterator yielding pairs of (statement, received_from).
fn import_statements<'a, I: 'a>(&'a self, iterable: I)
-> Box<Iterator<Item=table::Summary<C::Digest, C::GroupId>> + 'a>
where I: IntoIterator<Item=(<C as TypeResolve>::SignedTableStatement, Option<C::ValidatorId>)>
{
let mut inner = self.inner.lock();
let iter = iterable.into_iter().filter_map(move |(statement, received_from)| {
inner.import_statement(statement, received_from)
});
Box::new(iter)
}
}
impl<C: Context> bft::Context for BftContext<C> {
type ValidatorId = C::ValidatorId;
type Digest = C::Digest;
type Signature = C::Signature;
type Candidate = C::Proposal;
type RoundTimeout = tokio_timer::Sleep;
type CreateProposal = futures::future::Empty<Self::Candidate, ()>;
fn local_id(&self) -> Self::ValidatorId {
self.context.local_id()
}
fn proposal(&self) -> Self::CreateProposal {
futures::future::empty()
}
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest {
C::proposal_digest(candidate)
}
fn sign_local(&self, message: bft::Message<Self::Candidate, Self::Digest>)
-> bft::LocalizedMessage<Self::Candidate, Self::Digest, Self::ValidatorId, Self::Signature>
{
let sender = self.local_id();
let signature = self.context.sign_bft_message(&message);
bft::LocalizedMessage {
message,
sender,
signature,
}
}
fn round_proposer(&self, round: usize) -> Self::ValidatorId {
self.context.round_proposer(round)
}
fn candidate_valid(&self, proposal: &Self::Candidate) -> bool {
let mut table = self.table.lock();
self.context.proposal_valid(proposal, |contained_candidate| {
// check that the candidate is valid (has enough votes)
let digest = C::candidate_digest(contained_candidate);
table.candidate_includable(&digest, &self.table_context)
});
// also check that _enough_ candidates were included (perhaps according
// to a curve over time).
true
}
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
let round = ::std::cmp::max(63, round) as u32;
let timeout = 1u64.checked_shl(round)
.unwrap_or_else(u64::max_value)
.saturating_mul(self.round_timeout_multiplier);
self.timer.sleep(::std::time::Duration::from_secs(timeout))
}
}
/// Reach agreement with other validators on a new block.
pub fn agree<C: Context, I, O>(_params: AgreementParams<C>) {
unimplemented!()
}
@@ -305,6 +305,14 @@ impl<C: Context> Table<C> {
best_candidates.values().map(|v| C::Candidate::clone(v)).collect::<Vec<_>>()
}
/// Whether a candidate can be included.
pub fn candidate_includable(&self, digest: &C::Digest, context: &C) -> bool {
self.candidate_votes.get(digest).map_or(false, |data| {
let (v_threshold, a_threshold) = context.requisite_votes(&data.group_id);
data.can_be_included(v_threshold, a_threshold)
})
}
/// Get an iterator of all candidates with a given group.
// TODO: impl iterator
pub fn candidates_in_group<'a>(&'a self, group_id: C::GroupId)