From 3d9cc2697c6df6c84db3a631cabe6d79501d6d39 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 10 Jan 2018 21:20:53 +0100 Subject: [PATCH] incoming message handler --- .../src/bft/accumulator.rs | 5 - substrate/candidate-agreement/src/bft/mod.rs | 9 +- .../src/handle_incoming.rs | 218 ++++++++++++++++++ substrate/candidate-agreement/src/lib.rs | 150 ++++++++---- .../candidate-agreement/src/round_robin.rs | 8 +- substrate/candidate-agreement/src/table.rs | 21 +- 6 files changed, 338 insertions(+), 73 deletions(-) create mode 100644 substrate/candidate-agreement/src/handle_incoming.rs diff --git a/substrate/candidate-agreement/src/bft/accumulator.rs b/substrate/candidate-agreement/src/bft/accumulator.rs index 8999a9f29b..3e46c0c311 100644 --- a/substrate/candidate-agreement/src/bft/accumulator.rs +++ b/substrate/candidate-agreement/src/bft/accumulator.rs @@ -171,11 +171,6 @@ impl Accumulator &ValidatorId { - &self.round_proposer - } - pub fn proposal(&self) -> Option<&Candidate> { self.proposal.as_ref() } diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs index 71adbf609a..2cdd6d5f4d 100644 --- a/substrate/candidate-agreement/src/bft/mod.rs +++ b/substrate/candidate-agreement/src/bft/mod.rs @@ -699,8 +699,15 @@ impl Future for Agreement /// conclude without having witnessed the conclusion. /// In general, this future should be pre-empted by the import of a justification /// set for this block height. -pub fn agree(context: C, nodes: usize, max_faulty: usize, input: I, output: O) +pub fn agree(context: C, nodes: usize, max_faulty: usize, input: I, output: O) -> Agreement + where + C: Context, + C::RoundTimeout: Future, + C::CreateProposal: Future, + I: Stream,Error=E>, + O: Sink,SinkError=E>, + E: From, { let strategy = Strategy::create(&context, nodes, max_faulty); Agreement { diff --git a/substrate/candidate-agreement/src/handle_incoming.rs b/substrate/candidate-agreement/src/handle_incoming.rs new file mode 100644 index 0000000000..331ee7a923 --- /dev/null +++ b/substrate/candidate-agreement/src/handle_incoming.rs @@ -0,0 +1,218 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A stream that handles incoming messages to the BFT agreement module and statement +//! table. It forwards as necessary, and dispatches requests for determining availability +//! and validity of candidates as necessary. + +use std::collections::HashSet; + +use futures::prelude::*; +use futures::stream::{Fuse, FuturesUnordered}; +use futures::sync::mpsc; + +use table::{self, Statement, SignedStatement, Context as TableContext}; + +use super::{Context, CheckedMessage, SharedTable, TypeResolve}; + +enum CheckResult { + Available, + Unavailable, + Valid, + Invalid, +} + +enum Checking { + Availability(D, A), + Validity(D, V), +} + +impl Future for Checking + where + D: Clone, + A: Future, + V: Future, +{ + type Item = (D, CheckResult); + type Error = E; + + fn poll(&mut self) -> Poll { + Ok(Async::Ready(match *self { + Checking::Availability(ref digest, ref mut f) => { + match try_ready!(f.poll()) { + true => (digest.clone(), CheckResult::Available), + false => (digest.clone(), CheckResult::Unavailable), + } + } + Checking::Validity(ref digest, ref mut f) => { + match try_ready!(f.poll()) { + true => (digest.clone(), CheckResult::Valid), + false => (digest.clone(), CheckResult::Invalid), + } + } + })) + } +} + +/// Handles incoming messages to the BFT service and statement table. +/// +/// Also triggers requests for determining validity and availability of other +/// parachain candidates. +pub struct HandleIncoming { + table: SharedTable, + messages_in: Fuse, + bft_out: mpsc::UnboundedSender<::BftCommunication>, + local_id: C::ValidatorId, + requesting_about: FuturesUnordered::Future, + ::Future, + >>, + checked_validity: HashSet, + checked_availability: HashSet, +} + +impl HandleIncoming { + fn sign_and_import_statement(&self, digest: C::Digest, result: CheckResult) { + let statement = match result { + CheckResult::Valid => Statement::Valid(digest), + CheckResult::Invalid => Statement::Invalid(digest), + CheckResult::Available => Statement::Available(digest), + CheckResult::Unavailable => return, // no such statement and not provable. + }; + + let signature = self.table.context().sign_table_statement(&statement); + + let statement = SignedStatement { + statement, + signature, + sender: self.local_id.clone(), + }; + + // TODO: trigger broadcast to peers immediately? + self.table.import_statement(statement, None); + } + + fn import_message(&mut self, origin: C::ValidatorId, message: CheckedMessage) { + match message { + CheckedMessage::Bft(msg) => { let _ = self.bft_out.unbounded_send(msg); } + CheckedMessage::Table(table_messages) => { + // import all table messages and check for any that we + // need to produce statements for. + let msg_iter = table_messages + .into_iter() + .map(|m| (m, Some(origin.clone()))); + let summaries: Vec<_> = self.table.import_statements(msg_iter); + + for summary in summaries { + self.dispatch_on_summary(summary) + } + } + } + } + + // on new candidates in our group, begin checking validity. + // on new candidates in our availability sphere, begin checking availability. + fn dispatch_on_summary(&mut self, summary: table::Summary) { + let is_validity_member = + self.table.context().is_member_of(&self.local_id, &summary.group_id); + + let is_availability_member = + self.table.context().is_availability_guarantor_of(&self.local_id, &summary.group_id); + + let digest = &summary.candidate; + + // TODO: consider a strategy based on the number of candidate votes as well. + let checking_validity = is_validity_member && self.checked_validity.insert(digest.clone()); + let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone()); + + if checking_validity || checking_availability { + let context = &*self.table.context(); + let requesting_about = &mut self.requesting_about; + self.table.with_candidate(digest, |c| match c { + None => {} // TODO: handle table inconsistency somehow? + Some(candidate) => { + if checking_validity { + let future = context.check_validity(candidate).into_future(); + let checking = Checking::Validity(digest.clone(), future); + requesting_about.push(checking); + } + + if checking_availability { + let future = context.check_availability(candidate).into_future(); + let checking = Checking::Availability(digest.clone(), future); + requesting_about.push(checking); + } + } + }) + } + } +} + +impl HandleIncoming + where + C: Context, + I: Stream),Error=E>, + C::CheckAvailability: IntoFuture, + C::CheckCandidate: IntoFuture, +{ + pub fn new( + table: SharedTable, + messages_in: I, + bft_out: mpsc::UnboundedSender<::BftCommunication>, + ) -> Self { + let local_id = table.context().local_id(); + + HandleIncoming { + table, + bft_out, + local_id, + messages_in: messages_in.fuse(), + requesting_about: FuturesUnordered::new(), + checked_validity: HashSet::new(), + checked_availability: HashSet::new(), + } + } +} + +impl Future for HandleIncoming + where + C: Context, + I: Stream),Error=E>, + C::CheckAvailability: IntoFuture, + C::CheckCandidate: IntoFuture, +{ + type Item = (); + type Error = E; + + fn poll(&mut self) -> Poll<(), E> { + loop { + // FuturesUnordered is safe to poll after it has completed. + while let Async::Ready(Some((d, r))) = self.requesting_about.poll()? { + self.sign_and_import_statement(d, r); + } + + match try_ready!(self.messages_in.poll()) { + None => if self.requesting_about.is_empty() { + return Ok(Async::Ready(())) + } else { + return Ok(Async::NotReady) + }, + Some((origin, msg)) => self.import_message(origin, msg), + } + } + } +} diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index f6ea13be30..16985ce8d7 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -29,6 +29,7 @@ //! //! Groups themselves may be compromised by malicious validators. +#[macro_use] extern crate futures; extern crate parking_lot; extern crate tokio_timer; @@ -37,6 +38,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::hash::Hash; use std::sync::Arc; +use std::time::Duration; use futures::prelude::*; use futures::sync::{mpsc, oneshot}; @@ -46,6 +48,7 @@ use tokio_timer::Timer; use table::Table; mod bft; +mod handle_incoming; mod round_robin; mod table; @@ -150,6 +153,14 @@ struct TableContext { groups: HashMap>, } +impl ::std::ops::Deref for TableContext { + type Target = C; + + fn deref(&self) -> &C { + &self.context + } +} + impl table::Context for TableContext { type ValidatorId = C::ValidatorId; type Digest = C::Digest; @@ -182,55 +193,59 @@ impl table::Context for TableContext { } // A shared table object. -struct SharedTableInner { - context: TableContext, +struct SharedTableInner { table: Table>, awaiting_proposal: Vec>, } -impl SharedTableInner { +impl SharedTableInner { fn import_statement( &mut self, + context: &TableContext, statement: ::SignedTableStatement, received_from: Option ) -> Option> { - self.table.import_statement(&self.context, statement, received_from) + self.table.import_statement(context, statement, received_from) } - fn update_proposal(&mut self) { + fn update_proposal(&mut self, context: &TableContext) { if self.awaiting_proposal.is_empty() { return } - let proposal_candidates = self.table.proposed_candidates(&self.context); - if let Some(proposal) = self.context.context.create_proposal(proposal_candidates) { + let proposal_candidates = self.table.proposed_candidates(context); + if let Some(proposal) = context.context.create_proposal(proposal_candidates) { for sender in self.awaiting_proposal.drain(..) { let _ = sender.send(proposal.clone()); } } } - fn get_proposal(&mut self) -> oneshot::Receiver { + fn get_proposal(&mut self, context: &TableContext) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); self.awaiting_proposal.push(tx); - self.update_proposal(); + self.update_proposal(context); rx } - fn proposal_valid(&mut self, proposal: &C::Proposal) -> bool { - self.context.context.proposal_valid(proposal, |contained_candidate| { + fn proposal_valid(&mut self, context: &TableContext, proposal: &C::Proposal) -> bool { + context.context.proposal_valid(proposal, |contained_candidate| { // check that the candidate is valid (has enough votes) let digest = C::candidate_digest(contained_candidate); - self.table.candidate_includable(&digest, &self.context) + self.table.candidate_includable(&digest, context) }) } } /// A shared table object. pub struct SharedTable { + context: Arc>, inner: Arc>>, } impl Clone for SharedTable { fn clone(&self) -> Self { - SharedTable { inner: self.inner.clone() } + SharedTable { + context: self.context.clone(), + inner: self.inner.clone() + } } } @@ -238,9 +253,9 @@ impl SharedTable { /// Create a new shared table. pub fn new(context: C, groups: HashMap>) -> Self { SharedTable { + context: Arc::new(TableContext { context, groups }), inner: Arc::new(Mutex::new(SharedTableInner { table: Table::default(), - context: TableContext { context, groups }, awaiting_proposal: Vec::new(), })) } @@ -252,7 +267,7 @@ impl SharedTable { statement: ::SignedTableStatement, received_from: Option, ) -> Option> { - self.inner.lock().import_statement(statement, received_from) + self.inner.lock().import_statement(&*self.context, statement, received_from) } /// Import many statements at once. @@ -266,24 +281,39 @@ impl SharedTable { let mut inner = self.inner.lock(); iterable.into_iter().filter_map(move |(statement, received_from)| { - inner.import_statement(statement, received_from) + inner.import_statement(&*self.context, statement, received_from) }).collect() } /// Update the proposal sealing. pub fn update_proposal(&self) { - self.inner.lock().update_proposal() + self.inner.lock().update_proposal(&*self.context) } /// Register interest in receiving a proposal when ready. /// If one is ready immediately, it will be provided. pub fn get_proposal(&self) -> oneshot::Receiver { - self.inner.lock().get_proposal() + self.inner.lock().get_proposal(&*self.context) } /// Check if a proposal is valid. pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool { - self.inner.lock().proposal_valid(proposal) + self.inner.lock().proposal_valid(&*self.context, proposal) + } + + /// Execute a closure using a specific candidate. + /// + /// Deadlocks if called recursively. + pub fn with_candidate(&self, digest: &C::Digest, f: F) -> U + where F: FnOnce(Option<&C::ParachainCandidate>) -> U + { + let inner = self.inner.lock(); + f(inner.table.get_candidate(digest)) + } + + // Get a handle to the table context. + fn context(&self) -> &TableContext { + &*self.context } } @@ -310,8 +340,7 @@ pub struct BftContext { } impl bft::Context for BftContext - where - C::Proposal: 'static, + where C::Proposal: 'static, { type ValidatorId = C::ValidatorId; type Digest = C::Digest; @@ -358,11 +387,19 @@ impl bft::Context for BftContext .unwrap_or_else(u64::max_value) .saturating_mul(self.round_timeout_multiplier); - Box::new(self.timer.sleep(::std::time::Duration::from_secs(timeout)) + Box::new(self.timer.sleep(Duration::from_secs(timeout)) .map_err(|_| Error::FaultyTimer)) } } +/// Unchecked message. These haven't had signature recovery run on them. +#[derive(Debug, PartialEq, Eq)] +pub struct UncheckedMessage { + /// The data of the message. + pub data: Vec, +} + + /// Parameters necessary for agreement. pub struct AgreementParams { /// The context itself. @@ -377,31 +414,54 @@ pub struct AgreementParams { pub max_faulty: usize, /// The round timeout multiplier: 2^round_number is multiplied by this. pub round_timeout_multiplier: u64, + /// The maximum amount of messages to queue. + pub message_buffer_size: usize, + /// Interval to attempt forming proposals over. + pub form_proposal_interval: Duration, } -/// Future and I/O to reach agreement. -pub struct Agreement { - /// The future holding the actual BFT logic. - pub bft: Box, - Error=Error, - >>, - /// The input sink. - pub input: mpsc::UnboundedSender>, - /// The output stream. - pub output: mpsc::UnboundedReceiver>, +/// Recovery for messages +pub trait MessageRecovery { + /// Attempt to transform a checked message into an unchecked. + fn check_message(&self, UncheckedMessage) -> Option>; +} + +/// Recovered and fully checked messages. +pub enum CheckedMessage { + /// Messages meant for the BFT agreement logic. + Bft(::BftCommunication), + /// Statements circulating about the table. + Table(Vec<::SignedTableStatement>), } /// Create an agreement future, and I/O streams. -pub fn agree(params: AgreementParams) - -> Agreement> +pub fn agree(params: AgreementParams, net_in: I, net_out: O, recovery: R) + -> Box> + where + C: Context + 'static, + C::CheckCandidate: IntoFuture, + C::CheckAvailability: IntoFuture, + I: Stream),Error=E>, + O: Sink>, + R: MessageRecovery, { - let (in_in, in_out) = mpsc::unbounded(); - let (out_in, out_out) = mpsc::unbounded(); + let (bft_in_in, bft_in_out) = mpsc::unbounded(); + let (bft_out_in, bft_out_out) = mpsc::unbounded::>>(); + + let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size); + + let round_robin_recovered = round_robin + .filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x))); + + let route_messages_in = handle_incoming::HandleIncoming::new( + params.table.clone(), + round_robin_recovered, + bft_in_in, + ).map_err(|_| Error::IoTerminated); let bft_context = BftContext { context: params.context, - table: params.table, + table: params.table.clone(), timer: params.timer, round_timeout_multiplier: params.round_timeout_multiplier, }; @@ -410,13 +470,13 @@ pub fn agree(params: AgreementParams) bft_context, params.nodes, params.max_faulty, - in_out.map_err(|_| Error::IoTerminated), - out_in.sink_map_err(|_| Error::IoTerminated), + bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated), + bft_out_in.sink_map_err(|_| Error::IoTerminated), ); - Agreement { - bft: Box::new(agreement), - input: in_in, - output: out_out, - } + let route_messages_out = futures::future::empty::<(), _>(); + + agreement.join(route_messages_in).join(route_messages_out); + + unimplemented!() } diff --git a/substrate/candidate-agreement/src/round_robin.rs b/substrate/candidate-agreement/src/round_robin.rs index 9a061a27d7..c0620d1a8e 100644 --- a/substrate/candidate-agreement/src/round_robin.rs +++ b/substrate/candidate-agreement/src/round_robin.rs @@ -24,14 +24,10 @@ use std::collections::{Bound, BTreeMap, VecDeque}; use futures::prelude::*; use futures::stream::Fuse; -/// Unchecked message. These haven't had signature recovery run on them. -#[derive(Debug, PartialEq, Eq)] -pub struct UncheckedMessage { - /// The data of the message. - pub data: Vec, -} +use super::UncheckedMessage; /// Implementation of the round-robin buffer for incoming messages. +#[derive(Debug)] pub struct RoundRobinBuffer { buffer: BTreeMap>, last_processed_from: Option, diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs index d8b2898aef..53e492c765 100644 --- a/substrate/candidate-agreement/src/table.rs +++ b/substrate/candidate-agreement/src/table.rs @@ -212,17 +212,6 @@ impl CandidateData { !self.indicated_bad_by.is_empty() } - /// Get an iterator over those who have indicated this candidate valid. - // TODO: impl trait - pub fn voted_valid_by<'a>(&'a self) -> Box + 'a> { - Box::new(self.validity_votes.iter().filter_map(|(v, vote)| { - match *vote { - ValidityVote::Issued(_) | ValidityVote::Valid(_) => Some(v.clone()), - ValidityVote::Invalid(_) => None, - } - })) - } - // Candidate data can be included in a proposal // if it has enough validity and availability votes // and no validators have called it bad. @@ -323,11 +312,6 @@ impl Table { Box::new(self.candidate_votes.values().filter(move |c| c.group_id == group_id)) } - /// Drain all misbehavior observed up to this point. - pub fn drain_misbehavior(&mut self) -> HashMap::Misbehavior> { - ::std::mem::replace(&mut self.detected_misbehavior, HashMap::new()) - } - /// Import a signed statement. Signatures should be checked for validity, and the /// sender should be checked to actually be a validator. /// @@ -390,6 +374,11 @@ impl Table { maybe_summary } + /// Get a candidate by digest. + pub fn get_candidate(&self, digest: &C::Digest) -> Option<&C::Candidate> { + self.candidate_votes.get(digest).map(|d| &d.candidate) + } + fn note_trace_seen(&mut self, trace: StatementTrace, known_by: C::ValidatorId) { self.validator_data.entry(known_by).or_insert_with(|| ValidatorData { proposal: None,