From 7b67bc63daa5c3cf31bfb4adeab446dcee8cba23 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 9 Jan 2018 21:03:06 +0100 Subject: [PATCH] round-robin message handler --- substrate/candidate-agreement/src/lib.rs | 8 +- .../candidate-agreement/src/round_robin.rs | 167 ++++++++++++++++++ 2 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 substrate/candidate-agreement/src/round_robin.rs diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index 4a75f5b36d..f6ea13be30 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -45,13 +45,14 @@ use tokio_timer::Timer; use table::Table; -pub mod bft; -pub mod table; +mod bft; +mod round_robin; +mod table; /// Context necessary for agreement. pub trait Context: Send + Clone { /// A validator ID - type ValidatorId: Debug + Hash + Eq + Clone; + type ValidatorId: Debug + Hash + Eq + Clone + Ord; /// The digest (hash or other unique attribute) of a candidate. type Digest: Debug + Hash + Eq + Clone; /// The group ID type @@ -412,6 +413,7 @@ pub fn agree(params: AgreementParams) in_out.map_err(|_| Error::IoTerminated), out_in.sink_map_err(|_| Error::IoTerminated), ); + Agreement { bft: Box::new(agreement), input: in_in, diff --git a/substrate/candidate-agreement/src/round_robin.rs b/substrate/candidate-agreement/src/round_robin.rs new file mode 100644 index 0000000000..9a061a27d7 --- /dev/null +++ b/substrate/candidate-agreement/src/round_robin.rs @@ -0,0 +1,167 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Round-robin buffer for incoming messages. +//! +//! This takes batches of messages associated with a sender as input, +//! and yields messages in a fair order by sender. + +use std::collections::{Bound, BTreeMap, VecDeque}; + +use futures::prelude::*; +use futures::stream::Fuse; + +/// Unchecked message. These haven't had signature recovery run on them. +#[derive(Debug, PartialEq, Eq)] +pub struct UncheckedMessage { + /// The data of the message. + pub data: Vec, +} + +/// Implementation of the round-robin buffer for incoming messages. +pub struct RoundRobinBuffer { + buffer: BTreeMap>, + last_processed_from: Option, + stored_messages: usize, + max_messages: usize, + inner: Fuse, +} + +impl RoundRobinBuffer { + /// Create a new round-robin buffer which holds up to a maximum + /// amount of messages. + pub fn new(stream: S, buffer_size: usize) -> Self { + RoundRobinBuffer { + buffer: BTreeMap::new(), + last_processed_from: None, + stored_messages: 0, + max_messages: buffer_size, + inner: stream.fuse(), + } + } +} + +impl RoundRobinBuffer { + fn next_message(&mut self) -> Option<(V, UncheckedMessage)> { + if self.stored_messages == 0 { + return None + } + + // first pick up from the last authority we processed a message from + let mut next = { + let lower_bound = match self.last_processed_from { + None => Bound::Unbounded, + Some(ref x) => Bound::Excluded(x.clone()), + }; + + self.buffer.range_mut((lower_bound, Bound::Unbounded)) + .filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v))) + .next() + }; + + // but wrap around to the beginning again if we got nothing. + if next.is_none() { + next = self.buffer.iter_mut() + .filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v))) + .next(); + } + + if let Some((ref authority, _)) = next { + self.stored_messages -= 1; + self.last_processed_from = Some(authority.clone()); + } + + next + } + + // import messages, discarding when the buffer is full. + fn import_messages(&mut self, sender: V, messages: Vec) { + let space_remaining = self.max_messages - self.stored_messages; + self.stored_messages += ::std::cmp::min(space_remaining, messages.len()); + + let v = self.buffer.entry(sender).or_insert_with(VecDeque::new); + v.extend(messages.into_iter().take(space_remaining)); + } +} + +impl Stream for RoundRobinBuffer + where S: Stream)> +{ + type Item = (V, UncheckedMessage); + type Error = S::Error; + + fn poll(&mut self) -> Poll, S::Error> { + loop { + match self.inner.poll()? { + Async::NotReady | Async::Ready(None)=> break, + Async::Ready(Some((sender, msgs))) => self.import_messages(sender, msgs), + } + } + + let done = self.inner.is_done(); + Ok(match self.next_message() { + Some(msg) => Async::Ready(Some(msg)), + None => if done { Async::Ready(None) } else { Async::NotReady }, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::stream::{self, Stream}; + + #[test] + fn is_fair_and_wraps_around() { + let stream = stream::iter_ok(vec![ + (1, vec![ + UncheckedMessage { data: vec![1, 3, 5] }, + UncheckedMessage { data: vec![3, 5, 7] }, + UncheckedMessage { data: vec![5, 7, 9] }, + ]), + (2, vec![ + UncheckedMessage { data: vec![2, 4, 6] }, + UncheckedMessage { data: vec![4, 6, 8] }, + UncheckedMessage { data: vec![6, 8, 10] }, + ]), + ]); + + let round_robin = RoundRobinBuffer::new(stream, 100); + let output = round_robin.wait().collect::, ()>>().unwrap(); + + assert_eq!(output, vec![ + (1, UncheckedMessage { data: vec![1, 3, 5] }), + (2, UncheckedMessage { data: vec![2, 4, 6] }), + (1, UncheckedMessage { data: vec![3, 5, 7] }), + + (2, UncheckedMessage { data: vec![4, 6, 8] }), + (1, UncheckedMessage { data: vec![5, 7, 9] }), + (2, UncheckedMessage { data: vec![6, 8, 10] }), + ]); + } + + #[test] + fn discards_when_full() { + let stream = stream::iter_ok(vec![ + (1, (0..200).map(|i| UncheckedMessage { data: vec![i] }).collect()) + ]); + + let round_robin = RoundRobinBuffer::new(stream, 100); + let output = round_robin.wait().collect::, ()>>().unwrap(); + + assert_eq!(output.len(), 100); + } +}