incoming message handler

This commit is contained in:
Robert Habermeier
2018-01-10 21:20:53 +01:00
parent 7b67bc63da
commit 3d9cc2697c
6 changed files with 338 additions and 73 deletions
@@ -171,11 +171,6 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
self.round_number.clone()
}
/// Get the round proposer.
pub fn round_proposer(&self) -> &ValidatorId {
&self.round_proposer
}
pub fn proposal(&self) -> Option<&Candidate> {
self.proposal.as_ref()
}
+8 -1
View File
@@ -699,8 +699,15 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
/// conclude without having witnessed the conclusion.
/// In general, this future should be pre-empted by the import of a justification
/// set for this block height.
pub fn agree<C: Context, I, O>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
pub fn agree<C: Context, I, O, E>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
-> Agreement<C, I, O>
where
C: Context,
C::RoundTimeout: Future<Error=E>,
C::CreateProposal: Future<Error=E>,
I: Stream<Item=ContextCommunication<C>,Error=E>,
O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
E: From<InputStreamConcluded>,
{
let strategy = Strategy::create(&context, nodes, max_faulty);
Agreement {
@@ -0,0 +1,218 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A stream that handles incoming messages to the BFT agreement module and statement
//! table. It forwards as necessary, and dispatches requests for determining availability
//! and validity of candidates as necessary.
use std::collections::HashSet;
use futures::prelude::*;
use futures::stream::{Fuse, FuturesUnordered};
use futures::sync::mpsc;
use table::{self, Statement, SignedStatement, Context as TableContext};
use super::{Context, CheckedMessage, SharedTable, TypeResolve};
enum CheckResult {
Available,
Unavailable,
Valid,
Invalid,
}
enum Checking<D, A, V> {
Availability(D, A),
Validity(D, V),
}
impl<D, A, V, E> Future for Checking<D, A, V>
where
D: Clone,
A: Future<Item=bool,Error=E>,
V: Future<Item=bool,Error=E>,
{
type Item = (D, CheckResult);
type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(match *self {
Checking::Availability(ref digest, ref mut f) => {
match try_ready!(f.poll()) {
true => (digest.clone(), CheckResult::Available),
false => (digest.clone(), CheckResult::Unavailable),
}
}
Checking::Validity(ref digest, ref mut f) => {
match try_ready!(f.poll()) {
true => (digest.clone(), CheckResult::Valid),
false => (digest.clone(), CheckResult::Invalid),
}
}
}))
}
}
/// Handles incoming messages to the BFT service and statement table.
///
/// Also triggers requests for determining validity and availability of other
/// parachain candidates.
pub struct HandleIncoming<C: Context, I> {
table: SharedTable<C>,
messages_in: Fuse<I>,
bft_out: mpsc::UnboundedSender<<C as TypeResolve>::BftCommunication>,
local_id: C::ValidatorId,
requesting_about: FuturesUnordered<Checking<
C::Digest,
<C::CheckAvailability as IntoFuture>::Future,
<C::CheckCandidate as IntoFuture>::Future,
>>,
checked_validity: HashSet<C::Digest>,
checked_availability: HashSet<C::Digest>,
}
impl<C: Context, I> HandleIncoming<C, I> {
fn sign_and_import_statement(&self, digest: C::Digest, result: CheckResult) {
let statement = match result {
CheckResult::Valid => Statement::Valid(digest),
CheckResult::Invalid => Statement::Invalid(digest),
CheckResult::Available => Statement::Available(digest),
CheckResult::Unavailable => return, // no such statement and not provable.
};
let signature = self.table.context().sign_table_statement(&statement);
let statement = SignedStatement {
statement,
signature,
sender: self.local_id.clone(),
};
// TODO: trigger broadcast to peers immediately?
self.table.import_statement(statement, None);
}
fn import_message(&mut self, origin: C::ValidatorId, message: CheckedMessage<C>) {
match message {
CheckedMessage::Bft(msg) => { let _ = self.bft_out.unbounded_send(msg); }
CheckedMessage::Table(table_messages) => {
// import all table messages and check for any that we
// need to produce statements for.
let msg_iter = table_messages
.into_iter()
.map(|m| (m, Some(origin.clone())));
let summaries: Vec<_> = self.table.import_statements(msg_iter);
for summary in summaries {
self.dispatch_on_summary(summary)
}
}
}
}
// on new candidates in our group, begin checking validity.
// on new candidates in our availability sphere, begin checking availability.
fn dispatch_on_summary(&mut self, summary: table::Summary<C::Digest, C::GroupId>) {
let is_validity_member =
self.table.context().is_member_of(&self.local_id, &summary.group_id);
let is_availability_member =
self.table.context().is_availability_guarantor_of(&self.local_id, &summary.group_id);
let digest = &summary.candidate;
// TODO: consider a strategy based on the number of candidate votes as well.
let checking_validity = is_validity_member && self.checked_validity.insert(digest.clone());
let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone());
if checking_validity || checking_availability {
let context = &*self.table.context();
let requesting_about = &mut self.requesting_about;
self.table.with_candidate(digest, |c| match c {
None => {} // TODO: handle table inconsistency somehow?
Some(candidate) => {
if checking_validity {
let future = context.check_validity(candidate).into_future();
let checking = Checking::Validity(digest.clone(), future);
requesting_about.push(checking);
}
if checking_availability {
let future = context.check_availability(candidate).into_future();
let checking = Checking::Availability(digest.clone(), future);
requesting_about.push(checking);
}
}
})
}
}
}
impl<C, I, E> HandleIncoming<C, I>
where
C: Context,
I: Stream<Item=(C::ValidatorId, CheckedMessage<C>),Error=E>,
C::CheckAvailability: IntoFuture<Error=E>,
C::CheckCandidate: IntoFuture<Error=E>,
{
pub fn new(
table: SharedTable<C>,
messages_in: I,
bft_out: mpsc::UnboundedSender<<C as TypeResolve>::BftCommunication>,
) -> Self {
let local_id = table.context().local_id();
HandleIncoming {
table,
bft_out,
local_id,
messages_in: messages_in.fuse(),
requesting_about: FuturesUnordered::new(),
checked_validity: HashSet::new(),
checked_availability: HashSet::new(),
}
}
}
impl<C, I, E> Future for HandleIncoming<C, I>
where
C: Context,
I: Stream<Item=(C::ValidatorId, CheckedMessage<C>),Error=E>,
C::CheckAvailability: IntoFuture<Error=E>,
C::CheckCandidate: IntoFuture<Error=E>,
{
type Item = ();
type Error = E;
fn poll(&mut self) -> Poll<(), E> {
loop {
// FuturesUnordered is safe to poll after it has completed.
while let Async::Ready(Some((d, r))) = self.requesting_about.poll()? {
self.sign_and_import_statement(d, r);
}
match try_ready!(self.messages_in.poll()) {
None => if self.requesting_about.is_empty() {
return Ok(Async::Ready(()))
} else {
return Ok(Async::NotReady)
},
Some((origin, msg)) => self.import_message(origin, msg),
}
}
}
}
+105 -45
View File
@@ -29,6 +29,7 @@
//!
//! Groups themselves may be compromised by malicious validators.
#[macro_use]
extern crate futures;
extern crate parking_lot;
extern crate tokio_timer;
@@ -37,6 +38,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use futures::prelude::*;
use futures::sync::{mpsc, oneshot};
@@ -46,6 +48,7 @@ use tokio_timer::Timer;
use table::Table;
mod bft;
mod handle_incoming;
mod round_robin;
mod table;
@@ -150,6 +153,14 @@ struct TableContext<C: Context> {
groups: HashMap<C::GroupId, GroupInfo<C::ValidatorId>>,
}
impl<C: Context> ::std::ops::Deref for TableContext<C> {
type Target = C;
fn deref(&self) -> &C {
&self.context
}
}
impl<C: Context> table::Context for TableContext<C> {
type ValidatorId = C::ValidatorId;
type Digest = C::Digest;
@@ -182,55 +193,59 @@ impl<C: Context> table::Context for TableContext<C> {
}
// A shared table object.
struct SharedTableInner<C: Context + Clone> {
context: TableContext<C>,
struct SharedTableInner<C: Context> {
table: Table<TableContext<C>>,
awaiting_proposal: Vec<oneshot::Sender<C::Proposal>>,
}
impl<C: Context + Clone> SharedTableInner<C> {
impl<C: Context> SharedTableInner<C> {
fn import_statement(
&mut self,
context: &TableContext<C>,
statement: <C as TypeResolve>::SignedTableStatement,
received_from: Option<C::ValidatorId>
) -> Option<table::Summary<C::Digest, C::GroupId>> {
self.table.import_statement(&self.context, statement, received_from)
self.table.import_statement(context, statement, received_from)
}
fn update_proposal(&mut self) {
fn update_proposal(&mut self, context: &TableContext<C>) {
if self.awaiting_proposal.is_empty() { return }
let proposal_candidates = self.table.proposed_candidates(&self.context);
if let Some(proposal) = self.context.context.create_proposal(proposal_candidates) {
let proposal_candidates = self.table.proposed_candidates(context);
if let Some(proposal) = context.context.create_proposal(proposal_candidates) {
for sender in self.awaiting_proposal.drain(..) {
let _ = sender.send(proposal.clone());
}
}
}
fn get_proposal(&mut self) -> oneshot::Receiver<C::Proposal> {
fn get_proposal(&mut self, context: &TableContext<C>) -> oneshot::Receiver<C::Proposal> {
let (tx, rx) = oneshot::channel();
self.awaiting_proposal.push(tx);
self.update_proposal();
self.update_proposal(context);
rx
}
fn proposal_valid(&mut self, proposal: &C::Proposal) -> bool {
self.context.context.proposal_valid(proposal, |contained_candidate| {
fn proposal_valid(&mut self, context: &TableContext<C>, proposal: &C::Proposal) -> bool {
context.context.proposal_valid(proposal, |contained_candidate| {
// check that the candidate is valid (has enough votes)
let digest = C::candidate_digest(contained_candidate);
self.table.candidate_includable(&digest, &self.context)
self.table.candidate_includable(&digest, context)
})
}
}
/// A shared table object.
pub struct SharedTable<C: Context> {
context: Arc<TableContext<C>>,
inner: Arc<Mutex<SharedTableInner<C>>>,
}
impl<C: Context> Clone for SharedTable<C> {
fn clone(&self) -> Self {
SharedTable { inner: self.inner.clone() }
SharedTable {
context: self.context.clone(),
inner: self.inner.clone()
}
}
}
@@ -238,9 +253,9 @@ impl<C: Context> SharedTable<C> {
/// Create a new shared table.
pub fn new(context: C, groups: HashMap<C::GroupId, GroupInfo<C::ValidatorId>>) -> Self {
SharedTable {
context: Arc::new(TableContext { context, groups }),
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
context: TableContext { context, groups },
awaiting_proposal: Vec::new(),
}))
}
@@ -252,7 +267,7 @@ impl<C: Context> SharedTable<C> {
statement: <C as TypeResolve>::SignedTableStatement,
received_from: Option<C::ValidatorId>,
) -> Option<table::Summary<C::Digest, C::GroupId>> {
self.inner.lock().import_statement(statement, received_from)
self.inner.lock().import_statement(&*self.context, statement, received_from)
}
/// Import many statements at once.
@@ -266,24 +281,39 @@ impl<C: Context> SharedTable<C> {
let mut inner = self.inner.lock();
iterable.into_iter().filter_map(move |(statement, received_from)| {
inner.import_statement(statement, received_from)
inner.import_statement(&*self.context, statement, received_from)
}).collect()
}
/// Update the proposal sealing.
pub fn update_proposal(&self) {
self.inner.lock().update_proposal()
self.inner.lock().update_proposal(&*self.context)
}
/// Register interest in receiving a proposal when ready.
/// If one is ready immediately, it will be provided.
pub fn get_proposal(&self) -> oneshot::Receiver<C::Proposal> {
self.inner.lock().get_proposal()
self.inner.lock().get_proposal(&*self.context)
}
/// Check if a proposal is valid.
pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool {
self.inner.lock().proposal_valid(proposal)
self.inner.lock().proposal_valid(&*self.context, proposal)
}
/// Execute a closure using a specific candidate.
///
/// Deadlocks if called recursively.
pub fn with_candidate<F, U>(&self, digest: &C::Digest, f: F) -> U
where F: FnOnce(Option<&C::ParachainCandidate>) -> U
{
let inner = self.inner.lock();
f(inner.table.get_candidate(digest))
}
// Get a handle to the table context.
fn context(&self) -> &TableContext<C> {
&*self.context
}
}
@@ -310,8 +340,7 @@ pub struct BftContext<C: Context> {
}
impl<C: Context> bft::Context for BftContext<C>
where
C::Proposal: 'static,
where C::Proposal: 'static,
{
type ValidatorId = C::ValidatorId;
type Digest = C::Digest;
@@ -358,11 +387,19 @@ impl<C: Context> bft::Context for BftContext<C>
.unwrap_or_else(u64::max_value)
.saturating_mul(self.round_timeout_multiplier);
Box::new(self.timer.sleep(::std::time::Duration::from_secs(timeout))
Box::new(self.timer.sleep(Duration::from_secs(timeout))
.map_err(|_| Error::FaultyTimer))
}
}
/// Unchecked message. These haven't had signature recovery run on them.
#[derive(Debug, PartialEq, Eq)]
pub struct UncheckedMessage {
/// The data of the message.
pub data: Vec<u8>,
}
/// Parameters necessary for agreement.
pub struct AgreementParams<C: Context> {
/// The context itself.
@@ -377,31 +414,54 @@ pub struct AgreementParams<C: Context> {
pub max_faulty: usize,
/// The round timeout multiplier: 2^round_number is multiplied by this.
pub round_timeout_multiplier: u64,
/// The maximum amount of messages to queue.
pub message_buffer_size: usize,
/// Interval to attempt forming proposals over.
pub form_proposal_interval: Duration,
}
/// Future and I/O to reach agreement.
pub struct Agreement<C: bft::Context + 'static> {
/// The future holding the actual BFT logic.
pub bft: Box<Future<
Item=bft::Committed<C::Candidate, C::Digest, C::Signature>,
Error=Error,
>>,
/// The input sink.
pub input: mpsc::UnboundedSender<bft::ContextCommunication<C>>,
/// The output stream.
pub output: mpsc::UnboundedReceiver<bft::ContextCommunication<C>>,
/// Recovery for messages
pub trait MessageRecovery<C: Context> {
/// Attempt to transform a checked message into an unchecked.
fn check_message(&self, UncheckedMessage) -> Option<CheckedMessage<C>>;
}
/// Recovered and fully checked messages.
pub enum CheckedMessage<C: Context> {
/// Messages meant for the BFT agreement logic.
Bft(<C as TypeResolve>::BftCommunication),
/// Statements circulating about the table.
Table(Vec<<C as TypeResolve>::SignedTableStatement>),
}
/// Create an agreement future, and I/O streams.
pub fn agree<C: Context + 'static>(params: AgreementParams<C>)
-> Agreement<BftContext<C>>
pub fn agree<C, I, O, R, E>(params: AgreementParams<C>, net_in: I, net_out: O, recovery: R)
-> Box<Future<Item=(),Error=()>>
where
C: Context + 'static,
C::CheckCandidate: IntoFuture<Error=E>,
C::CheckAvailability: IntoFuture<Error=E>,
I: Stream<Item=(C::ValidatorId, Vec<UncheckedMessage>),Error=E>,
O: Sink<SinkItem=CheckedMessage<C>>,
R: MessageRecovery<C>,
{
let (in_in, in_out) = mpsc::unbounded();
let (out_in, out_out) = mpsc::unbounded();
let (bft_in_in, bft_in_out) = mpsc::unbounded();
let (bft_out_in, bft_out_out) = mpsc::unbounded::<bft::ContextCommunication<BftContext<C>>>();
let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size);
let round_robin_recovered = round_robin
.filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x)));
let route_messages_in = handle_incoming::HandleIncoming::new(
params.table.clone(),
round_robin_recovered,
bft_in_in,
).map_err(|_| Error::IoTerminated);
let bft_context = BftContext {
context: params.context,
table: params.table,
table: params.table.clone(),
timer: params.timer,
round_timeout_multiplier: params.round_timeout_multiplier,
};
@@ -410,13 +470,13 @@ pub fn agree<C: Context + 'static>(params: AgreementParams<C>)
bft_context,
params.nodes,
params.max_faulty,
in_out.map_err(|_| Error::IoTerminated),
out_in.sink_map_err(|_| Error::IoTerminated),
bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated),
bft_out_in.sink_map_err(|_| Error::IoTerminated),
);
Agreement {
bft: Box::new(agreement),
input: in_in,
output: out_out,
}
let route_messages_out = futures::future::empty::<(), _>();
agreement.join(route_messages_in).join(route_messages_out);
unimplemented!()
}
@@ -24,14 +24,10 @@ use std::collections::{Bound, BTreeMap, VecDeque};
use futures::prelude::*;
use futures::stream::Fuse;
/// Unchecked message. These haven't had signature recovery run on them.
#[derive(Debug, PartialEq, Eq)]
pub struct UncheckedMessage {
/// The data of the message.
pub data: Vec<u8>,
}
use super::UncheckedMessage;
/// Implementation of the round-robin buffer for incoming messages.
#[derive(Debug)]
pub struct RoundRobinBuffer<V: Ord + Eq, S> {
buffer: BTreeMap<V, VecDeque<UncheckedMessage>>,
last_processed_from: Option<V>,
+5 -16
View File
@@ -212,17 +212,6 @@ impl<C: Context> CandidateData<C> {
!self.indicated_bad_by.is_empty()
}
/// Get an iterator over those who have indicated this candidate valid.
// TODO: impl trait
pub fn voted_valid_by<'a>(&'a self) -> Box<Iterator<Item=C::ValidatorId> + 'a> {
Box::new(self.validity_votes.iter().filter_map(|(v, vote)| {
match *vote {
ValidityVote::Issued(_) | ValidityVote::Valid(_) => Some(v.clone()),
ValidityVote::Invalid(_) => None,
}
}))
}
// Candidate data can be included in a proposal
// if it has enough validity and availability votes
// and no validators have called it bad.
@@ -323,11 +312,6 @@ impl<C: Context> Table<C> {
Box::new(self.candidate_votes.values().filter(move |c| c.group_id == group_id))
}
/// Drain all misbehavior observed up to this point.
pub fn drain_misbehavior(&mut self) -> HashMap<C::ValidatorId, <C as ResolveMisbehavior>::Misbehavior> {
::std::mem::replace(&mut self.detected_misbehavior, HashMap::new())
}
/// Import a signed statement. Signatures should be checked for validity, and the
/// sender should be checked to actually be a validator.
///
@@ -390,6 +374,11 @@ impl<C: Context> Table<C> {
maybe_summary
}
/// Get a candidate by digest.
pub fn get_candidate(&self, digest: &C::Digest) -> Option<&C::Candidate> {
self.candidate_votes.get(digest).map(|d| &d.candidate)
}
fn note_trace_seen(&mut self, trace: StatementTrace<C::ValidatorId, C::Digest>, known_by: C::ValidatorId) {
self.validator_data.entry(known_by).or_insert_with(|| ValidatorData {
proposal: None,