From 02bdf0ee29660f4255094d28d8a80e75f38609d2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 16 Jan 2018 20:00:40 +0100 Subject: [PATCH] test consensus completion --- substrate/candidate-agreement/Cargo.toml | 2 +- .../src/handle_incoming.rs | 6 ++- substrate/candidate-agreement/src/lib.rs | 38 ++++++++++++++---- substrate/candidate-agreement/src/table.rs | 10 ++++- .../candidate-agreement/src/tests/mod.rs | 39 +++++++++---------- 5 files changed, 65 insertions(+), 30 deletions(-) diff --git a/substrate/candidate-agreement/Cargo.toml b/substrate/candidate-agreement/Cargo.toml index 83a8556ad3..8aa2d0001b 100644 --- a/substrate/candidate-agreement/Cargo.toml +++ b/substrate/candidate-agreement/Cargo.toml @@ -4,6 +4,6 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -futures = "0.1" +futures = "0.1.17" parking_lot = "0.4" tokio-timer = "0.1.2" diff --git a/substrate/candidate-agreement/src/handle_incoming.rs b/substrate/candidate-agreement/src/handle_incoming.rs index 9071ba4b58..150f0faf68 100644 --- a/substrate/candidate-agreement/src/handle_incoming.rs +++ b/substrate/candidate-agreement/src/handle_incoming.rs @@ -128,7 +128,11 @@ impl HandleIncoming { let digest = &summary.candidate; // TODO: consider a strategy based on the number of candidate votes as well. - let checking_validity = is_validity_member && self.checked_validity.insert(digest.clone()); + let checking_validity = + is_validity_member && + self.checked_validity.insert(digest.clone()) && + self.table.proposed_digest() != Some(digest.clone()); + let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone()); if checking_validity || checking_availability { diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index a589f26195..20da23c59b 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -208,6 +208,7 @@ impl table::Context for TableContext { // A shared table object. struct SharedTableInner { table: Table>, + proposed_digest: Option, awaiting_proposal: Vec>, } @@ -270,6 +271,7 @@ impl SharedTable { inner: Arc::new(Mutex::new(SharedTableInner { table: Table::default(), awaiting_proposal: Vec::new(), + proposed_digest: None, })) } } @@ -288,13 +290,23 @@ impl SharedTable { &self, statement: table::Statement, ) -> Option> { + let proposed_digest = match statement { + table::Statement::Candidate(ref c) => Some(C::candidate_digest(c)), + _ => None, + }; + let signed_statement = table::SignedStatement { signature: self.context.sign_table_statement(&statement), sender: self.context.local_id(), statement, }; - self.import_statement(signed_statement, None) + let mut inner = self.inner.lock(); + if proposed_digest.is_some() { + inner.proposed_digest = proposed_digest; + } + + inner.import_statement(&*self.context, signed_statement, None) } /// Import many statements at once. @@ -348,6 +360,11 @@ impl SharedTable { self.inner.lock().table.fill_batch(batch); } + /// Get the local proposed candidate digest. + pub fn proposed_digest(&self) -> Option { + self.inner.lock().proposed_digest.clone() + } + // Get a handle to the table context. fn context(&self) -> &TableContext { &*self.context @@ -579,7 +596,9 @@ pub fn agree< .map(table::Statement::Candidate) .map(Some) .or_else(|_| Ok(None)) - .map(move |s| if let Some(s) = s { table.sign_and_import(s); }) + .map(move |s| if let Some(s) = s { + table.sign_and_import(s); + }) }; let create_proposal_on_interval = { @@ -589,13 +608,18 @@ pub fn agree< .for_each(move |_| { table.update_proposal(); Ok(()) }) }; - // TODO: avoid having errors take down everything. - let future = agreement.join5( - route_messages_in, - route_messages_out, + // if these auxiliary futures terminate before the agreement, then + // that is an error. + let auxiliary_futures = route_messages_in.join4( create_proposal_on_interval, + route_messages_out, import_local_candidate, - ).map(|(agreed, _, _, _, _)| agreed); + ).and_then(|_| Err(Error::IoTerminated)); + + let future = agreement + .select(auxiliary_futures) + .map(|(committed, _)| committed) + .map_err(|(e, _)| e); Box::new(future) } diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs index b0f0f586cf..a54cd908b7 100644 --- a/substrate/candidate-agreement/src/table.rs +++ b/substrate/candidate-agreement/src/table.rs @@ -453,7 +453,14 @@ impl Table { // reconstruct statements for anything whose trace passes the filter. for (digest, candidate) in self.candidate_votes.iter() { - for (sender, vote) in candidate.validity_votes.iter() { + let issuance_iter = candidate.validity_votes.iter() + .filter(|&(_, x)| if let ValidityVote::Issued(_) = *x { true } else { false }); + + let validity_iter = candidate.validity_votes.iter() + .filter(|&(_, x)| if let ValidityVote::Issued(_) = *x { false } else { true }); + + // send issuance statements before votes. + for (sender, vote) in issuance_iter.chain(validity_iter) { match *vote { ValidityVote::Issued(ref sig) => { attempt_send!( @@ -483,6 +490,7 @@ impl Table { }; + // and lastly send availability. for (sender, sig) in candidate.availability_votes.iter() { attempt_send!( StatementTrace::Available(sender.clone(), digest.clone()), diff --git a/substrate/candidate-agreement/src/tests/mod.rs b/substrate/candidate-agreement/src/tests/mod.rs index cd7f59c4e5..ec67964d28 100644 --- a/substrate/candidate-agreement/src/tests/mod.rs +++ b/substrate/candidate-agreement/src/tests/mod.rs @@ -18,9 +18,9 @@ const VALIDITY_CHECK_DELAY_MS: u64 = 100; const AVAILABILITY_CHECK_DELAY_MS: u64 = 100; -const PROPOSAL_FORMATION_TICK_MS: u64 = 25; -const PROPAGATE_STATEMENTS_TICK_MS: u64 = 25; -const TIMER_TICK_DURATION_MS: u64 = 5; +const PROPOSAL_FORMATION_TICK_MS: u64 = 50; +const PROPAGATE_STATEMENTS_TICK_MS: u64 = 200; +const TIMER_TICK_DURATION_MS: u64 = 10; use std::collections::HashMap; @@ -110,8 +110,7 @@ impl Context for TestContext { ValidatorId(round % self.shared.n_authorities) } - fn check_validity(&self, candidate: &ParachainCandidate) -> Self::CheckCandidate { - println!("{:?} checking validity of {:?}", self.local_id, Self::candidate_digest(candidate)); + fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate { let future = self.shared.timer .sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS)) .map_err(Error::Timer) @@ -120,8 +119,7 @@ impl Context for TestContext { Box::new(future) } - fn check_availability(&self, candidate: &ParachainCandidate) -> Self::CheckAvailability { - println!("{:?} checking availability of {:?}", self.local_id, Self::candidate_digest(candidate)); + fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability { let future = self.shared.timer .sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS)) .map_err(Error::Timer) @@ -133,15 +131,12 @@ impl Context for TestContext { fn create_proposal(&self, candidates: Vec<&ParachainCandidate>) -> Option { - // only if it has at least than 2/3 of all groups. let t = self.shared.n_groups * 2 / 3; if candidates.len() >= t { Some(Proposal { candidates: candidates.iter().map(|x| (&**x).clone()).collect() }) } else { - println!("cannot make proposal: only has {} of {}", - candidates.len(), t); None } } @@ -149,7 +144,6 @@ impl Context for TestContext { fn proposal_valid(&self, proposal: &Proposal, check_candidate: F) -> bool where F: FnMut(&ParachainCandidate) -> bool { - // only if it has more than 2/3 of groups. if proposal.candidates.len() >= self.shared.n_groups * 2 / 3 { proposal.candidates.iter().all(check_candidate) } else { @@ -284,17 +278,22 @@ fn make_group_assignments(n_authorities: usize, n_groups: usize) // guarantees availability for the group above that. for a_id in 0..n_authorities { let primary_group = a_id % n_groups; - let availability_group = a_id + 1 % n_groups; + let availability_groups = [ + (a_id + 1) % n_groups, + a_id.wrapping_sub(1) % n_groups, + ]; map.entry(GroupId(primary_group)) .or_insert_with(&make_blank_group) .validity_guarantors .insert(ValidatorId(a_id)); - map.entry(GroupId(availability_group)) - .or_insert_with(&make_blank_group) - .availability_guarantors - .insert(ValidatorId(a_id)); + for &availability_group in &availability_groups { + map.entry(GroupId(availability_group)) + .or_insert_with(&make_blank_group) + .availability_guarantors + .insert(ValidatorId(a_id)); + } } map @@ -310,8 +309,8 @@ fn make_blank_batch(n_authorities: usize) -> VecBatch { #[test] fn consensus_completes_with_minimum_good() { - let n = 100; - let f = 33; + let n = 50; + let f = 16; let n_groups = 10; let timer = ::tokio_timer::wheel() @@ -351,11 +350,11 @@ fn consensus_completes_with_minimum_good() { let net_out = input .sink_map_err(|_| Error::NetOut) - .with(move |x| { Ok::<_, Error>((id.0, (id, x))) }); + .with(move |x| Ok::<_, Error>((id.0, (id, x))) ); let net_in = output .map_err(|_| Error::NetIn) - .map(move |(v, msg)| { (v, vec![msg]) }); + .map(move |(v, msg)| (v, vec![msg])); let propagate_statements = timer .interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS))