create the overarching agreement and IO futures

This commit is contained in:
Robert Habermeier
2018-01-10 21:58:56 +01:00
parent 3d9cc2697c
commit 00c4ee2d81
2 changed files with 88 additions and 52 deletions
+67 -29
View File
@@ -128,11 +128,13 @@ pub trait Context: Send + Clone {
pub trait TypeResolve {
type SignedTableStatement;
type BftCommunication;
type Misbehavior;
}
impl<C: Context> TypeResolve for C {
type SignedTableStatement = table::SignedStatement<C::ParachainCandidate, C::Digest, C::ValidatorId, C::Signature>;
type BftCommunication = bft::Communication<C::Proposal, C::Digest, C::ValidatorId, C::Signature>;
type Misbehavior = table::Misbehavior<C::ParachainCandidate, C::Digest, C::ValidatorId, C::Signature>;
}
/// Information about a specific group.
@@ -311,6 +313,11 @@ impl<C: Context> SharedTable<C> {
f(inner.table.get_candidate(digest))
}
/// Get all witnessed misbehavior.
pub fn get_misbehavior(&self) -> HashMap<C::ValidatorId, <C as TypeResolve>::Misbehavior> {
self.inner.lock().table.get_misbehavior().clone()
}
// Get a handle to the table context.
fn context(&self) -> &TableContext<C> {
&*self.context
@@ -418,6 +425,8 @@ pub struct AgreementParams<C: Context> {
pub message_buffer_size: usize,
/// Interval to attempt forming proposals over.
pub form_proposal_interval: Duration,
/// Interval to create table statement packets over.
pub table_broadcast_interval: Duration,
}
/// Recovery for messages
@@ -436,47 +445,76 @@ pub enum CheckedMessage<C: Context> {
/// Create an agreement future, and I/O streams.
pub fn agree<C, I, O, R, E>(params: AgreementParams<C>, net_in: I, net_out: O, recovery: R)
-> Box<Future<Item=(),Error=()>>
-> Box<Future<Item=bft::Committed<C::Proposal,C::Digest,C::Signature>,Error=Error>>
where
C: Context + 'static,
C::CheckCandidate: IntoFuture<Error=E>,
C::CheckAvailability: IntoFuture<Error=E>,
I: Stream<Item=(C::ValidatorId, Vec<UncheckedMessage>),Error=E>,
O: Sink<SinkItem=CheckedMessage<C>>,
R: MessageRecovery<C>,
I: Stream<Item=(C::ValidatorId, Vec<UncheckedMessage>),Error=E> + 'static,
O: Sink<SinkItem=CheckedMessage<C>> + 'static,
R: MessageRecovery<C> + 'static,
{
let (bft_in_in, bft_in_out) = mpsc::unbounded();
let (bft_out_in, bft_out_out) = mpsc::unbounded::<bft::ContextCommunication<BftContext<C>>>();
let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size);
let agreement = {
let bft_context = BftContext {
context: params.context,
table: params.table.clone(),
timer: params.timer.clone(),
round_timeout_multiplier: params.round_timeout_multiplier,
};
let round_robin_recovered = round_robin
.filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x)));
let route_messages_in = handle_incoming::HandleIncoming::new(
params.table.clone(),
round_robin_recovered,
bft_in_in,
).map_err(|_| Error::IoTerminated);
let bft_context = BftContext {
context: params.context,
table: params.table.clone(),
timer: params.timer,
round_timeout_multiplier: params.round_timeout_multiplier,
bft::agree(
bft_context,
params.nodes,
params.max_faulty,
bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated),
bft_out_in.sink_map_err(|_| Error::IoTerminated),
)
};
let agreement = bft::agree(
bft_context,
params.nodes,
params.max_faulty,
bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated),
bft_out_in.sink_map_err(|_| Error::IoTerminated),
);
let route_messages_in = {
let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size);
let route_messages_out = futures::future::empty::<(), _>();
let round_robin_recovered = round_robin
.filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x)));
agreement.join(route_messages_in).join(route_messages_out);
handle_incoming::HandleIncoming::new(
params.table.clone(),
round_robin_recovered,
bft_in_in,
).map_err(|_| Error::IoTerminated)
};
unimplemented!()
let route_messages_out = {
let periodic_table_statements = params.timer.interval(params.table_broadcast_interval)
.map_err(|_| Error::FaultyTimer)
.map(|()| unimplemented!()); // create table statements to send. but to _who_ and how many?
let complete_out_stream = bft_out_out
.map_err(|_| Error::IoTerminated)
.map(|bft::ContextCommunication(x)| x)
.map(CheckedMessage::Bft)
.select(periodic_table_statements);
net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream)
};
let create_proposal_on_interval = {
let table = params.table;
params.timer.interval(params.form_proposal_interval)
.map_err(|_| Error::FaultyTimer)
.for_each(move |_| { table.update_proposal(); Ok(()) })
};
// TODO: avoid having errors take down everything.
let future = agreement.join4(
route_messages_in,
route_messages_out,
create_proposal_on_interval
).map(|(agreed, _, _, _)| agreed);
Box::new(future)
}
+21 -23
View File
@@ -69,7 +69,7 @@ pub trait Context {
}
/// Statements circulated among peers.
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum Statement<C, D> {
/// Broadcast by a validator to indicate that this is his candidate for
/// inclusion.
@@ -88,7 +88,7 @@ pub enum Statement<C, D> {
}
/// A signed statement.
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct SignedStatement<C, D, V, S> {
/// The statement.
pub statement: Statement<C, D>,
@@ -122,7 +122,7 @@ enum StatementTrace<V, D> {
///
/// Since there are three possible ways to vote, a double vote is possible in
/// three possible combinations (unordered)
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum ValidityDoubleVote<C, D, S> {
/// Implicit vote by issuing and explicity voting validity.
IssuedAndValidity((C, S), (D, S)),
@@ -133,7 +133,7 @@ pub enum ValidityDoubleVote<C, D, S> {
}
/// Misbehavior: declaring multiple candidates.
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct MultipleCandidates<C, S> {
/// The first candidate seen.
pub first: (C, S),
@@ -142,7 +142,7 @@ pub struct MultipleCandidates<C, S> {
}
/// Misbehavior: submitted statement for wrong group.
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct UnauthorizedStatement<C, D, V, S> {
/// A signed statement which was submitted without proper authority.
pub statement: SignedStatement<C, D, V, S>,
@@ -150,7 +150,7 @@ pub struct UnauthorizedStatement<C, D, V, S> {
/// Different kinds of misbehavior. All of these kinds of malicious misbehavior
/// are easily provable and extremely disincentivized.
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum Misbehavior<C, D, V, S> {
/// Voted invalid and valid on validity.
ValidityDoubleVote(ValidityDoubleVote<C, D, S>),
@@ -238,15 +238,6 @@ struct ValidatorData<C: Context> {
known_statements: HashSet<StatementTrace<C::ValidatorId, C::Digest>>,
}
/// Create a new, empty statement table.
pub fn create<C: Context>() -> Table<C> {
Table {
validator_data: HashMap::default(),
detected_misbehavior: HashMap::default(),
candidate_votes: HashMap::default(),
}
}
/// Stores votes
pub struct Table<C: Context> {
validator_data: HashMap<C::ValidatorId, ValidatorData<C>>,
@@ -304,14 +295,6 @@ impl<C: Context> Table<C> {
})
}
/// Get an iterator of all candidates with a given group.
// TODO: impl iterator
pub fn candidates_in_group<'a>(&'a self, group_id: C::GroupId)
-> Box<Iterator<Item=&'a CandidateData<C>> + 'a>
{
Box::new(self.candidate_votes.values().filter(move |c| c.group_id == group_id))
}
/// Import a signed statement. Signatures should be checked for validity, and the
/// sender should be checked to actually be a validator.
///
@@ -379,6 +362,13 @@ impl<C: Context> Table<C> {
self.candidate_votes.get(digest).map(|d| &d.candidate)
}
/// Access all witnessed misbehavior.
pub fn get_misbehavior(&self)
-> &HashMap<C::ValidatorId, <C as ResolveMisbehavior>::Misbehavior>
{
&self.detected_misbehavior
}
fn note_trace_seen(&mut self, trace: StatementTrace<C::ValidatorId, C::Digest>, known_by: C::ValidatorId) {
self.validator_data.entry(known_by).or_insert_with(|| ValidatorData {
proposal: None,
@@ -587,6 +577,14 @@ mod tests {
use super::*;
use std::collections::HashMap;
fn create<C: Context>() -> Table<C> {
Table {
validator_data: HashMap::default(),
detected_misbehavior: HashMap::default(),
candidate_votes: HashMap::default(),
}
}
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
struct ValidatorId(usize);