From 54f5debf5dcade490ca45f36f4380586332cc8c4 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 9 May 2018 12:25:17 +0200 Subject: [PATCH] Ensure VoteAdvance messages propagate (#148) * avoid clobbering accumulator state with late propose * log state when polling * trace on input messages --- polkadot/consensus/src/lib.rs | 5 ++++- polkadot/consensus/src/service.rs | 20 +++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 2630bea720..e004a44df5 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -635,7 +635,10 @@ impl bft::Proposer for Proposer { let offset = U256::from_big_endian(&self.random_seed.0) % len; let offset = offset.low_u64() as usize + round_number; - authorities[offset % authorities.len()].clone() + let proposer = authorities[offset % authorities.len()].clone(); + trace!(target: "bft", "proposer for round {} is {}", round_number, Hash::from(proposer)); + + proposer } fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) { diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index c2280f2eaa..ce2b562a47 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -49,6 +49,7 @@ struct BftSink { struct Messages { network_stream: net::BftMessageStream, + local_id: AuthorityId, authorities: Vec, } @@ -64,8 +65,9 @@ impl Stream for Messages { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end. Ok(Async::Ready(Some(message))) => { - match process_message(message, &self.authorities) { - Ok(message) => return Ok(Async::Ready(Some(message))), + match process_message(message, &self.local_id, &self.authorities) { + Ok(Some(message)) => return Ok(Async::Ready(Some(message))), + Ok(None) => {} // ignored local message. Err(e) => { debug!("Message validation failed: {:?}", e); } @@ -76,10 +78,11 @@ impl Stream for Messages { } } -fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -> Result { - Ok(match msg.message { +fn process_message(msg: net::LocalizedBftMessage, local_id: &AuthorityId, authorities: &[AuthorityId]) -> Result, bft::Error> { + Ok(Some(match msg.message { net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c { net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({ + if &proposal.sender == local_id { return Ok(None) } let proposal = bft::generic::LocalizedProposal { round_number: proposal.round_number as usize, proposal: proposal.proposal, @@ -95,9 +98,12 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) - } }; bft::check_proposal(authorities, &msg.parent_hash, &proposal)?; + + trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, Hash::from(proposal.sender)); proposal }), net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({ + if &vote.sender == local_id { return Ok(None) } let vote = bft::generic::LocalizedVote { sender: vote.sender, signature: ed25519::LocalizedSignature { @@ -111,6 +117,8 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) - } }; bft::check_vote(authorities, &msg.parent_hash, &vote)?; + + trace!(target: "bft", "importing vote {:?} from {}", vote.vote, Hash::from(vote.sender)); vote }), }), @@ -121,7 +129,7 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) - .map_err(|_| bft::ErrorKind::InvalidJustification.into()); bft::generic::Communication::Auxiliary(justification?) }, - }) + })) } impl Sink for BftSink { @@ -190,8 +198,10 @@ fn start_bft( } }; + let input = Messages { network_stream: network.bft_messages(parent_hash), + local_id: bft_service.local_id(), authorities, };