diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index 16985ce8d7..582feeb132 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -128,11 +128,13 @@ pub trait Context: Send + Clone { pub trait TypeResolve { type SignedTableStatement; type BftCommunication; + type Misbehavior; } impl TypeResolve for C { type SignedTableStatement = table::SignedStatement; type BftCommunication = bft::Communication; + type Misbehavior = table::Misbehavior; } /// Information about a specific group. @@ -311,6 +313,11 @@ impl SharedTable { f(inner.table.get_candidate(digest)) } + /// Get all witnessed misbehavior. + pub fn get_misbehavior(&self) -> HashMap::Misbehavior> { + self.inner.lock().table.get_misbehavior().clone() + } + // Get a handle to the table context. fn context(&self) -> &TableContext { &*self.context @@ -418,6 +425,8 @@ pub struct AgreementParams { pub message_buffer_size: usize, /// Interval to attempt forming proposals over. pub form_proposal_interval: Duration, + /// Interval to create table statement packets over. + pub table_broadcast_interval: Duration, } /// Recovery for messages @@ -436,47 +445,76 @@ pub enum CheckedMessage { /// Create an agreement future, and I/O streams. pub fn agree(params: AgreementParams, net_in: I, net_out: O, recovery: R) - -> Box> + -> Box,Error=Error>> where C: Context + 'static, C::CheckCandidate: IntoFuture, C::CheckAvailability: IntoFuture, - I: Stream),Error=E>, - O: Sink>, - R: MessageRecovery, + I: Stream),Error=E> + 'static, + O: Sink> + 'static, + R: MessageRecovery + 'static, { let (bft_in_in, bft_in_out) = mpsc::unbounded(); let (bft_out_in, bft_out_out) = mpsc::unbounded::>>(); - let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size); + let agreement = { + let bft_context = BftContext { + context: params.context, + table: params.table.clone(), + timer: params.timer.clone(), + round_timeout_multiplier: params.round_timeout_multiplier, + }; - let round_robin_recovered = round_robin - .filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x))); - - let route_messages_in = handle_incoming::HandleIncoming::new( - params.table.clone(), - round_robin_recovered, - bft_in_in, - ).map_err(|_| Error::IoTerminated); - - let bft_context = BftContext { - context: params.context, - table: params.table.clone(), - timer: params.timer, - round_timeout_multiplier: params.round_timeout_multiplier, + bft::agree( + bft_context, + params.nodes, + params.max_faulty, + bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated), + bft_out_in.sink_map_err(|_| Error::IoTerminated), + ) }; - let agreement = bft::agree( - bft_context, - params.nodes, - params.max_faulty, - bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated), - bft_out_in.sink_map_err(|_| Error::IoTerminated), - ); + let route_messages_in = { + let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size); - let route_messages_out = futures::future::empty::<(), _>(); + let round_robin_recovered = round_robin + .filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x))); - agreement.join(route_messages_in).join(route_messages_out); + handle_incoming::HandleIncoming::new( + params.table.clone(), + round_robin_recovered, + bft_in_in, + ).map_err(|_| Error::IoTerminated) + }; - unimplemented!() + + let route_messages_out = { + let periodic_table_statements = params.timer.interval(params.table_broadcast_interval) + .map_err(|_| Error::FaultyTimer) + .map(|()| unimplemented!()); // create table statements to send. but to _who_ and how many? + + let complete_out_stream = bft_out_out + .map_err(|_| Error::IoTerminated) + .map(|bft::ContextCommunication(x)| x) + .map(CheckedMessage::Bft) + .select(periodic_table_statements); + + net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream) + }; + + let create_proposal_on_interval = { + let table = params.table; + params.timer.interval(params.form_proposal_interval) + .map_err(|_| Error::FaultyTimer) + .for_each(move |_| { table.update_proposal(); Ok(()) }) + }; + + // TODO: avoid having errors take down everything. + let future = agreement.join4( + route_messages_in, + route_messages_out, + create_proposal_on_interval + ).map(|(agreed, _, _, _)| agreed); + + Box::new(future) } diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs index 53e492c765..fe48c3ca37 100644 --- a/substrate/candidate-agreement/src/table.rs +++ b/substrate/candidate-agreement/src/table.rs @@ -69,7 +69,7 @@ pub trait Context { } /// Statements circulated among peers. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum Statement { /// Broadcast by a validator to indicate that this is his candidate for /// inclusion. @@ -88,7 +88,7 @@ pub enum Statement { } /// A signed statement. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct SignedStatement { /// The statement. pub statement: Statement, @@ -122,7 +122,7 @@ enum StatementTrace { /// /// Since there are three possible ways to vote, a double vote is possible in /// three possible combinations (unordered) -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum ValidityDoubleVote { /// Implicit vote by issuing and explicity voting validity. IssuedAndValidity((C, S), (D, S)), @@ -133,7 +133,7 @@ pub enum ValidityDoubleVote { } /// Misbehavior: declaring multiple candidates. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct MultipleCandidates { /// The first candidate seen. pub first: (C, S), @@ -142,7 +142,7 @@ pub struct MultipleCandidates { } /// Misbehavior: submitted statement for wrong group. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct UnauthorizedStatement { /// A signed statement which was submitted without proper authority. pub statement: SignedStatement, @@ -150,7 +150,7 @@ pub struct UnauthorizedStatement { /// Different kinds of misbehavior. All of these kinds of malicious misbehavior /// are easily provable and extremely disincentivized. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum Misbehavior { /// Voted invalid and valid on validity. ValidityDoubleVote(ValidityDoubleVote), @@ -238,15 +238,6 @@ struct ValidatorData { known_statements: HashSet>, } -/// Create a new, empty statement table. -pub fn create() -> Table { - Table { - validator_data: HashMap::default(), - detected_misbehavior: HashMap::default(), - candidate_votes: HashMap::default(), - } -} - /// Stores votes pub struct Table { validator_data: HashMap>, @@ -304,14 +295,6 @@ impl Table { }) } - /// Get an iterator of all candidates with a given group. - // TODO: impl iterator - pub fn candidates_in_group<'a>(&'a self, group_id: C::GroupId) - -> Box> + 'a> - { - Box::new(self.candidate_votes.values().filter(move |c| c.group_id == group_id)) - } - /// Import a signed statement. Signatures should be checked for validity, and the /// sender should be checked to actually be a validator. /// @@ -379,6 +362,13 @@ impl Table { self.candidate_votes.get(digest).map(|d| &d.candidate) } + /// Access all witnessed misbehavior. + pub fn get_misbehavior(&self) + -> &HashMap::Misbehavior> + { + &self.detected_misbehavior + } + fn note_trace_seen(&mut self, trace: StatementTrace, known_by: C::ValidatorId) { self.validator_data.entry(known_by).or_insert_with(|| ValidatorData { proposal: None, @@ -587,6 +577,14 @@ mod tests { use super::*; use std::collections::HashMap; + fn create() -> Table { + Table { + validator_data: HashMap::default(), + detected_misbehavior: HashMap::default(), + candidate_votes: HashMap::default(), + } + } + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] struct ValidatorId(usize);