diff --git a/substrate/candidate-agreement/src/bft/tests.rs b/substrate/candidate-agreement/src/bft/tests.rs index a7a3282cc9..cb20bcce62 100644 --- a/substrate/candidate-agreement/src/bft/tests.rs +++ b/substrate/candidate-agreement/src/bft/tests.rs @@ -18,11 +18,13 @@ use super::*; +use tests::Network; + use std::sync::{Arc, Mutex}; use std::time::Duration; use futures::prelude::*; -use futures::sync::{oneshot, mpsc}; +use futures::sync::oneshot; use futures::future::FutureResult; #[derive(Debug, PartialEq, Eq, Clone, Hash)] @@ -149,70 +151,6 @@ impl Context for TestContext { } } -type Comm = ContextCommunication; - -struct Network { - endpoints: Vec>, - input: mpsc::UnboundedReceiver<(usize, Comm)>, -} - -impl Network { - fn new(nodes: usize) - -> (Network, Vec>, Vec>) - { - let mut inputs = Vec::with_capacity(nodes); - let mut outputs = Vec::with_capacity(nodes); - let mut endpoints = Vec::with_capacity(nodes); - - let (in_tx, in_rx) = mpsc::unbounded(); - for _ in 0..nodes { - let (out_tx, out_rx) = mpsc::unbounded(); - inputs.push(in_tx.clone()); - outputs.push(out_rx); - endpoints.push(out_tx); - } - - let network = Network { - endpoints, - input: in_rx, - }; - - (network, inputs, outputs) - } - - fn route_on_thread(self) { - ::std::thread::spawn(move || { let _ = self.wait(); }); - } -} - -impl Future for Network { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll<(), Error> { - match self.input.poll() { - Err(_) => Err(Error), - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => Ok(Async::Ready(())), - Ok(Async::Ready(Some((sender, item)))) => { - { - let receiving_endpoints = self.endpoints - .iter() - .enumerate() - .filter(|&(i, _)| i != sender) - .map(|(_, x)| x); - - for endpoint in receiving_endpoints { - let _ = endpoint.unbounded_send(item.clone()); - } - } - - self.poll() - } - } - } -} - fn timeout_in(t: Duration) -> oneshot::Receiver<()> { let (tx, rx) = oneshot::channel(); ::std::thread::spawn(move || { 
diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index 7f5d374201..a589f26195 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -419,7 +419,7 @@ impl bft::Context for BftContext } fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { - let round = ::std::cmp::max(63, round) as u32; + let round = ::std::cmp::min(63, round) as u32; let timeout = 1u64.checked_shl(round) .unwrap_or_else(u64::max_value) .saturating_mul(self.round_timeout_multiplier); @@ -429,13 +429,6 @@ impl bft::Context for BftContext } } -/// Unchecked message. These haven't had signature recovery run on them. -#[derive(Debug, PartialEq, Eq)] -pub struct UncheckedMessage { - /// The data of the message. - pub data: Vec, -} - /// Parameters necessary for agreement. pub struct AgreementParams { @@ -459,8 +452,12 @@ pub struct AgreementParams { /// Recovery for messages pub trait MessageRecovery { + /// The unchecked message type. This implies that work hasn't been done + /// to decode the payload and check and authenticate a signature. + type UncheckedMessage; + /// Attempt to transform a checked message into an unchecked. - fn check_message(&self, UncheckedMessage) -> Option>; + fn check_message(&self, Self::UncheckedMessage) -> Option>; } /// A batch of statements to send out. @@ -468,6 +465,9 @@ pub trait StatementBatch { /// Get the target authorities of these statements. fn targets(&self) -> &[V]; + /// If the batch is empty. + fn is_empty(&self) -> bool; + /// Push a statement onto the batch. Returns false when the batch is full. /// /// This is meant to do work like incrementally serializing the statements @@ -485,6 +485,7 @@ pub enum CheckedMessage { } /// Outgoing messages to the network. +#[derive(Debug, Clone)] pub enum OutgoingMessage { /// Messages meant for BFT agreement peers. 
Bft(::BftCommunication), @@ -515,11 +516,11 @@ pub fn agree< Context: ::Context + 'static, Context::CheckCandidate: IntoFuture, Context::CheckAvailability: IntoFuture, - NetIn: Stream),Error=Err> + 'static, + NetIn: Stream),Error=Err> + 'static, NetOut: Sink> + 'static, Recovery: MessageRecovery + 'static, PropagateStatements: Stream + 'static, - LocalCandidate: Future + 'static + LocalCandidate: IntoFuture + 'static { let (bft_in_in, bft_in_out) = mpsc::unbounded(); let (bft_out_in, bft_out_out) = mpsc::unbounded(); @@ -559,6 +560,7 @@ pub fn agree< let periodic_table_statements = propagate_statements .or_else(|_| ::futures::future::empty()) // halt the stream instead of error. .map(move |mut batch| { table.fill_batch(&mut batch); batch }) + .filter(|b| !b.is_empty()) .map(OutgoingMessage::Table); let complete_out_stream = bft_out_out @@ -573,6 +575,7 @@ pub fn agree< let import_local_candidate = { let table = params.table.clone(); local_candidate + .into_future() .map(table::Statement::Candidate) .map(Some) .or_else(|_| Ok(None)) diff --git a/substrate/candidate-agreement/src/round_robin.rs b/substrate/candidate-agreement/src/round_robin.rs index c0620d1a8e..3f98507cab 100644 --- a/substrate/candidate-agreement/src/round_robin.rs +++ b/substrate/candidate-agreement/src/round_robin.rs @@ -24,19 +24,17 @@ use std::collections::{Bound, BTreeMap, VecDeque}; use futures::prelude::*; use futures::stream::Fuse; -use super::UncheckedMessage; - /// Implementation of the round-robin buffer for incoming messages. #[derive(Debug)] -pub struct RoundRobinBuffer { - buffer: BTreeMap>, +pub struct RoundRobinBuffer { + buffer: BTreeMap>, last_processed_from: Option, stored_messages: usize, max_messages: usize, inner: Fuse, } -impl RoundRobinBuffer { +impl RoundRobinBuffer { /// Create a new round-robin buffer which holds up to a maximum /// amount of messages. 
pub fn new(stream: S, buffer_size: usize) -> Self { @@ -50,8 +48,8 @@ impl RoundRobinBuffer { } } -impl RoundRobinBuffer { - fn next_message(&mut self) -> Option<(V, UncheckedMessage)> { +impl RoundRobinBuffer { + fn next_message(&mut self) -> Option<(V, M)> { if self.stored_messages == 0 { return None } @@ -84,7 +82,7 @@ impl RoundRobinBuffer { } // import messages, discarding when the buffer is full. - fn import_messages(&mut self, sender: V, messages: Vec) { + fn import_messages(&mut self, sender: V, messages: Vec) { let space_remaining = self.max_messages - self.stored_messages; self.stored_messages += ::std::cmp::min(space_remaining, messages.len()); @@ -93,16 +91,16 @@ impl RoundRobinBuffer { } } -impl Stream for RoundRobinBuffer - where S: Stream)> +impl Stream for RoundRobinBuffer + where S: Stream)> { - type Item = (V, UncheckedMessage); + type Item = (V, M); type Error = S::Error; fn poll(&mut self) -> Poll, S::Error> { loop { match self.inner.poll()? { - Async::NotReady | Async::Ready(None)=> break, + Async::NotReady | Async::Ready(None) => break, Async::Ready(Some((sender, msgs))) => self.import_messages(sender, msgs), } } @@ -120,6 +118,9 @@ mod tests { use super::*; use futures::stream::{self, Stream}; + #[derive(Debug, PartialEq, Eq)] + struct UncheckedMessage { data: Vec } + #[test] fn is_fair_and_wraps_around() { let stream = stream::iter_ok(vec![ diff --git a/substrate/candidate-agreement/src/tests/mod.rs b/substrate/candidate-agreement/src/tests/mod.rs index 8e69e49824..cd7f59c4e5 100644 --- a/substrate/candidate-agreement/src/tests/mod.rs +++ b/substrate/candidate-agreement/src/tests/mod.rs @@ -16,18 +16,25 @@ //! Tests and test helpers for the candidate agreement. 
-const VALIDITY_CHECK_DELAY_MS: u64 = 400; -const AVAILABILITY_CHECK_DELAY_MS: u64 = 200; +const VALIDITY_CHECK_DELAY_MS: u64 = 100; +const AVAILABILITY_CHECK_DELAY_MS: u64 = 100; +const PROPOSAL_FORMATION_TICK_MS: u64 = 25; +const PROPAGATE_STATEMENTS_TICK_MS: u64 = 25; +const TIMER_TICK_DURATION_MS: u64 = 5; +use std::collections::HashMap; + +use futures::prelude::*; +use futures::sync::mpsc; use tokio_timer::Timer; use super::*; -#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)] struct ValidatorId(usize); #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] -struct Digest(usize); +struct Digest(Vec); #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] struct GroupId(usize); @@ -49,22 +56,20 @@ enum Signature { Bft(ValidatorId, bft::Message), } -struct TestAuthority { - id: ValidatorId, -} - enum Error { Timer(tokio_timer::TimerError), + NetOut, + NetIn, } -#[derive(Clone)] +#[derive(Debug, Clone)] struct SharedTestContext { n_authorities: usize, n_groups: usize, timer: Timer, } -#[derive(Clone)] +#[derive(Debug, Clone)] struct TestContext { shared: Arc, local_id: ValidatorId, @@ -87,14 +92,13 @@ impl Context for TestContext { >; fn candidate_digest(candidate: &ParachainCandidate) -> Digest { - Digest(!candidate.data & candidate.group.0) + Digest(vec![candidate.group.0, candidate.data]) } fn proposal_digest(candidate: &Proposal) -> Digest { - Digest(candidate.candidates.iter().fold(0, |mut acc, c| { - acc = acc.wrapping_shl(2); - acc ^= Self::candidate_digest(c).0; - acc + Digest(candidate.candidates.iter().fold(Vec::new(), |mut a, c| { + a.extend(Self::candidate_digest(c).0); + a })) } @@ -106,7 +110,8 @@ impl Context for TestContext { ValidatorId(round % self.shared.n_authorities) } - fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate { + fn check_validity(&self, candidate: &ParachainCandidate) -> Self::CheckCandidate { + 
println!("{:?} checking validity of {:?}", self.local_id, Self::candidate_digest(candidate)); let future = self.shared.timer .sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS)) .map_err(Error::Timer) @@ -115,7 +120,8 @@ impl Context for TestContext { Box::new(future) } - fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability { + fn check_availability(&self, candidate: &ParachainCandidate) -> Self::CheckAvailability { + println!("{:?} checking availability of {:?}", self.local_id, Self::candidate_digest(candidate)); let future = self.shared.timer .sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS)) .map_err(Error::Timer) @@ -128,11 +134,14 @@ impl Context for TestContext { -> Option { // only if it has at least than 2/3 of all groups. - if candidates.len() >= self.shared.n_groups * 2 / 3 { + let t = self.shared.n_groups * 2 / 3; + if candidates.len() >= t { Some(Proposal { candidates: candidates.iter().map(|x| (&**x).clone()).collect() }) } else { + println!("cannot make proposal: only has {} of {}", + candidates.len(), t); None } } @@ -164,6 +173,80 @@ impl Context for TestContext { } } +struct TestRecovery; + +impl MessageRecovery for TestRecovery { + type UncheckedMessage = OutgoingMessage; + + fn check_message(&self, msg: Self::UncheckedMessage) -> Option> { + Some(match msg { + OutgoingMessage::Bft(c) => CheckedMessage::Bft(c), + OutgoingMessage::Table(batch) => CheckedMessage::Table(batch.items), + }) + } +} + +pub struct Network { + endpoints: Vec>, + input: mpsc::UnboundedReceiver<(usize, T)>, +} + +impl Network { + pub fn new(nodes: usize) + -> (Self, Vec>, Vec>) + { + let mut inputs = Vec::with_capacity(nodes); + let mut outputs = Vec::with_capacity(nodes); + let mut endpoints = Vec::with_capacity(nodes); + + let (in_tx, in_rx) = mpsc::unbounded(); + for _ in 0..nodes { + let (out_tx, out_rx) = mpsc::unbounded(); + inputs.push(in_tx.clone()); + outputs.push(out_rx); + 
endpoints.push(out_tx); + } + + let network = Network { + endpoints, + input: in_rx, + }; + + (network, inputs, outputs) + } + + pub fn route_on_thread(self) { + ::std::thread::spawn(move || { let _ = self.wait(); }); + } +} + +impl Future for Network { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), Self::Error> { + match try_ready!(self.input.poll()) { + None => Ok(Async::Ready(())), + Some((sender, item)) => { + { + let receiving_endpoints = self.endpoints + .iter() + .enumerate() + .filter(|&(i, _)| i != sender) + .map(|(_, x)| x); + + for endpoint in receiving_endpoints { + let _ = endpoint.unbounded_send(item.clone()); + } + } + + self.poll() + } + } + } +} + +#[derive(Debug, Clone)] pub struct VecBatch { pub max_len: usize, pub targets: Vec, @@ -172,6 +255,7 @@ pub struct VecBatch { impl ::StatementBatch for VecBatch { fn targets(&self) -> &[V] { &self.targets } + fn is_empty(&self) -> bool { self.items.is_empty() } fn push(&mut self, item: T) -> bool { if self.items.len() == self.max_len { false @@ -181,3 +265,122 @@ impl ::StatementBatch for VecBatch { } } } + +fn make_group_assignments(n_authorities: usize, n_groups: usize) + -> HashMap> +{ + let mut map = HashMap::new(); + let threshold = (n_authorities / n_groups) / 2; + let make_blank_group = || { + GroupInfo { + validity_guarantors: HashSet::new(), + availability_guarantors: HashSet::new(), + needed_validity: threshold, + needed_availability: threshold, + } + }; + + // every authority checks validity of his ID modulo n_groups and + // guarantees availability for the group above that. 
+ for a_id in 0..n_authorities { + let primary_group = a_id % n_groups; + let availability_group = (a_id + 1) % n_groups; + + map.entry(GroupId(primary_group)) + .or_insert_with(&make_blank_group) + .validity_guarantors + .insert(ValidatorId(a_id)); + + map.entry(GroupId(availability_group)) + .or_insert_with(&make_blank_group) + .availability_guarantors + .insert(ValidatorId(a_id)); + } + + map +} + +fn make_blank_batch(n_authorities: usize) -> VecBatch { + VecBatch { + max_len: 20, + targets: (0..n_authorities).map(ValidatorId).collect(), + items: Vec::new(), + } +} + +#[test] +fn consensus_completes_with_minimum_good() { + let n = 100; + let f = 33; + let n_groups = 10; + + let timer = ::tokio_timer::wheel() + .tick_duration(Duration::from_millis(TIMER_TICK_DURATION_MS)) + .num_slots(1 << 16) + .build(); + + let (network, inputs, outputs) = Network::<(ValidatorId, OutgoingMessage)>::new(n - f); + network.route_on_thread(); + + let shared_test_context = Arc::new(SharedTestContext { + n_authorities: n, + n_groups: n_groups, + timer: timer.clone(), + }); + + let groups = make_group_assignments(n, n_groups); + + let authorities = inputs.into_iter().zip(outputs).enumerate().map(|(raw_id, (input, output))| { + let id = ValidatorId(raw_id); + let context = TestContext { + shared: shared_test_context.clone(), + local_id: id, + }; + + let shared_table = SharedTable::new(context.clone(), groups.clone()); + let params = AgreementParams { + context, + timer: timer.clone(), + table: shared_table, + nodes: n, + max_faulty: f, + round_timeout_multiplier: 4, + message_buffer_size: 100, + form_proposal_interval: Duration::from_millis(PROPOSAL_FORMATION_TICK_MS), + }; + + let net_out = input + .sink_map_err(|_| Error::NetOut) + .with(move |x| { Ok::<_, Error>((id.0, (id, x))) }); + + let net_in = output + .map_err(|_| Error::NetIn) + .map(move |(v, msg)| { (v, vec![msg]) }); + + let propagate_statements = timer + .interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS)) + 
.map(move |()| make_blank_batch(n)) + .map_err(Error::Timer); + + let local_candidate = if raw_id < n_groups { + let candidate = ParachainCandidate { + group: GroupId(raw_id), + data: raw_id, + }; + ::futures::future::Either::A(Ok::<_, Error>(candidate).into_future()) + } else { + ::futures::future::Either::B(::futures::future::empty()) + }; + + agree::<_, _, _, _, _, _, Error>( + params, + net_in, + net_out, + TestRecovery, + propagate_statements, + local_candidate + ) + }).collect::>(); + + futures::future::join_all(authorities).wait().unwrap(); +}