initial test harness

This commit is contained in:
Robert Habermeier
2018-01-16 17:47:09 +01:00
parent f87893cd87
commit 76fafcb39f
4 changed files with 251 additions and 106 deletions
+3 -65
View File
@@ -18,11 +18,13 @@
use super::*;
use tests::Network;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::prelude::*;
use futures::sync::{oneshot, mpsc};
use futures::sync::oneshot;
use futures::future::FutureResult;
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
@@ -149,70 +151,6 @@ impl Context for TestContext {
}
}
type Comm = ContextCommunication<TestContext>;
struct Network {
endpoints: Vec<mpsc::UnboundedSender<Comm>>,
input: mpsc::UnboundedReceiver<(usize, Comm)>,
}
impl Network {
fn new(nodes: usize)
-> (Network, Vec<mpsc::UnboundedSender<(usize, Comm)>>, Vec<mpsc::UnboundedReceiver<Comm>>)
{
let mut inputs = Vec::with_capacity(nodes);
let mut outputs = Vec::with_capacity(nodes);
let mut endpoints = Vec::with_capacity(nodes);
let (in_tx, in_rx) = mpsc::unbounded();
for _ in 0..nodes {
let (out_tx, out_rx) = mpsc::unbounded();
inputs.push(in_tx.clone());
outputs.push(out_rx);
endpoints.push(out_tx);
}
let network = Network {
endpoints,
input: in_rx,
};
(network, inputs, outputs)
}
fn route_on_thread(self) {
::std::thread::spawn(move || { let _ = self.wait(); });
}
}
impl Future for Network {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
match self.input.poll() {
Err(_) => Err(Error),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::Ready(())),
Ok(Async::Ready(Some((sender, item)))) => {
{
let receiving_endpoints = self.endpoints
.iter()
.enumerate()
.filter(|&(i, _)| i != sender)
.map(|(_, x)| x);
for endpoint in receiving_endpoints {
let _ = endpoint.unbounded_send(item.clone());
}
}
self.poll()
}
}
}
}
fn timeout_in(t: Duration) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel();
::std::thread::spawn(move || {
+14 -11
View File
@@ -419,7 +419,7 @@ impl<C: Context> bft::Context for BftContext<C>
}
fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
let round = ::std::cmp::max(63, round) as u32;
let round = ::std::cmp::min(63, round) as u32;
let timeout = 1u64.checked_shl(round)
.unwrap_or_else(u64::max_value)
.saturating_mul(self.round_timeout_multiplier);
@@ -429,13 +429,6 @@ impl<C: Context> bft::Context for BftContext<C>
}
}
/// Unchecked message. These haven't had signature recovery run on them.
#[derive(Debug, PartialEq, Eq)]
pub struct UncheckedMessage {
/// The data of the message.
pub data: Vec<u8>,
}
/// Parameters necessary for agreement.
pub struct AgreementParams<C: Context> {
@@ -459,8 +452,12 @@ pub struct AgreementParams<C: Context> {
/// Recovery for messages
pub trait MessageRecovery<C: Context> {
/// The unchecked message type. This implies that work hasn't been done
/// to decode the payload and check and authenticate a signature.
type UncheckedMessage;
/// Attempt to transform a checked message into an unchecked.
fn check_message(&self, UncheckedMessage) -> Option<CheckedMessage<C>>;
fn check_message(&self, Self::UncheckedMessage) -> Option<CheckedMessage<C>>;
}
/// A batch of statements to send out.
@@ -468,6 +465,9 @@ pub trait StatementBatch<V, T> {
/// Get the target authorities of these statements.
fn targets(&self) -> &[V];
/// If the batch is empty.
fn is_empty(&self) -> bool;
/// Push a statement onto the batch. Returns false when the batch is full.
///
/// This is meant to do work like incrementally serializing the statements
@@ -485,6 +485,7 @@ pub enum CheckedMessage<C: Context> {
}
/// Outgoing messages to the network.
#[derive(Debug, Clone)]
pub enum OutgoingMessage<C: Context> {
/// Messages meant for BFT agreement peers.
Bft(<C as TypeResolve>::BftCommunication),
@@ -515,11 +516,11 @@ pub fn agree<
Context: ::Context + 'static,
Context::CheckCandidate: IntoFuture<Error=Err>,
Context::CheckAvailability: IntoFuture<Error=Err>,
NetIn: Stream<Item=(Context::ValidatorId, Vec<UncheckedMessage>),Error=Err> + 'static,
NetIn: Stream<Item=(Context::ValidatorId, Vec<Recovery::UncheckedMessage>),Error=Err> + 'static,
NetOut: Sink<SinkItem=OutgoingMessage<Context>> + 'static,
Recovery: MessageRecovery<Context> + 'static,
PropagateStatements: Stream<Item=Context::StatementBatch,Error=Err> + 'static,
LocalCandidate: Future<Item=Context::ParachainCandidate> + 'static
LocalCandidate: IntoFuture<Item=Context::ParachainCandidate> + 'static
{
let (bft_in_in, bft_in_out) = mpsc::unbounded();
let (bft_out_in, bft_out_out) = mpsc::unbounded();
@@ -559,6 +560,7 @@ pub fn agree<
let periodic_table_statements = propagate_statements
.or_else(|_| ::futures::future::empty()) // halt the stream instead of error.
.map(move |mut batch| { table.fill_batch(&mut batch); batch })
.filter(|b| !b.is_empty())
.map(OutgoingMessage::Table);
let complete_out_stream = bft_out_out
@@ -573,6 +575,7 @@ pub fn agree<
let import_local_candidate = {
let table = params.table.clone();
local_candidate
.into_future()
.map(table::Statement::Candidate)
.map(Some)
.or_else(|_| Ok(None))
@@ -24,19 +24,17 @@ use std::collections::{Bound, BTreeMap, VecDeque};
use futures::prelude::*;
use futures::stream::Fuse;
use super::UncheckedMessage;
/// Implementation of the round-robin buffer for incoming messages.
#[derive(Debug)]
pub struct RoundRobinBuffer<V: Ord + Eq, S> {
buffer: BTreeMap<V, VecDeque<UncheckedMessage>>,
pub struct RoundRobinBuffer<V: Ord + Eq, S, M> {
buffer: BTreeMap<V, VecDeque<M>>,
last_processed_from: Option<V>,
stored_messages: usize,
max_messages: usize,
inner: Fuse<S>,
}
impl<V: Ord + Eq + Clone, S: Stream> RoundRobinBuffer<V, S> {
impl<V: Ord + Eq + Clone, S: Stream, M> RoundRobinBuffer<V, S, M> {
/// Create a new round-robin buffer which holds up to a maximum
/// amount of messages.
pub fn new(stream: S, buffer_size: usize) -> Self {
@@ -50,8 +48,8 @@ impl<V: Ord + Eq + Clone, S: Stream> RoundRobinBuffer<V, S> {
}
}
impl<V: Ord + Eq + Clone, S> RoundRobinBuffer<V, S> {
fn next_message(&mut self) -> Option<(V, UncheckedMessage)> {
impl<V: Ord + Eq + Clone, S, M> RoundRobinBuffer<V, S, M> {
fn next_message(&mut self) -> Option<(V, M)> {
if self.stored_messages == 0 {
return None
}
@@ -84,7 +82,7 @@ impl<V: Ord + Eq + Clone, S> RoundRobinBuffer<V, S> {
}
// import messages, discarding when the buffer is full.
fn import_messages(&mut self, sender: V, messages: Vec<UncheckedMessage>) {
fn import_messages(&mut self, sender: V, messages: Vec<M>) {
let space_remaining = self.max_messages - self.stored_messages;
self.stored_messages += ::std::cmp::min(space_remaining, messages.len());
@@ -93,16 +91,16 @@ impl<V: Ord + Eq + Clone, S> RoundRobinBuffer<V, S> {
}
}
impl<V: Ord + Eq + Clone, S> Stream for RoundRobinBuffer<V, S>
where S: Stream<Item=(V, Vec<UncheckedMessage>)>
impl<V: Ord + Eq + Clone, S, M> Stream for RoundRobinBuffer<V, S, M>
where S: Stream<Item=(V, Vec<M>)>
{
type Item = (V, UncheckedMessage);
type Item = (V, M);
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, S::Error> {
loop {
match self.inner.poll()? {
Async::NotReady | Async::Ready(None)=> break,
Async::NotReady | Async::Ready(None) => break,
Async::Ready(Some((sender, msgs))) => self.import_messages(sender, msgs),
}
}
@@ -120,6 +118,9 @@ mod tests {
use super::*;
use futures::stream::{self, Stream};
#[derive(Debug, PartialEq, Eq)]
struct UncheckedMessage { data: Vec<u8> }
#[test]
fn is_fair_and_wraps_around() {
let stream = stream::iter_ok(vec![
+221 -18
View File
@@ -16,18 +16,25 @@
//! Tests and test helpers for the candidate agreement.
const VALIDITY_CHECK_DELAY_MS: u64 = 400;
const AVAILABILITY_CHECK_DELAY_MS: u64 = 200;
const VALIDITY_CHECK_DELAY_MS: u64 = 100;
const AVAILABILITY_CHECK_DELAY_MS: u64 = 100;
const PROPOSAL_FORMATION_TICK_MS: u64 = 25;
const PROPAGATE_STATEMENTS_TICK_MS: u64 = 25;
const TIMER_TICK_DURATION_MS: u64 = 5;
use std::collections::HashMap;
use futures::prelude::*;
use futures::sync::mpsc;
use tokio_timer::Timer;
use super::*;
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)]
struct ValidatorId(usize);
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
struct Digest(usize);
struct Digest(Vec<usize>);
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
struct GroupId(usize);
@@ -49,22 +56,20 @@ enum Signature {
Bft(ValidatorId, bft::Message<Proposal, Digest>),
}
struct TestAuthority {
id: ValidatorId,
}
enum Error {
Timer(tokio_timer::TimerError),
NetOut,
NetIn,
}
#[derive(Clone)]
#[derive(Debug, Clone)]
struct SharedTestContext {
n_authorities: usize,
n_groups: usize,
timer: Timer,
}
#[derive(Clone)]
#[derive(Debug, Clone)]
struct TestContext {
shared: Arc<SharedTestContext>,
local_id: ValidatorId,
@@ -87,14 +92,13 @@ impl Context for TestContext {
>;
fn candidate_digest(candidate: &ParachainCandidate) -> Digest {
Digest(!candidate.data & candidate.group.0)
Digest(vec![candidate.group.0, candidate.data])
}
fn proposal_digest(candidate: &Proposal) -> Digest {
Digest(candidate.candidates.iter().fold(0, |mut acc, c| {
acc = acc.wrapping_shl(2);
acc ^= Self::candidate_digest(c).0;
acc
Digest(candidate.candidates.iter().fold(Vec::new(), |mut a, c| {
a.extend(Self::candidate_digest(c).0);
a
}))
}
@@ -106,7 +110,8 @@ impl Context for TestContext {
ValidatorId(round % self.shared.n_authorities)
}
fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate {
fn check_validity(&self, candidate: &ParachainCandidate) -> Self::CheckCandidate {
println!("{:?} checking validity of {:?}", self.local_id, Self::candidate_digest(candidate));
let future = self.shared.timer
.sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS))
.map_err(Error::Timer)
@@ -115,7 +120,8 @@ impl Context for TestContext {
Box::new(future)
}
fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability {
fn check_availability(&self, candidate: &ParachainCandidate) -> Self::CheckAvailability {
println!("{:?} checking availability of {:?}", self.local_id, Self::candidate_digest(candidate));
let future = self.shared.timer
.sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS))
.map_err(Error::Timer)
@@ -128,11 +134,14 @@ impl Context for TestContext {
-> Option<Proposal>
{
// only if it has at least than 2/3 of all groups.
if candidates.len() >= self.shared.n_groups * 2 / 3 {
let t = self.shared.n_groups * 2 / 3;
if candidates.len() >= t {
Some(Proposal {
candidates: candidates.iter().map(|x| (&**x).clone()).collect()
})
} else {
println!("cannot make proposal: only has {} of {}",
candidates.len(), t);
None
}
}
@@ -164,6 +173,80 @@ impl Context for TestContext {
}
}
struct TestRecovery;
impl MessageRecovery<TestContext> for TestRecovery {
type UncheckedMessage = OutgoingMessage<TestContext>;
fn check_message(&self, msg: Self::UncheckedMessage) -> Option<CheckedMessage<TestContext>> {
Some(match msg {
OutgoingMessage::Bft(c) => CheckedMessage::Bft(c),
OutgoingMessage::Table(batch) => CheckedMessage::Table(batch.items),
})
}
}
pub struct Network<T> {
endpoints: Vec<mpsc::UnboundedSender<T>>,
input: mpsc::UnboundedReceiver<(usize, T)>,
}
impl<T: Clone + Send + 'static> Network<T> {
pub fn new(nodes: usize)
-> (Self, Vec<mpsc::UnboundedSender<(usize, T)>>, Vec<mpsc::UnboundedReceiver<T>>)
{
let mut inputs = Vec::with_capacity(nodes);
let mut outputs = Vec::with_capacity(nodes);
let mut endpoints = Vec::with_capacity(nodes);
let (in_tx, in_rx) = mpsc::unbounded();
for _ in 0..nodes {
let (out_tx, out_rx) = mpsc::unbounded();
inputs.push(in_tx.clone());
outputs.push(out_rx);
endpoints.push(out_tx);
}
let network = Network {
endpoints,
input: in_rx,
};
(network, inputs, outputs)
}
pub fn route_on_thread(self) {
::std::thread::spawn(move || { let _ = self.wait(); });
}
}
impl<T: Clone> Future for Network<T> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), Self::Error> {
match try_ready!(self.input.poll()) {
None => Ok(Async::Ready(())),
Some((sender, item)) => {
{
let receiving_endpoints = self.endpoints
.iter()
.enumerate()
.filter(|&(i, _)| i != sender)
.map(|(_, x)| x);
for endpoint in receiving_endpoints {
let _ = endpoint.unbounded_send(item.clone());
}
}
self.poll()
}
}
}
}
#[derive(Debug, Clone)]
pub struct VecBatch<V, T> {
pub max_len: usize,
pub targets: Vec<V>,
@@ -172,6 +255,7 @@ pub struct VecBatch<V, T> {
impl<V, T> ::StatementBatch<V, T> for VecBatch<V, T> {
fn targets(&self) -> &[V] { &self.targets }
fn is_empty(&self) -> bool { self.items.is_empty() }
fn push(&mut self, item: T) -> bool {
if self.items.len() == self.max_len {
false
@@ -181,3 +265,122 @@ impl<V, T> ::StatementBatch<V, T> for VecBatch<V, T> {
}
}
}
fn make_group_assignments(n_authorities: usize, n_groups: usize)
-> HashMap<GroupId, GroupInfo<ValidatorId>>
{
let mut map = HashMap::new();
let threshold = (n_authorities / n_groups) / 2;
let make_blank_group = || {
GroupInfo {
validity_guarantors: HashSet::new(),
availability_guarantors: HashSet::new(),
needed_validity: threshold,
needed_availability: threshold,
}
};
// every authority checks validity of his ID modulo n_groups and
// guarantees availability for the group above that.
for a_id in 0..n_authorities {
let primary_group = a_id % n_groups;
let availability_group = a_id + 1 % n_groups;
map.entry(GroupId(primary_group))
.or_insert_with(&make_blank_group)
.validity_guarantors
.insert(ValidatorId(a_id));
map.entry(GroupId(availability_group))
.or_insert_with(&make_blank_group)
.availability_guarantors
.insert(ValidatorId(a_id));
}
map
}
fn make_blank_batch<T>(n_authorities: usize) -> VecBatch<ValidatorId, T> {
VecBatch {
max_len: 20,
targets: (0..n_authorities).map(ValidatorId).collect(),
items: Vec::new(),
}
}
#[test]
fn consensus_completes_with_minimum_good() {
let n = 100;
let f = 33;
let n_groups = 10;
let timer = ::tokio_timer::wheel()
.tick_duration(Duration::from_millis(TIMER_TICK_DURATION_MS))
.num_slots(1 << 16)
.build();
let (network, inputs, outputs) = Network::<(ValidatorId, OutgoingMessage<TestContext>)>::new(n - f);
network.route_on_thread();
let shared_test_context = Arc::new(SharedTestContext {
n_authorities: n,
n_groups: n_groups,
timer: timer.clone(),
});
let groups = make_group_assignments(n, n_groups);
let authorities = inputs.into_iter().zip(outputs).enumerate().map(|(raw_id, (input, output))| {
let id = ValidatorId(raw_id);
let context = TestContext {
shared: shared_test_context.clone(),
local_id: id,
};
let shared_table = SharedTable::new(context.clone(), groups.clone());
let params = AgreementParams {
context,
timer: timer.clone(),
table: shared_table,
nodes: n,
max_faulty: f,
round_timeout_multiplier: 4,
message_buffer_size: 100,
form_proposal_interval: Duration::from_millis(PROPOSAL_FORMATION_TICK_MS),
};
let net_out = input
.sink_map_err(|_| Error::NetOut)
.with(move |x| { Ok::<_, Error>((id.0, (id, x))) });
let net_in = output
.map_err(|_| Error::NetIn)
.map(move |(v, msg)| { (v, vec![msg]) });
let propagate_statements = timer
.interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS))
.map(move |()| make_blank_batch(n))
.map_err(Error::Timer);
let local_candidate = if raw_id < n_groups {
let candidate = ParachainCandidate {
group: GroupId(raw_id),
data: raw_id,
};
::futures::future::Either::A(Ok::<_, Error>(candidate).into_future())
} else {
::futures::future::Either::B(::futures::future::empty())
};
agree::<_, _, _, _, _, _, Error>(
params,
net_in,
net_out,
TestRecovery,
propagate_statements,
local_candidate
)
}).collect::<Vec<_>>();
futures::future::join_all(authorities).wait().unwrap();
}