mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 14:31:02 +00:00
Ensure VoteAdvance messages propagate (#148)
* avoid clobbering accumulator state with late propose * log state when polling * trace on input messages
This commit is contained in:
committed by
Gav Wood
parent
6280f84716
commit
266ae03666
@@ -635,7 +635,10 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
|
|||||||
let offset = U256::from_big_endian(&self.random_seed.0) % len;
|
let offset = U256::from_big_endian(&self.random_seed.0) % len;
|
||||||
let offset = offset.low_u64() as usize + round_number;
|
let offset = offset.low_u64() as usize + round_number;
|
||||||
|
|
||||||
authorities[offset % authorities.len()].clone()
|
let proposer = authorities[offset % authorities.len()].clone();
|
||||||
|
trace!(target: "bft", "proposer for round {} is {}", round_number, Hash::from(proposer));
|
||||||
|
|
||||||
|
proposer
|
||||||
}
|
}
|
||||||
|
|
||||||
fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) {
|
fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) {
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ struct BftSink<E> {
|
|||||||
|
|
||||||
struct Messages {
|
struct Messages {
|
||||||
network_stream: net::BftMessageStream,
|
network_stream: net::BftMessageStream,
|
||||||
|
local_id: AuthorityId,
|
||||||
authorities: Vec<AuthorityId>,
|
authorities: Vec<AuthorityId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -64,8 +65,9 @@ impl Stream for Messages {
|
|||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
|
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
|
||||||
Ok(Async::Ready(Some(message))) => {
|
Ok(Async::Ready(Some(message))) => {
|
||||||
match process_message(message, &self.authorities) {
|
match process_message(message, &self.local_id, &self.authorities) {
|
||||||
Ok(message) => return Ok(Async::Ready(Some(message))),
|
Ok(Some(message)) => return Ok(Async::Ready(Some(message))),
|
||||||
|
Ok(None) => {} // ignored local message.
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Message validation failed: {:?}", e);
|
debug!("Message validation failed: {:?}", e);
|
||||||
}
|
}
|
||||||
@@ -76,10 +78,11 @@ impl Stream for Messages {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -> Result<bft::Communication, bft::Error> {
|
fn process_message(msg: net::LocalizedBftMessage, local_id: &AuthorityId, authorities: &[AuthorityId]) -> Result<Option<bft::Communication>, bft::Error> {
|
||||||
Ok(match msg.message {
|
Ok(Some(match msg.message {
|
||||||
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
|
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
|
||||||
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
|
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
|
||||||
|
if &proposal.sender == local_id { return Ok(None) }
|
||||||
let proposal = bft::generic::LocalizedProposal {
|
let proposal = bft::generic::LocalizedProposal {
|
||||||
round_number: proposal.round_number as usize,
|
round_number: proposal.round_number as usize,
|
||||||
proposal: proposal.proposal,
|
proposal: proposal.proposal,
|
||||||
@@ -95,9 +98,12 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;
|
bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;
|
||||||
|
|
||||||
|
trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, Hash::from(proposal.sender));
|
||||||
proposal
|
proposal
|
||||||
}),
|
}),
|
||||||
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
|
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
|
||||||
|
if &vote.sender == local_id { return Ok(None) }
|
||||||
let vote = bft::generic::LocalizedVote {
|
let vote = bft::generic::LocalizedVote {
|
||||||
sender: vote.sender,
|
sender: vote.sender,
|
||||||
signature: ed25519::LocalizedSignature {
|
signature: ed25519::LocalizedSignature {
|
||||||
@@ -111,6 +117,8 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
bft::check_vote(authorities, &msg.parent_hash, &vote)?;
|
bft::check_vote(authorities, &msg.parent_hash, &vote)?;
|
||||||
|
|
||||||
|
trace!(target: "bft", "importing vote {:?} from {}", vote.vote, Hash::from(vote.sender));
|
||||||
vote
|
vote
|
||||||
}),
|
}),
|
||||||
}),
|
}),
|
||||||
@@ -121,7 +129,7 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
|
|||||||
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
|
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
|
||||||
bft::generic::Communication::Auxiliary(justification?)
|
bft::generic::Communication::Auxiliary(justification?)
|
||||||
},
|
},
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E> Sink for BftSink<E> {
|
impl<E> Sink for BftSink<E> {
|
||||||
@@ -190,8 +198,10 @@ fn start_bft<F, C>(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
let input = Messages {
|
let input = Messages {
|
||||||
network_stream: network.bft_messages(parent_hash),
|
network_stream: network.bft_messages(parent_hash),
|
||||||
|
local_id: bft_service.local_id(),
|
||||||
authorities,
|
authorities,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -259,13 +259,18 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
|
|||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug!(target: "bft", "Importing proposal for round {}", self.round_number);
|
||||||
|
|
||||||
self.proposal = Some(Proposal {
|
self.proposal = Some(Proposal {
|
||||||
proposal: proposal.proposal.clone(),
|
proposal: proposal.proposal.clone(),
|
||||||
digest: proposal.digest,
|
digest: proposal.digest,
|
||||||
digest_signature: proposal.digest_signature,
|
digest_signature: proposal.digest_signature,
|
||||||
});
|
});
|
||||||
|
|
||||||
self.state = State::Proposed(proposal.proposal);
|
if let State::Begin = self.state {
|
||||||
|
self.state = State::Proposed(proposal.proposal);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -315,6 +320,7 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
|
|||||||
.map(|&(_, ref s)| s.clone())
|
.map(|&(_, ref s)| s.clone())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
trace!(target: "bft", "observed threshold-prepare for round {}", self.round_number);
|
||||||
self.state = State::Prepared(Justification(UncheckedJustification {
|
self.state = State::Prepared(Justification(UncheckedJustification {
|
||||||
round_number: self.round_number,
|
round_number: self.round_number,
|
||||||
digest: threshold_prepared,
|
digest: threshold_prepared,
|
||||||
@@ -339,7 +345,6 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
|
|||||||
count.committed += 1;
|
count.committed += 1;
|
||||||
|
|
||||||
if count.committed >= self.threshold {
|
if count.committed >= self.threshold {
|
||||||
trace!(target: "bft", "observed threshold-commit for round {} with {} commits", self.round_number, count.committed);
|
|
||||||
Some(digest)
|
Some(digest)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@@ -370,6 +375,7 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
|
|||||||
.map(|&(_, ref s)| s.clone())
|
.map(|&(_, ref s)| s.clone())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
trace!(target: "bft", "observed threshold-commit for round {}", self.round_number);
|
||||||
self.state = State::Committed(Justification(UncheckedJustification {
|
self.state = State::Committed(Justification(UncheckedJustification {
|
||||||
round_number: self.round_number,
|
round_number: self.round_number,
|
||||||
digest: threshold_committed,
|
digest: threshold_committed,
|
||||||
@@ -387,6 +393,7 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
|
|||||||
self.advance_round.insert(sender);
|
self.advance_round.insert(sender);
|
||||||
|
|
||||||
if self.advance_round.len() < self.threshold { return Ok(()) }
|
if self.advance_round.len() < self.threshold { return Ok(()) }
|
||||||
|
trace!(target: "bft", "Witnessed threshold advance-round messages for round {}", self.round_number);
|
||||||
|
|
||||||
// allow transition to new round only if we haven't produced a justification
|
// allow transition to new round only if we haven't produced a justification
|
||||||
// yet.
|
// yet.
|
||||||
@@ -665,6 +672,105 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn propose_after_prepared_does_not_clobber_state() {
|
||||||
|
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
|
||||||
|
assert_eq!(accumulator.state(), &State::Begin);
|
||||||
|
|
||||||
|
for i in 0..7 {
|
||||||
|
accumulator.import_message(LocalizedVote {
|
||||||
|
sender: AuthorityId(i),
|
||||||
|
signature: Signature(999, i),
|
||||||
|
vote: Vote::Prepare(1, Digest(999)),
|
||||||
|
}.into()).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
match accumulator.state() {
|
||||||
|
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||||
|
s => panic!("wrong state: {:?}", s),
|
||||||
|
}
|
||||||
|
|
||||||
|
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
|
||||||
|
sender: AuthorityId(8),
|
||||||
|
full_signature: Signature(999, 8),
|
||||||
|
digest_signature: Signature(999, 8),
|
||||||
|
round_number: 1,
|
||||||
|
proposal: Candidate(999),
|
||||||
|
digest: Digest(999),
|
||||||
|
})).unwrap();
|
||||||
|
|
||||||
|
match accumulator.state() {
|
||||||
|
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||||
|
s => panic!("wrong state: {:?}", s),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn propose_after_committed_does_not_clobber_state() {
|
||||||
|
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
|
||||||
|
assert_eq!(accumulator.state(), &State::Begin);
|
||||||
|
|
||||||
|
for i in 0..7 {
|
||||||
|
accumulator.import_message(LocalizedVote {
|
||||||
|
sender: AuthorityId(i),
|
||||||
|
signature: Signature(999, i),
|
||||||
|
vote: Vote::Commit(1, Digest(999)),
|
||||||
|
}.into()).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
match accumulator.state() {
|
||||||
|
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||||
|
s => panic!("wrong state: {:?}", s),
|
||||||
|
}
|
||||||
|
|
||||||
|
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
|
||||||
|
sender: AuthorityId(8),
|
||||||
|
full_signature: Signature(999, 8),
|
||||||
|
digest_signature: Signature(999, 8),
|
||||||
|
round_number: 1,
|
||||||
|
proposal: Candidate(999),
|
||||||
|
digest: Digest(999),
|
||||||
|
})).unwrap();
|
||||||
|
|
||||||
|
match accumulator.state() {
|
||||||
|
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
|
||||||
|
s => panic!("wrong state: {:?}", s),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn propose_after_advance_does_not_clobber_state() {
|
||||||
|
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
|
||||||
|
assert_eq!(accumulator.state(), &State::Begin);
|
||||||
|
|
||||||
|
for i in 0..7 {
|
||||||
|
accumulator.import_message(LocalizedVote {
|
||||||
|
sender: AuthorityId(i),
|
||||||
|
signature: Signature(1, i),
|
||||||
|
vote: Vote::AdvanceRound(1),
|
||||||
|
}.into()).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
match accumulator.state() {
|
||||||
|
&State::Advanced(_) => {}
|
||||||
|
s => panic!("wrong state: {:?}", s),
|
||||||
|
}
|
||||||
|
|
||||||
|
accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
|
||||||
|
sender: AuthorityId(8),
|
||||||
|
full_signature: Signature(999, 8),
|
||||||
|
digest_signature: Signature(999, 8),
|
||||||
|
round_number: 1,
|
||||||
|
proposal: Candidate(999),
|
||||||
|
digest: Digest(999),
|
||||||
|
})).unwrap();
|
||||||
|
|
||||||
|
match accumulator.state() {
|
||||||
|
&State::Advanced(_) => {}
|
||||||
|
s => panic!("wrong state: {:?}", s),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn begin_to_advance() {
|
fn begin_to_advance() {
|
||||||
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, AuthorityId(8));
|
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, AuthorityId(8));
|
||||||
|
|||||||
@@ -418,6 +418,7 @@ impl<C: Context> Strategy<C> {
|
|||||||
|
|
||||||
// poll until either completion or state doesn't change.
|
// poll until either completion or state doesn't change.
|
||||||
loop {
|
loop {
|
||||||
|
trace!(target: "bft", "Polling BFT logic. State={:?}", last_watermark);
|
||||||
match self.poll_once(context, sending)? {
|
match self.poll_once(context, sending)? {
|
||||||
Async::Ready(x) => return Ok(Async::Ready(x)),
|
Async::Ready(x) => return Ok(Async::Ready(x)),
|
||||||
Async::NotReady => {
|
Async::NotReady => {
|
||||||
@@ -683,6 +684,7 @@ impl<C: Context> Strategy<C> {
|
|||||||
|
|
||||||
fn advance_to_round(&mut self, context: &C, round: usize) {
|
fn advance_to_round(&mut self, context: &C, round: usize) {
|
||||||
assert!(round > self.current_accumulator.round_number());
|
assert!(round > self.current_accumulator.round_number());
|
||||||
|
trace!(target: "bft", "advancing to round {}", round);
|
||||||
|
|
||||||
let threshold = self.nodes - self.max_faulty;
|
let threshold = self.nodes - self.max_faulty;
|
||||||
|
|
||||||
@@ -790,7 +792,6 @@ impl<C, I, O> Future for Agreement<C, I, O>
|
|||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
Async::NotReady => {
|
Async::NotReady => {
|
||||||
|
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -329,6 +329,12 @@ impl<P, I> BftService<P, I>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the local Authority ID.
|
||||||
|
pub fn local_id(&self) -> AuthorityId {
|
||||||
|
// TODO: based on a header and some keystore.
|
||||||
|
self.key.public().0
|
||||||
|
}
|
||||||
|
|
||||||
/// Signal that a valid block with the given header has been imported.
|
/// Signal that a valid block with the given header has been imported.
|
||||||
///
|
///
|
||||||
/// If the local signing key is an authority, this will begin the consensus process to build a
|
/// If the local signing key is an authority, this will begin the consensus process to build a
|
||||||
@@ -350,7 +356,7 @@ impl<P, I> BftService<P, I>
|
|||||||
let max_faulty = max_faulty_of(n);
|
let max_faulty = max_faulty_of(n);
|
||||||
trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty);
|
trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty);
|
||||||
|
|
||||||
let local_id = self.key.public().0;
|
let local_id = self.local_id();
|
||||||
|
|
||||||
if !authorities.contains(&local_id) {
|
if !authorities.contains(&local_id) {
|
||||||
// cancel current agreement
|
// cancel current agreement
|
||||||
|
|||||||
@@ -139,7 +139,9 @@ impl Consensus {
|
|||||||
pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) {
|
pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) {
|
||||||
if self.messages.contains_key(&hash) {
|
if self.messages.contains_key(&hash) {
|
||||||
trace!(target:"sync", "Ignored already known BFT message from {}", peer_id);
|
trace!(target:"sync", "Ignored already known BFT message from {}", peer_id);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||||
peer.known_messages.insert(hash);
|
peer.known_messages.insert(hash);
|
||||||
// TODO: validate signature?
|
// TODO: validate signature?
|
||||||
|
|||||||
Reference in New Issue
Block a user