test consensus completion

This commit is contained in:
Robert Habermeier
2018-01-16 20:00:40 +01:00
parent 76fafcb39f
commit 02bdf0ee29
5 changed files with 65 additions and 30 deletions
+1 -1
View File
@@ -4,6 +4,6 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
futures = "0.1" futures = "0.1.17"
parking_lot = "0.4" parking_lot = "0.4"
tokio-timer = "0.1.2" tokio-timer = "0.1.2"
@@ -128,7 +128,11 @@ impl<C: Context, I> HandleIncoming<C, I> {
let digest = &summary.candidate; let digest = &summary.candidate;
// TODO: consider a strategy based on the number of candidate votes as well. // TODO: consider a strategy based on the number of candidate votes as well.
let checking_validity = is_validity_member && self.checked_validity.insert(digest.clone()); let checking_validity =
is_validity_member &&
self.checked_validity.insert(digest.clone()) &&
self.table.proposed_digest() != Some(digest.clone());
let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone()); let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone());
if checking_validity || checking_availability { if checking_validity || checking_availability {
+31 -7
View File
@@ -208,6 +208,7 @@ impl<C: Context> table::Context for TableContext<C> {
// A shared table object. // A shared table object.
struct SharedTableInner<C: Context> { struct SharedTableInner<C: Context> {
table: Table<TableContext<C>>, table: Table<TableContext<C>>,
proposed_digest: Option<C::Digest>,
awaiting_proposal: Vec<oneshot::Sender<C::Proposal>>, awaiting_proposal: Vec<oneshot::Sender<C::Proposal>>,
} }
@@ -270,6 +271,7 @@ impl<C: Context> SharedTable<C> {
inner: Arc::new(Mutex::new(SharedTableInner { inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(), table: Table::default(),
awaiting_proposal: Vec::new(), awaiting_proposal: Vec::new(),
proposed_digest: None,
})) }))
} }
} }
@@ -288,13 +290,23 @@ impl<C: Context> SharedTable<C> {
&self, &self,
statement: table::Statement<C::ParachainCandidate, C::Digest>, statement: table::Statement<C::ParachainCandidate, C::Digest>,
) -> Option<table::Summary<C::Digest, C::GroupId>> { ) -> Option<table::Summary<C::Digest, C::GroupId>> {
let proposed_digest = match statement {
table::Statement::Candidate(ref c) => Some(C::candidate_digest(c)),
_ => None,
};
let signed_statement = table::SignedStatement { let signed_statement = table::SignedStatement {
signature: self.context.sign_table_statement(&statement), signature: self.context.sign_table_statement(&statement),
sender: self.context.local_id(), sender: self.context.local_id(),
statement, statement,
}; };
self.import_statement(signed_statement, None) let mut inner = self.inner.lock();
if proposed_digest.is_some() {
inner.proposed_digest = proposed_digest;
}
inner.import_statement(&*self.context, signed_statement, None)
} }
/// Import many statements at once. /// Import many statements at once.
@@ -348,6 +360,11 @@ impl<C: Context> SharedTable<C> {
self.inner.lock().table.fill_batch(batch); self.inner.lock().table.fill_batch(batch);
} }
/// Get the local proposed candidate digest.
pub fn proposed_digest(&self) -> Option<C::Digest> {
self.inner.lock().proposed_digest.clone()
}
// Get a handle to the table context. // Get a handle to the table context.
fn context(&self) -> &TableContext<C> { fn context(&self) -> &TableContext<C> {
&*self.context &*self.context
@@ -579,7 +596,9 @@ pub fn agree<
.map(table::Statement::Candidate) .map(table::Statement::Candidate)
.map(Some) .map(Some)
.or_else(|_| Ok(None)) .or_else(|_| Ok(None))
.map(move |s| if let Some(s) = s { table.sign_and_import(s); }) .map(move |s| if let Some(s) = s {
table.sign_and_import(s);
})
}; };
let create_proposal_on_interval = { let create_proposal_on_interval = {
@@ -589,13 +608,18 @@ pub fn agree<
.for_each(move |_| { table.update_proposal(); Ok(()) }) .for_each(move |_| { table.update_proposal(); Ok(()) })
}; };
// TODO: avoid having errors take down everything. // if these auxiliary futures terminate before the agreement, then
let future = agreement.join5( // that is an error.
route_messages_in, let auxiliary_futures = route_messages_in.join4(
route_messages_out,
create_proposal_on_interval, create_proposal_on_interval,
route_messages_out,
import_local_candidate, import_local_candidate,
).map(|(agreed, _, _, _, _)| agreed); ).and_then(|_| Err(Error::IoTerminated));
let future = agreement
.select(auxiliary_futures)
.map(|(committed, _)| committed)
.map_err(|(e, _)| e);
Box::new(future) Box::new(future)
} }
+9 -1
View File
@@ -453,7 +453,14 @@ impl<C: Context> Table<C> {
// reconstruct statements for anything whose trace passes the filter. // reconstruct statements for anything whose trace passes the filter.
for (digest, candidate) in self.candidate_votes.iter() { for (digest, candidate) in self.candidate_votes.iter() {
for (sender, vote) in candidate.validity_votes.iter() { let issuance_iter = candidate.validity_votes.iter()
.filter(|&(_, x)| if let ValidityVote::Issued(_) = *x { true } else { false });
let validity_iter = candidate.validity_votes.iter()
.filter(|&(_, x)| if let ValidityVote::Issued(_) = *x { false } else { true });
// send issuance statements before votes.
for (sender, vote) in issuance_iter.chain(validity_iter) {
match *vote { match *vote {
ValidityVote::Issued(ref sig) => { ValidityVote::Issued(ref sig) => {
attempt_send!( attempt_send!(
@@ -483,6 +490,7 @@ impl<C: Context> Table<C> {
}; };
// and lastly send availability.
for (sender, sig) in candidate.availability_votes.iter() { for (sender, sig) in candidate.availability_votes.iter() {
attempt_send!( attempt_send!(
StatementTrace::Available(sender.clone(), digest.clone()), StatementTrace::Available(sender.clone(), digest.clone()),
+19 -20
View File
@@ -18,9 +18,9 @@
const VALIDITY_CHECK_DELAY_MS: u64 = 100; const VALIDITY_CHECK_DELAY_MS: u64 = 100;
const AVAILABILITY_CHECK_DELAY_MS: u64 = 100; const AVAILABILITY_CHECK_DELAY_MS: u64 = 100;
const PROPOSAL_FORMATION_TICK_MS: u64 = 25; const PROPOSAL_FORMATION_TICK_MS: u64 = 50;
const PROPAGATE_STATEMENTS_TICK_MS: u64 = 25; const PROPAGATE_STATEMENTS_TICK_MS: u64 = 200;
const TIMER_TICK_DURATION_MS: u64 = 5; const TIMER_TICK_DURATION_MS: u64 = 10;
use std::collections::HashMap; use std::collections::HashMap;
@@ -110,8 +110,7 @@ impl Context for TestContext {
ValidatorId(round % self.shared.n_authorities) ValidatorId(round % self.shared.n_authorities)
} }
fn check_validity(&self, candidate: &ParachainCandidate) -> Self::CheckCandidate { fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate {
println!("{:?} checking validity of {:?}", self.local_id, Self::candidate_digest(candidate));
let future = self.shared.timer let future = self.shared.timer
.sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS)) .sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS))
.map_err(Error::Timer) .map_err(Error::Timer)
@@ -120,8 +119,7 @@ impl Context for TestContext {
Box::new(future) Box::new(future)
} }
fn check_availability(&self, candidate: &ParachainCandidate) -> Self::CheckAvailability { fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability {
println!("{:?} checking availability of {:?}", self.local_id, Self::candidate_digest(candidate));
let future = self.shared.timer let future = self.shared.timer
.sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS)) .sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS))
.map_err(Error::Timer) .map_err(Error::Timer)
@@ -133,15 +131,12 @@ impl Context for TestContext {
fn create_proposal(&self, candidates: Vec<&ParachainCandidate>) fn create_proposal(&self, candidates: Vec<&ParachainCandidate>)
-> Option<Proposal> -> Option<Proposal>
{ {
// only if it has at least than 2/3 of all groups.
let t = self.shared.n_groups * 2 / 3; let t = self.shared.n_groups * 2 / 3;
if candidates.len() >= t { if candidates.len() >= t {
Some(Proposal { Some(Proposal {
candidates: candidates.iter().map(|x| (&**x).clone()).collect() candidates: candidates.iter().map(|x| (&**x).clone()).collect()
}) })
} else { } else {
println!("cannot make proposal: only has {} of {}",
candidates.len(), t);
None None
} }
} }
@@ -149,7 +144,6 @@ impl Context for TestContext {
fn proposal_valid<F>(&self, proposal: &Proposal, check_candidate: F) -> bool fn proposal_valid<F>(&self, proposal: &Proposal, check_candidate: F) -> bool
where F: FnMut(&ParachainCandidate) -> bool where F: FnMut(&ParachainCandidate) -> bool
{ {
// only if it has more than 2/3 of groups.
if proposal.candidates.len() >= self.shared.n_groups * 2 / 3 { if proposal.candidates.len() >= self.shared.n_groups * 2 / 3 {
proposal.candidates.iter().all(check_candidate) proposal.candidates.iter().all(check_candidate)
} else { } else {
@@ -284,17 +278,22 @@ fn make_group_assignments(n_authorities: usize, n_groups: usize)
// guarantees availability for the group above that. // guarantees availability for the group above that.
for a_id in 0..n_authorities { for a_id in 0..n_authorities {
let primary_group = a_id % n_groups; let primary_group = a_id % n_groups;
let availability_group = a_id + 1 % n_groups; let availability_groups = [
(a_id + 1) % n_groups,
a_id.wrapping_sub(1) % n_groups,
];
map.entry(GroupId(primary_group)) map.entry(GroupId(primary_group))
.or_insert_with(&make_blank_group) .or_insert_with(&make_blank_group)
.validity_guarantors .validity_guarantors
.insert(ValidatorId(a_id)); .insert(ValidatorId(a_id));
map.entry(GroupId(availability_group)) for &availability_group in &availability_groups {
.or_insert_with(&make_blank_group) map.entry(GroupId(availability_group))
.availability_guarantors .or_insert_with(&make_blank_group)
.insert(ValidatorId(a_id)); .availability_guarantors
.insert(ValidatorId(a_id));
}
} }
map map
@@ -310,8 +309,8 @@ fn make_blank_batch<T>(n_authorities: usize) -> VecBatch<ValidatorId, T> {
#[test] #[test]
fn consensus_completes_with_minimum_good() { fn consensus_completes_with_minimum_good() {
let n = 100; let n = 50;
let f = 33; let f = 16;
let n_groups = 10; let n_groups = 10;
let timer = ::tokio_timer::wheel() let timer = ::tokio_timer::wheel()
@@ -351,11 +350,11 @@ fn consensus_completes_with_minimum_good() {
let net_out = input let net_out = input
.sink_map_err(|_| Error::NetOut) .sink_map_err(|_| Error::NetOut)
.with(move |x| { Ok::<_, Error>((id.0, (id, x))) }); .with(move |x| Ok::<_, Error>((id.0, (id, x))) );
let net_in = output let net_in = output
.map_err(|_| Error::NetIn) .map_err(|_| Error::NetIn)
.map(move |(v, msg)| { (v, vec![msg]) }); .map(move |(v, msg)| (v, vec![msg]));
let propagate_statements = timer let propagate_statements = timer
.interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS)) .interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS))