diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index 582feeb132..8aec09d076 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -52,6 +52,9 @@ mod handle_incoming; mod round_robin; mod table; +#[cfg(test)] +pub mod tests; + /// Context necessary for agreement. pub trait Context: Send + Clone { /// A validator ID @@ -78,6 +81,12 @@ pub trait Context: Send + Clone { /// data is checked. type CheckAvailability: IntoFuture; + /// The statement batch type. + type StatementBatch: StatementBatch< + Self::ValidatorId, + table::SignedStatement, + >; + /// Get the digest of a candidate. fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest; @@ -128,12 +137,14 @@ pub trait Context: Send + Clone { pub trait TypeResolve { type SignedTableStatement; type BftCommunication; + type BftCommitted; type Misbehavior; } impl TypeResolve for C { type SignedTableStatement = table::SignedStatement; type BftCommunication = bft::Communication; + type BftCommitted = bft::Committed; type Misbehavior = table::Misbehavior; } @@ -318,6 +329,11 @@ impl SharedTable { self.inner.lock().table.get_misbehavior().clone() } + /// Fill a statement batch. + pub fn fill_batch(&self, batch: &mut C::StatementBatch) { + self.inner.lock().table.fill_batch(batch); + } + // Get a handle to the table context. fn context(&self) -> &TableContext { &*self.context @@ -425,8 +441,6 @@ pub struct AgreementParams { pub message_buffer_size: usize, /// Interval to attempt forming proposals over. pub form_proposal_interval: Duration, - /// Interval to create table statement packets over. - pub table_broadcast_interval: Duration, } /// Recovery for messages @@ -435,6 +449,19 @@ pub trait MessageRecovery { fn check_message(&self, UncheckedMessage) -> Option>; } +/// A batch of statements to send out. +pub trait StatementBatch { + /// Get the target authorities of these statements. + fn targets(&self) -> &[V]; + + /// Push a statement onto the batch. Returns false when the batch is full. + /// + /// This is meant to do work like incrementally serializing the statements + /// into a vector of bytes while making sure the length is below a certain + /// amount. + fn push(&mut self, statement: T) -> bool; +} + /// Recovered and fully checked messages. pub enum CheckedMessage { /// Messages meant for the BFT agreement logic. @@ -443,19 +470,42 @@ pub enum CheckedMessage { Table(Vec<::SignedTableStatement>), } +/// Outgoing messages to the network. +pub enum OutgoingMessage { + /// Messages meant for BFT agreement peers. + Bft(::BftCommunication), + /// Batches of table statements. + Table(C::StatementBatch), +} + /// Create an agreement future, and I/O streams. -pub fn agree(params: AgreementParams, net_in: I, net_out: O, recovery: R) - -> Box,Error=Error>> +// TODO: kill 'static bounds and use impl Future. +pub fn agree< + Context, + NetIn, + NetOut, + Recovery, + PropagateStatements, + Err, +>( + params: AgreementParams, + net_in: NetIn, + net_out: NetOut, + recovery: Recovery, + propagate_statements: PropagateStatements, +) + -> Box::BftCommitted,Error=Error>> where - C: Context + 'static, - C::CheckCandidate: IntoFuture, - C::CheckAvailability: IntoFuture, - I: Stream),Error=E> + 'static, - O: Sink> + 'static, - R: MessageRecovery + 'static, + Context: ::Context + 'static, + Context::CheckCandidate: IntoFuture, + Context::CheckAvailability: IntoFuture, + NetIn: Stream),Error=Err> + 'static, + NetOut: Sink> + 'static, + Recovery: MessageRecovery + 'static, + PropagateStatements: Stream + 'static, { let (bft_in_in, bft_in_out) = mpsc::unbounded(); - let (bft_out_in, bft_out_out) = mpsc::unbounded::>>(); + let (bft_out_in, bft_out_out) = mpsc::unbounded(); let agreement = { let bft_context = BftContext { @@ -489,14 +539,16 @@ pub fn agree(params: AgreementParams, net_in: I, net_out: O, r let route_messages_out = { - let periodic_table_statements = params.timer.interval(params.table_broadcast_interval) - .map_err(|_| Error::FaultyTimer) - .map(|()| unimplemented!()); // create table statements to send. but to _who_ and how many? + let table = params.table.clone(); + let periodic_table_statements = propagate_statements + .or_else(|_| ::futures::future::empty()) // halt the stream instead of error. + .map(move |mut batch| { table.fill_batch(&mut batch); batch }) + .map(OutgoingMessage::Table); let complete_out_stream = bft_out_out .map_err(|_| Error::IoTerminated) .map(|bft::ContextCommunication(x)| x) - .map(CheckedMessage::Bft) + .map(OutgoingMessage::Bft) .select(periodic_table_statements); net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream) diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs index fe48c3ca37..2e322761b6 100644 --- a/substrate/candidate-agreement/src/table.rs +++ b/substrate/candidate-agreement/src/table.rs @@ -32,6 +32,8 @@ use std::collections::hash_map::{HashMap, Entry}; use std::hash::Hash; use std::fmt::Debug; +use super::StatementBatch; + /// Context for the statement table. pub trait Context { /// A validator ID @@ -238,6 +240,15 @@ struct ValidatorData { known_statements: HashSet>, } +impl Default for ValidatorData { + fn default() -> Self { + ValidatorData { + proposal: None, + known_statements: HashSet::default(), + } + } +} + /// Stores votes pub struct Table { validator_data: HashMap>, @@ -369,6 +380,120 @@ impl Table { &self.detected_misbehavior } + /// Fill a statement batch and note messages seen by the targets. + pub fn fill_batch(&mut self, batch: &mut B) + where B: StatementBatch< + C::ValidatorId, + SignedStatement, + > + { + // naively iterate all statements so far, taking any that + // at least one of the targets has not seen. + + // workaround for the fact that it's inconvenient to borrow multiple + // entries out of a hashmap mutably -- we just move them out and + // replace them when we're done. + struct SwappedTargetData<'a, C: 'a + Context> { + validator_data: &'a mut HashMap>, + target_data: Vec<(C::ValidatorId, ValidatorData)>, + } + + impl<'a, C: 'a + Context> Drop for SwappedTargetData<'a, C> { + fn drop(&mut self) { + for (id, data) in self.target_data.drain(..) { + self.validator_data.insert(id, data); + } + } + } + + // pre-fetch authority data for all the targets. + let mut target_data = { + let validator_data = &mut self.validator_data; + let mut target_data = Vec::with_capacity(batch.targets().len()); + for target in batch.targets() { + let active_data = match validator_data.get_mut(target) { + None => Default::default(), + Some(x) => ::std::mem::replace(x, Default::default()), + }; + + target_data.push((target.clone(), active_data)); + } + + SwappedTargetData { + validator_data, + target_data + } + }; + + let target_data = &mut target_data.target_data; + + macro_rules! attempt_send { + ($trace:expr, sender=$sender:expr, sig=$sig:expr, statement=$statement:expr) => {{ + let trace = $trace; + let can_send = target_data.iter() + .any(|t| t.1.known_statements.contains(&trace)); + + if can_send { + let statement = SignedStatement { + statement: $statement, + signature: $sig, + sender: $sender, + }; + + if batch.push(statement) { + for target in target_data.iter_mut() { + target.1.known_statements.insert(trace.clone()); + } + } else { + return; + } + } + }} + } + + // reconstruct statements for anything whose trace passes the filter. + for (digest, candidate) in self.candidate_votes.iter() { + for (sender, vote) in candidate.validity_votes.iter() { + match *vote { + ValidityVote::Issued(ref sig) => { + attempt_send!( + StatementTrace::Candidate(sender.clone()), + sender = sender.clone(), + sig = sig.clone(), + statement = Statement::Candidate(candidate.candidate.clone()) + ) + } + ValidityVote::Valid(ref sig) => { + attempt_send!( + StatementTrace::Valid(sender.clone(), digest.clone()), + sender = sender.clone(), + sig = sig.clone(), + statement = Statement::Valid(digest.clone()) + ) + } + ValidityVote::Invalid(ref sig) => { + attempt_send!( + StatementTrace::Invalid(sender.clone(), digest.clone()), + sender = sender.clone(), + sig = sig.clone(), + statement = Statement::Invalid(digest.clone()) + ) + } + } + }; + + + for (sender, sig) in candidate.availability_votes.iter() { + attempt_send!( + StatementTrace::Available(sender.clone(), digest.clone()), + sender = sender.clone(), + sig = sig.clone(), + statement = Statement::Available(digest.clone()) + ) + } + } + } + fn note_trace_seen(&mut self, trace: StatementTrace, known_by: C::ValidatorId) { self.validator_data.entry(known_by).or_insert_with(|| ValidatorData { proposal: None, diff --git a/substrate/candidate-agreement/src/tests/mod.rs b/substrate/candidate-agreement/src/tests/mod.rs new file mode 100644 index 0000000000..d4f5532fbd --- /dev/null +++ b/substrate/candidate-agreement/src/tests/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Tests and test helpers for the candidate agreement. + +const VALIDITY_CHECK_DELAY_MS: isize = 400; +const AVAILABILITY_CHECK_DELAY_MS: isize = 200; + +use super::*;