Ensure VoteAdvance messages propagate (#148)

* avoid clobbering accumulator state with late propose

* log state when polling

* trace on input messages
This commit is contained in:
Robert Habermeier
2018-05-09 12:25:17 +02:00
committed by Gav Wood
parent 42d22cbb37
commit 54f5debf5d
2 changed files with 19 additions and 6 deletions
+4 -1
View File
@@ -635,7 +635,10 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
let offset = U256::from_big_endian(&self.random_seed.0) % len; let offset = U256::from_big_endian(&self.random_seed.0) % len;
let offset = offset.low_u64() as usize + round_number; let offset = offset.low_u64() as usize + round_number;
authorities[offset % authorities.len()].clone() let proposer = authorities[offset % authorities.len()].clone();
trace!(target: "bft", "proposer for round {} is {}", round_number, Hash::from(proposer));
proposer
} }
fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) { fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) {
+15 -5
View File
@@ -49,6 +49,7 @@ struct BftSink<E> {
struct Messages { struct Messages {
network_stream: net::BftMessageStream, network_stream: net::BftMessageStream,
local_id: AuthorityId,
authorities: Vec<AuthorityId>, authorities: Vec<AuthorityId>,
} }
@@ -64,8 +65,9 @@ impl Stream for Messages {
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end. Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
Ok(Async::Ready(Some(message))) => { Ok(Async::Ready(Some(message))) => {
match process_message(message, &self.authorities) { match process_message(message, &self.local_id, &self.authorities) {
Ok(message) => return Ok(Async::Ready(Some(message))), Ok(Some(message)) => return Ok(Async::Ready(Some(message))),
Ok(None) => {} // ignored local message.
Err(e) => { Err(e) => {
debug!("Message validation failed: {:?}", e); debug!("Message validation failed: {:?}", e);
} }
@@ -76,10 +78,11 @@ impl Stream for Messages {
} }
} }
fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -> Result<bft::Communication, bft::Error> { fn process_message(msg: net::LocalizedBftMessage, local_id: &AuthorityId, authorities: &[AuthorityId]) -> Result<Option<bft::Communication>, bft::Error> {
Ok(match msg.message { Ok(Some(match msg.message {
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c { net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({ net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
if &proposal.sender == local_id { return Ok(None) }
let proposal = bft::generic::LocalizedProposal { let proposal = bft::generic::LocalizedProposal {
round_number: proposal.round_number as usize, round_number: proposal.round_number as usize,
proposal: proposal.proposal, proposal: proposal.proposal,
@@ -95,9 +98,12 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
} }
}; };
bft::check_proposal(authorities, &msg.parent_hash, &proposal)?; bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;
trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, Hash::from(proposal.sender));
proposal proposal
}), }),
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({ net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
if &vote.sender == local_id { return Ok(None) }
let vote = bft::generic::LocalizedVote { let vote = bft::generic::LocalizedVote {
sender: vote.sender, sender: vote.sender,
signature: ed25519::LocalizedSignature { signature: ed25519::LocalizedSignature {
@@ -111,6 +117,8 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
} }
}; };
bft::check_vote(authorities, &msg.parent_hash, &vote)?; bft::check_vote(authorities, &msg.parent_hash, &vote)?;
trace!(target: "bft", "importing vote {:?} from {}", vote.vote, Hash::from(vote.sender));
vote vote
}), }),
}), }),
@@ -121,7 +129,7 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
.map_err(|_| bft::ErrorKind::InvalidJustification.into()); .map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?) bft::generic::Communication::Auxiliary(justification?)
}, },
}) }))
} }
impl<E> Sink for BftSink<E> { impl<E> Sink for BftSink<E> {
@@ -190,8 +198,10 @@ fn start_bft<F, C>(
} }
}; };
let input = Messages { let input = Messages {
network_stream: network.bft_messages(parent_hash), network_stream: network.bft_messages(parent_hash),
local_id: bft_service.local_id(),
authorities, authorities,
}; };