grandpa: round catchup messages (#2801)

* grandpa: initial structure for catch up messages

* grandpa: answer catch up requests

* grandpa: inject catch up messages into global stream

* grandpa: keep track of pending catch up request

* grandpa: block catchup until all referenced blocks are imported

* grandpa: unify catch up and commit streams

* grandpa: simplify communication stream/sink types

* grandpa: note gossip validator on catch up message import

* grandpa: fix cost on catch up message validation

* grandpa: check signatures on catch up messages

* grandpa: clean up catch up request handling state

* grandpa: adjust costs on invalid catch up requests

* grandpa: release lock before pushing catch up message

* grandpa: validate catch up request against peer view

* grandpa: catch up docs

* grandpa: fix tests

* grandpa: until_imported: add tests for catch up messages

* grandpa: add tests for catch up message gossip validation

* grandpa: integrate HistoricalVotes changes

* grandpa: add test for neighbor packet triggering catch up

* grandpa: add test for full voter catch up

* grandpa: depend on finality-grandpa 0.8 from crates

* granda: use finality-grandpa test helpers

* grandpa: add PSM cost for answering catch up requests

* grandpa: code style fixes

Co-Authored-By: Robert Habermeier <rphmeier@gmail.com>

* grandpa: more trailing commas

* grandpa: lower cost of invalid catch up requests near set change

* grandpa: process catch up sending on import of neighbor message

* grandpa: add comments on HistoricalVotes

* grandpa: use finality-grandpa v0.8.1 from crates.io

* grandpa: fix test compilation
This commit is contained in:
André Silva
2019-07-04 20:40:16 +01:00
committed by GitHub
parent 90f214f000
commit d5bc7325b9
11 changed files with 1381 additions and 240 deletions
@@ -46,7 +46,7 @@
//! #### Propose
//!
//! This is a broadcast by a known voter of the last-round estimate.
//!
//! #### Commit
//!
//! These are used to announce past agreement of finality.
@@ -58,6 +58,21 @@
//! Sending a commit is polite when it may finalize something that the receiving peer
//! was not aware of.
//!
//! #### Catch Up
//!
//! These allow a peer to request another peer, which they perceive to be in a
//! later round, to provide all the votes necessary to complete a given round
//! `R`.
//!
//! It is impolite to send a catch up request for a round `R` to a peer whose
//! announced view is behind `R`. It is also impolite to send a catch up request
//! to a peer in a new different Set ID.
//!
//! The logic for issuing and tracking pending catch up requests is implemented
//! in the `GossipValidator`. A catch up request is issued anytime we see a
//! neighbor packet from a peer at a round `CATCH_UP_THRESHOLD` higher than at
//! we are.
//!
//! ## Expiration
//!
//! We keep some amount of recent rounds' messages, but do not accept new ones from rounds
@@ -78,13 +93,20 @@ use log::{trace, debug, warn};
use futures::prelude::*;
use futures::sync::mpsc;
use crate::{CompactCommit, SignedMessage};
use crate::{environment, CatchUp, CompactCommit, SignedMessage};
use super::{cost, benefit, Round, SetId};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
const CATCH_UP_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
const CATCH_UP_PROCESS_TIMEOUT: Duration = Duration::from_secs(15);
/// Maximum number of rounds we are behind a peer before issuing a
/// catch up request.
const CATCH_UP_THRESHOLD: u64 = 2;
type Report = (PeerId, i32);
/// An outcome of examining a message.
#[derive(Debug, PartialEq, Clone, Copy)]
@@ -230,6 +252,10 @@ pub(super) enum GossipMessage<Block: BlockT> {
Commit(FullCommitMessage<Block>),
/// A neighbor packet. Not repropagated.
Neighbor(VersionedNeighborPacket<NumberFor<Block>>),
/// Grandpa catch up request message with round and set info. Not repropagated.
CatchUpRequest(CatchUpRequestMessage),
/// Grandpa catch up message with round and set info. Not repropagated.
CatchUp(FullCatchUpMessage<Block>),
}
impl<Block: BlockT> From<NeighborPacket<NumberFor<Block>>> for GossipMessage<Block> {
@@ -264,9 +290,12 @@ pub(super) struct FullCommitMessage<Block: BlockT> {
/// and are not repropagated. These contain information about the node's state.
#[derive(Debug, Encode, Decode, Clone)]
pub(super) struct NeighborPacket<N> {
round: Round,
set_id: SetId,
commit_finalized_height: N,
/// The round the node is currently at.
pub(super) round: Round,
/// The set ID the node is currently at.
pub(super) set_id: SetId,
/// The highest finalizing commit observed.
pub(super) commit_finalized_height: N,
}
/// A versioned neighbor packet.
@@ -284,6 +313,24 @@ impl<N> VersionedNeighborPacket<N> {
}
}
/// A catch up request for a given round (or any further round) localized by set id.
#[derive(Clone, Debug, Encode, Decode)]
pub(super) struct CatchUpRequestMessage {
/// The round that we want to catch up to.
pub(super) round: Round,
/// The voter set ID this message is from.
pub(super) set_id: SetId,
}
/// Network level catch up message with topic information.
#[derive(Debug, Encode, Decode)]
pub(super) struct FullCatchUpMessage<Block: BlockT> {
/// The voter set ID this message is from.
pub(super) set_id: SetId,
/// The compact commit message.
pub(super) message: CatchUp<Block>,
}
/// Misbehavior that peers can perform.
///
/// `cost` gives a cost that can be used to perform cost/benefit analysis of a
@@ -294,6 +341,10 @@ pub(super) enum Misbehavior {
InvalidViewChange,
// could not decode neighbor message. bytes-length of the packet.
UndecodablePacket(i32),
// Bad catch up message (invalid signatures).
BadCatchUpMessage {
signatures_checked: i32,
},
// Bad commit message
BadCommitMessage {
signatures_checked: i32,
@@ -315,7 +366,9 @@ impl Misbehavior {
match *self {
InvalidViewChange => cost::INVALID_VIEW_CHANGE,
UndecodablePacket(bytes) => bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
UndecodablePacket(bytes) => bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
BadCatchUpMessage { signatures_checked } =>
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked),
BadCommitMessage { signatures_checked, blocks_loaded, equivocations_caught } => {
let cost = cost::PER_SIGNATURE_CHECKED
.saturating_mul(signatures_checked)
@@ -425,6 +478,23 @@ pub(super) enum Action<H> {
Discard(i32),
}
/// State of catch up request handling.
#[derive(Debug)]
enum PendingCatchUp {
/// No pending catch up requests.
None,
/// Pending catch up request which has not been answered yet.
Requesting {
who: PeerId,
request: CatchUpRequestMessage,
instant: Instant,
},
/// Pending catch up request that was answered and is being processed.
Processing {
instant: Instant,
},
}
struct Inner<Block: BlockT> {
local_view: Option<View<NumberFor<Block>>>,
peers: Peers<NumberFor<Block>>,
@@ -432,6 +502,7 @@ struct Inner<Block: BlockT> {
authorities: Vec<AuthorityId>,
config: crate::Config,
next_rebroadcast: Instant,
pending_catch_up: PendingCatchUp,
}
type MaybeMessage<Block> = Option<(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)>;
@@ -444,6 +515,7 @@ impl<Block: BlockT> Inner<Block> {
live_topics: KeepTopics::new(),
next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
authorities: Vec::new(),
pending_catch_up: PendingCatchUp::None,
config,
}
}
@@ -593,18 +665,201 @@ impl<Block: BlockT> Inner<Block> {
Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_COMMIT)
}
fn import_neighbor_message(&mut self, who: &PeerId, update: NeighborPacket<NumberFor<Block>>)
-> (Vec<Block::Hash>, Action<Block::Hash>)
fn validate_catch_up_message(&mut self, who: &PeerId, full: &FullCatchUpMessage<Block>)
-> Action<Block::Hash>
{
let (cb, topics) = match self.peers.update_peer_state(who, update) {
Ok(view) => (100i32, view.map(|view| neighbor_topics::<Block>(view))),
Err(misbehavior) => (misbehavior.cost(), None)
match &self.pending_catch_up {
PendingCatchUp::Requesting { who: peer, request, instant } => {
if peer != who {
return Action::Discard(Misbehavior::OutOfScopeMessage.cost());
}
if request.set_id != full.set_id {
return Action::Discard(cost::MALFORMED_CATCH_UP);
}
if request.round.0 > full.message.round_number {
return Action::Discard(cost::MALFORMED_CATCH_UP);
}
if full.message.prevotes.is_empty() || full.message.precommits.is_empty() {
return Action::Discard(cost::MALFORMED_CATCH_UP);
}
// move request to pending processing state, we won't push out
// any catch up requests until we import this one (either with a
// success or failure).
self.pending_catch_up = PendingCatchUp::Processing {
instant: instant.clone(),
};
// always discard catch up messages, they're point-to-point
let topic = super::global_topic::<Block>(full.set_id.0);
Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_CATCH_UP)
},
_ => Action::Discard(Misbehavior::OutOfScopeMessage.cost()),
}
}
fn note_catch_up_message_processed(&mut self) {
match &self.pending_catch_up {
PendingCatchUp::Processing { .. } => {
self.pending_catch_up = PendingCatchUp::None;
},
state => trace!(target: "afg",
"Noted processed catch up message when state was: {:?}",
state,
),
}
}
fn handle_catch_up_request(
&mut self,
who: &PeerId,
request: CatchUpRequestMessage,
set_state: &environment::SharedVoterSetState<Block>,
) -> (Option<GossipMessage<Block>>, Action<Block::Hash>) {
let local_view = match self.local_view {
None => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
Some(ref view) => view,
};
if request.set_id != local_view.set_id {
// NOTE: When we're close to a set change there is potentially a
// race where the peer sent us the request before it observed that
// we had transitioned to a new set. In this case we charge a lower
// cost.
if local_view.round.0.saturating_sub(CATCH_UP_THRESHOLD) == 0 {
return (None, Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP));
}
return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()));
}
match self.peers.peer(who) {
None =>
return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
Some(peer) if peer.view.round >= request.round =>
return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())),
_ => {},
}
let last_completed_round = set_state.read().last_completed_round();
if last_completed_round.number < request.round.0 {
return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost()));
}
trace!(target: "afg", "Replying to catch-up request for round {} from {} with round {}",
request.round.0,
who,
last_completed_round.number,
);
let mut prevotes = Vec::new();
let mut precommits = Vec::new();
// NOTE: the set of votes stored in `LastCompletedRound` is a minimal
// set of votes, i.e. at most one equivocation is stored per voter. The
// code below assumes this invariant is maintained when creating the
// catch up reply since peers won't accept catch-up messages that have
// too many equivocations (we exceed the fault-tolerance bound).
for vote in last_completed_round.votes {
match vote.message {
grandpa::Message::Prevote(prevote) => {
prevotes.push(grandpa::SignedPrevote {
prevote,
signature: vote.signature,
id: vote.id,
});
},
grandpa::Message::Precommit(precommit) => {
precommits.push(grandpa::SignedPrecommit {
precommit,
signature: vote.signature,
id: vote.id,
});
},
_ => {},
}
}
let (base_hash, base_number) = last_completed_round.base;
let catch_up = CatchUp::<Block> {
round_number: last_completed_round.number,
prevotes,
precommits,
base_hash,
base_number,
};
let full_catch_up = GossipMessage::CatchUp::<Block>(FullCatchUpMessage {
set_id: request.set_id,
message: catch_up,
});
(Some(full_catch_up), Action::Discard(cost::CATCH_UP_REPLY))
}
fn try_catch_up(&mut self, who: &PeerId) -> (Option<GossipMessage<Block>>, Option<Report>) {
let mut catch_up = None;
let mut report = None;
// if the peer is on the same set and ahead of us by a margin bigger
// than `CATCH_UP_THRESHOLD` then we should ask it for a catch up
// message.
if let (Some(peer), Some(local_view)) = (self.peers.peer(who), &self.local_view) {
if peer.view.set_id == local_view.set_id &&
peer.view.round.0.saturating_sub(CATCH_UP_THRESHOLD) > local_view.round.0
{
// send catch up request if allowed
let round = peer.view.round.0 - 1; // peer.view.round is > 0
let request = CatchUpRequestMessage {
set_id: peer.view.set_id,
round: Round(round),
};
let (catch_up_allowed, catch_up_report) = self.note_catch_up_request(who, &request);
if catch_up_allowed {
trace!(target: "afg", "Sending catch-up request for round {} to {}",
round,
who,
);
catch_up = Some(GossipMessage::<Block>::CatchUpRequest(request));
}
report = catch_up_report;
}
}
(catch_up, report)
}
fn import_neighbor_message(&mut self, who: &PeerId, update: NeighborPacket<NumberFor<Block>>)
-> (Vec<Block::Hash>, Action<Block::Hash>, Option<GossipMessage<Block>>, Option<Report>)
{
let update_res = self.peers.update_peer_state(who, update);
let (cost_benefit, topics) = match update_res {
Ok(view) =>
(benefit::NEIGHBOR_MESSAGE, view.map(|view| neighbor_topics::<Block>(view))),
Err(misbehavior) =>
(misbehavior.cost(), None),
};
let (catch_up, report) = match update_res {
Ok(_) => self.try_catch_up(who),
_ => (None, None),
};
let neighbor_topics = topics.unwrap_or_default();
// always discard, it's valid for one hop.
(neighbor_topics, Action::Discard(cb))
// always discard neighbor messages, it's only valid for one hop.
let action = Action::Discard(cost_benefit);
(neighbor_topics, action, catch_up, report)
}
fn multicast_neighbor_packet(&self) -> MaybeMessage<Block> {
@@ -619,20 +874,55 @@ impl<Block: BlockT> Inner<Block> {
(peers, packet)
})
}
fn note_catch_up_request(
&mut self,
who: &PeerId,
catch_up_request: &CatchUpRequestMessage,
) -> (bool, Option<Report>) {
let report = match &self.pending_catch_up {
PendingCatchUp::Requesting { who: peer, instant, .. } =>
if instant.elapsed() <= CATCH_UP_REQUEST_TIMEOUT {
return (false, None);
} else {
// report peer for timeout
Some((peer.clone(), cost::CATCH_UP_REQUEST_TIMEOUT))
},
PendingCatchUp::Processing { instant, .. } =>
if instant.elapsed() < CATCH_UP_PROCESS_TIMEOUT {
return (false, None);
} else {
None
},
_ => None,
};
self.pending_catch_up = PendingCatchUp::Requesting {
who: who.clone(),
request: catch_up_request.clone(),
instant: Instant::now(),
};
(true, report)
}
}
/// A validator for GRANDPA gossip messages.
pub(super) struct GossipValidator<Block: BlockT> {
inner: parking_lot::RwLock<Inner<Block>>,
set_state: environment::SharedVoterSetState<Block>,
report_sender: mpsc::UnboundedSender<PeerReport>,
}
impl<Block: BlockT> GossipValidator<Block> {
/// Create a new gossip-validator. This initialized the current set to 0.
pub(super) fn new(config: crate::Config) -> (GossipValidator<Block>, ReportStream) {
pub(super) fn new(config: crate::Config, set_state: environment::SharedVoterSetState<Block>)
-> (GossipValidator<Block>, ReportStream)
{
let (tx, rx) = mpsc::unbounded();
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
set_state,
report_sender: tx,
};
@@ -670,26 +960,50 @@ impl<Block: BlockT> GossipValidator<Block> {
}
}
/// Note that we've processed a catch up message.
pub(super) fn note_catch_up_message_processed(&self) {
self.inner.write().note_catch_up_message_processed();
}
fn report(&self, who: PeerId, cost_benefit: i32) {
let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
}
pub(super) fn do_validate(&self, who: &PeerId, mut data: &[u8])
-> (Action<Block::Hash>, Vec<Block::Hash>)
-> (Action<Block::Hash>, Vec<Block::Hash>, Option<GossipMessage<Block>>)
{
let mut broadcast_topics = Vec::new();
let mut peer_reply = None;
let action = {
match GossipMessage::<Block>::decode(&mut data) {
Some(GossipMessage::VoteOrPrecommit(ref message))
=> self.inner.write().validate_round_message(who, message),
Some(GossipMessage::Commit(ref message)) => self.inner.write().validate_commit_message(who, message),
Some(GossipMessage::Neighbor(update)) => {
let (topics, action) = self.inner.write().import_neighbor_message(
let (topics, action, catch_up, report) = self.inner.write().import_neighbor_message(
who,
update.into_neighbor_packet(),
);
if let Some((peer, cost_benefit)) = report {
self.report(peer, cost_benefit);
}
broadcast_topics = topics;
peer_reply = catch_up;
action
}
Some(GossipMessage::CatchUp(ref message))
=> self.inner.write().validate_catch_up_message(who, message),
Some(GossipMessage::CatchUpRequest(request)) => {
let (reply, action) = self.inner.write().handle_catch_up_request(
who,
request,
&self.set_state,
);
peer_reply = reply;
action
}
None => {
@@ -702,7 +1016,7 @@ impl<Block: BlockT> GossipValidator<Block> {
}
};
(action, broadcast_topics)
(action, broadcast_topics, peer_reply)
}
}
@@ -734,9 +1048,13 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
fn validate(&self, context: &mut dyn ValidatorContext<Block>, who: &PeerId, data: &[u8])
-> network_gossip::ValidationResult<Block::Hash>
{
let (action, broadcast_topics) = self.do_validate(who, data);
let (action, broadcast_topics, peer_reply) = self.do_validate(who, data);
// not with lock held!
if let Some(msg) = peer_reply {
context.send_message(who, msg.encode());
}
for topic in broadcast_topics {
context.send_topic(who, topic, false);
}
@@ -817,6 +1135,8 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
&& Some(full.message.target_number) > peer_best_commit
}
Some(GossipMessage::Neighbor(_)) => false,
Some(GossipMessage::CatchUpRequest(_)) => false,
Some(GossipMessage::CatchUp(_)) => false,
Some(GossipMessage::VoteOrPrecommit(_)) => false, // should not be the case.
}
})
@@ -910,6 +1230,7 @@ impl<B: BlockT, N: super::Network<B>> Future for ReportingTask<B, N> {
#[cfg(test)]
mod tests {
use super::*;
use super::environment::SharedVoterSetState;
use network_gossip::Validator as GossipValidatorT;
use network::test::Block;
@@ -923,6 +1244,33 @@ mod tests {
}
}
// dummy voter set state
fn voter_set_state() -> SharedVoterSetState<Block> {
use crate::authorities::AuthoritySet;
use crate::environment::{CompletedRound, CompletedRounds, HasVoted, VoterSetState};
use grandpa::round::State as RoundState;
use substrate_primitives::H256;
let state = RoundState::genesis((H256::zero(), 0));
let base = state.prevote_ghost.unwrap();
let voters = AuthoritySet::genesis(Vec::new());
let set_state = VoterSetState::Live {
completed_rounds: CompletedRounds::new(
CompletedRound {
state,
number: 0,
votes: Vec::new(),
base,
},
0,
&voters,
),
current_round: HasVoted::No,
};
set_state.into()
}
#[test]
fn view_vote_rules() {
let view = View { round: Round(100), set_id: SetId(1), last_commit: Some(1000u64) };
@@ -1064,7 +1412,10 @@ mod tests {
#[test]
fn messages_not_expired_immediately() {
let (val, _) = GossipValidator::<Block>::new(config());
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
);
let set_id = 1;
@@ -1096,7 +1447,10 @@ mod tests {
fn message_from_unknown_authority_discarded() {
assert!(cost::UNKNOWN_VOTER != cost::BAD_SIGNATURE);
let (val, _) = GossipValidator::<Block>::new(config());
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
);
let set_id = 1;
let auth = AuthorityId::from_raw([1u8; 32]);
let peer = PeerId::random();
@@ -1134,4 +1488,122 @@ mod tests {
assert_eq!(unknown_voter, Action::Discard(cost::UNKNOWN_VOTER));
assert_eq!(bad_sig, Action::Discard(cost::BAD_SIGNATURE));
}
#[test]
fn unsolicited_catch_up_messages_discarded() {
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
);
let set_id = 1;
let auth = AuthorityId::from_raw([1u8; 32]);
let peer = PeerId::random();
val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
val.note_round(Round(0), |_, _| {});
let validate_catch_up = || {
let mut inner = val.inner.write();
inner.validate_catch_up_message(&peer, &FullCatchUpMessage {
set_id: SetId(set_id),
message: grandpa::CatchUp {
round_number: 10,
prevotes: Default::default(),
precommits: Default::default(),
base_hash: Default::default(),
base_number: Default::default(),
}
})
};
// the catch up is discarded because we have no pending request
assert_eq!(validate_catch_up(), Action::Discard(cost::OUT_OF_SCOPE_MESSAGE));
let noted = val.inner.write().note_catch_up_request(
&peer,
&CatchUpRequestMessage {
set_id: SetId(set_id),
round: Round(10),
}
);
assert!(noted.0);
// catch up is allowed because we have requested it, but it's rejected
// because it's malformed (empty prevotes and precommits)
assert_eq!(validate_catch_up(), Action::Discard(cost::MALFORMED_CATCH_UP));
}
#[test]
fn unanswerable_catch_up_requests_discarded() {
// create voter set state with round 1 completed
let set_state: SharedVoterSetState<Block> = {
let mut completed_rounds = voter_set_state().read().completed_rounds();
assert!(completed_rounds.push(environment::CompletedRound {
number: 1,
state: grandpa::round::State::genesis(Default::default()),
base: Default::default(),
votes: Default::default(),
}));
let set_state = environment::VoterSetState::<Block>::Live {
completed_rounds,
current_round: environment::HasVoted::No,
};
set_state.into()
};
let (val, _) = GossipValidator::<Block>::new(
config(),
set_state.clone(),
);
let set_id = 1;
let auth = AuthorityId::from_raw([1u8; 32]);
let peer = PeerId::random();
val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {});
val.note_round(Round(2), |_, _| {});
// add the peer making the request to the validator,
// otherwise it is discarded
let mut inner = val.inner.write();
inner.peers.new_peer(peer.clone());
let res = inner.handle_catch_up_request(
&peer,
CatchUpRequestMessage {
set_id: SetId(set_id),
round: Round(10),
},
&set_state,
);
// we're at round 2, a catch up request for round 10 is out of scope
assert!(res.0.is_none());
assert_eq!(res.1, Action::Discard(cost::OUT_OF_SCOPE_MESSAGE));
let res = inner.handle_catch_up_request(
&peer,
CatchUpRequestMessage {
set_id: SetId(set_id),
round: Round(1),
},
&set_state,
);
// a catch up request for round 1 should be answered successfully
match res.0.unwrap() {
GossipMessage::CatchUp(catch_up) => {
assert_eq!(catch_up.set_id, SetId(set_id));
assert_eq!(catch_up.message.round_number, 1);
assert_eq!(res.1, Action::Discard(cost::CATCH_UP_REPLY));
},
_ => panic!("expected catch up message"),
};
}
}
@@ -29,7 +29,7 @@
use std::sync::Arc;
use grandpa::voter_set::VoterSet;
use grandpa::{voter, voter_set::VoterSet};
use grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use futures::prelude::*;
use futures::sync::{oneshot, mpsc};
@@ -42,10 +42,13 @@ use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as Heade
use network::{consensus_gossip as network_gossip, NetworkService};
use network_gossip::ConsensusMessage;
use crate::{Error, Message, SignedMessage, Commit, CompactCommit};
use crate::{
CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error,
Message, SignedMessage,
};
use crate::environment::HasVoted;
use gossip::{
GossipMessage, FullCommitMessage, VoteOrPrecommitMessage, GossipValidator
GossipMessage, FullCatchUpMessage, FullCommitMessage, VoteOrPrecommitMessage, GossipValidator
};
use substrate_primitives::ed25519::{Public as AuthorityId, Signature as AuthoritySignature};
@@ -61,6 +64,7 @@ pub use fg_primitives::GRANDPA_ENGINE_ID;
mod cost {
pub(super) const PAST_REJECTION: i32 = -50;
pub(super) const BAD_SIGNATURE: i32 = -100;
pub(super) const MALFORMED_CATCH_UP: i32 = -1000;
pub(super) const MALFORMED_COMMIT: i32 = -1000;
pub(super) const FUTURE_MESSAGE: i32 = -500;
pub(super) const UNKNOWN_VOTER: i32 = -150;
@@ -69,13 +73,21 @@ mod cost {
pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
pub(super) const PER_BLOCK_LOADED: i32 = -10;
pub(super) const INVALID_CATCH_UP: i32 = -5000;
pub(super) const INVALID_COMMIT: i32 = -5000;
pub(super) const OUT_OF_SCOPE_MESSAGE: i32 = -500;
pub(super) const CATCH_UP_REQUEST_TIMEOUT: i32 = -200;
// cost of answering a catch up request
pub(super) const CATCH_UP_REPLY: i32 = -200;
pub(super) const HONEST_OUT_OF_SCOPE_CATCH_UP: i32 = -200;
}
// benefit scalars for reporting peers.
mod benefit {
pub(super) const NEIGHBOR_MESSAGE: i32 = 100;
pub(super) const ROUND_MESSAGE: i32 = 100;
pub(super) const BASIC_VALIDATED_CATCH_UP: i32 = 200;
pub(super) const BASIC_VALIDATED_COMMIT: i32 = 100;
pub(super) const PER_EQUIVOCATION: i32 = 10;
}
@@ -214,12 +226,6 @@ impl Stream for NetworkStream {
}
}
/// The result of processing a commit.
pub(crate) enum CommitProcessingOutcome {
Good,
Bad,
}
/// Bridge between the underlying network service, gossiping consensus messages and Grandpa
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
@@ -235,21 +241,21 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
pub(crate) fn new(
service: N,
config: crate::Config,
set_state: Option<&crate::environment::VoterSetState<B>>,
set_state: crate::environment::SharedVoterSetState<B>,
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
) -> (
Self,
impl futures::Future<Item = (), Error = ()> + Send + 'static,
) {
let (validator, report_stream) = GossipValidator::new(config);
let (validator, report_stream) = GossipValidator::new(config, set_state.clone());
let validator = Arc::new(validator);
service.register_validator(validator.clone());
if let Some(set_state) = set_state {
{
// register all previous votes with the gossip service so that they're
// available to peers potentially stuck on a previous round.
let completed = set_state.completed_rounds();
let completed = set_state.read().completed_rounds();
let (set_id, voters) = completed.set_info();
validator.note_set(SetId(set_id), voters.to_vec(), |_, _| {});
for round in completed.iter() {
@@ -422,8 +428,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
voters: Arc<VoterSet<AuthorityId>>,
is_voter: bool,
) -> (
impl Stream<Item = (u64, CompactCommit<B>, impl FnMut(CommitProcessingOutcome)), Error = Error>,
impl Sink<SinkItem = (u64, Commit<B>), SinkError = Error>,
impl Stream<Item = CommunicationIn<B>, Error = Error>,
impl Sink<SinkItem = CommunicationOut<B>, SinkError = Error>,
) {
self.validator.note_set(
set_id,
@@ -442,16 +448,123 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
self.validator.clone(),
);
let outgoing = outgoing.with(|out| {
let voter::CommunicationOut::Commit(round, commit) = out;
Ok((round, commit))
});
(incoming, outgoing)
}
}
fn incoming_global<B: BlockT, N: Network<B>>(
service: N,
mut service: N,
topic: B::Hash,
voters: Arc<VoterSet<AuthorityId>>,
gossip_validator: Arc<GossipValidator<B>>,
) -> impl Stream<Item = (u64, CompactCommit<B>, impl FnMut(CommitProcessingOutcome)), Error = Error> {
) -> impl Stream<Item = CommunicationIn<B>, Error = Error> {
let process_commit = move |
msg: FullCommitMessage<B>,
mut notification: network_gossip::TopicNotification,
service: &mut N,
gossip_validator: &Arc<GossipValidator<B>>,
voters: &VoterSet<AuthorityId>,
| {
let precommits_signed_by: Vec<String> =
msg.message.auth_data.iter().map(move |(_, a)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.received_commit";
"contains_precommits_signed_by" => ?precommits_signed_by,
"target_number" => ?msg.message.target_number.clone(),
"target_hash" => ?msg.message.target_hash.clone(),
);
if let Err(cost) = check_compact_commit::<B>(
&msg.message,
voters,
msg.round,
msg.set_id,
) {
if let Some(who) = notification.sender {
service.report(who, cost);
}
return None;
}
let round = msg.round.0;
let commit = msg.message;
let finalized_number = commit.target_number;
let gossip_validator = gossip_validator.clone();
let service = service.clone();
let cb = move |outcome| match outcome {
voter::CommitProcessingOutcome::Good(_) => {
// if it checks out, gossip it. not accounting for
// any discrepancy between the actual ghost and the claimed
// finalized number.
gossip_validator.note_commit_finalized(
finalized_number,
|to, neighbor_msg| service.send_message(
to,
GossipMessage::<B>::from(neighbor_msg).encode(),
),
);
service.gossip_message(topic, notification.message.clone(), false);
}
voter::CommitProcessingOutcome::Bad(_) => {
// report peer and do not gossip.
if let Some(who) = notification.sender.take() {
service.report(who, cost::INVALID_COMMIT);
}
}
};
let cb = voter::Callback::Work(Box::new(cb));
Some(voter::CommunicationIn::Commit(round, commit, cb))
};
let process_catch_up = move |
msg: FullCatchUpMessage<B>,
mut notification: network_gossip::TopicNotification,
service: &mut N,
gossip_validator: &Arc<GossipValidator<B>>,
voters: &VoterSet<AuthorityId>,
| {
let gossip_validator = gossip_validator.clone();
let service = service.clone();
if let Err(cost) = check_catch_up::<B>(
&msg.message,
voters,
msg.set_id,
) {
if let Some(who) = notification.sender {
service.report(who, cost);
}
return None;
}
let cb = move |outcome| {
if let voter::CatchUpProcessingOutcome::Bad(_) = outcome {
// report peer
if let Some(who) = notification.sender.take() {
service.report(who, cost::INVALID_CATCH_UP);
}
}
gossip_validator.note_catch_up_message_processed();
};
let cb = voter::Callback::Work(Box::new(cb));
Some(voter::CommunicationIn::CatchUp(msg.message, cb))
};
service.messages_for(topic)
.filter_map(|notification| {
// this could be optimized by decoding piecewise.
@@ -463,66 +576,16 @@ fn incoming_global<B: BlockT, N: Network<B>>(
})
.filter_map(move |(notification, msg)| {
match msg {
GossipMessage::Commit(msg) => {
let precommits_signed_by: Vec<String> =
msg.message.auth_data.iter().map(move |(_, a)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.received_commit";
"contains_precommits_signed_by" => ?precommits_signed_by,
"target_number" => ?msg.message.target_number.clone(),
"target_hash" => ?msg.message.target_hash.clone(),
);
if let Err(cost) = check_compact_commit::<B>(
&msg.message,
&*voters,
msg.round,
msg.set_id,
) {
if let Some(who) = notification.sender {
service.report(who, cost);
}
None
} else {
Some((msg, notification, service.clone()))
}
},
GossipMessage::Commit(msg) =>
process_commit(msg, notification, &mut service, &gossip_validator, &*voters),
GossipMessage::CatchUp(msg) =>
process_catch_up(msg, notification, &mut service, &gossip_validator, &*voters),
_ => {
debug!(target: "afg", "Skipping unknown message type");
return None;
}
}
})
.map(move |(msg, mut notification, service)| {
let round = msg.round.0;
let commit = msg.message;
let finalized_number = commit.target_number;
let gossip_validator = gossip_validator.clone();
let cb = move |outcome| match outcome {
CommitProcessingOutcome::Good => {
// if it checks out, gossip it. not accounting for
// any discrepancy between the actual ghost and the claimed
// finalized number.
gossip_validator.note_commit_finalized(
finalized_number,
|to, neighbor_msg| service.send_message(
to,
GossipMessage::<B>::from(neighbor_msg).encode(),
),
);
service.gossip_message(topic, notification.message.clone(), false);
}
CommitProcessingOutcome::Bad => {
// report peer and do not gossip.
if let Some(who) = notification.sender.take() {
service.report(who, cost::INVALID_COMMIT);
}
}
};
(round, commit, cb)
})
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}
@@ -657,7 +720,8 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
}
}
// checks a compact commit. returns `None` if it was bad and
// checks a compact commit. returns the cost associated with processing it if
// the commit was bad.
fn check_compact_commit<Block: BlockT>(
msg: &CompactCommit<Block>,
voters: &VoterSet<AuthorityId>,
@@ -716,6 +780,114 @@ fn check_compact_commit<Block: BlockT>(
Ok(())
}
// checks a catch up. returns the cost associated with processing it if
// the catch up was bad.
fn check_catch_up<Block: BlockT>(
msg: &CatchUp<Block>,
voters: &VoterSet<AuthorityId>,
set_id: SetId,
) -> Result<(), i32> {
// 4f + 1 = equivocations from f voters.
let f = voters.total_weight() - voters.threshold();
let full_threshold = voters.total_weight() + f;
// check total weight is not out of range for a set of votes.
fn check_weight<'a>(
voters: &'a VoterSet<AuthorityId>,
votes: impl Iterator<Item=&'a AuthorityId>,
full_threshold: u64,
) -> Result<(), i32> {
let mut total_weight = 0;
for id in votes {
if let Some(weight) = voters.info(&id).map(|info| info.weight()) {
total_weight += weight;
if total_weight > full_threshold {
return Err(cost::MALFORMED_CATCH_UP);
}
} else {
debug!(target: "afg", "Skipping catch up message containing unknown voter {}", id);
return Err(cost::MALFORMED_CATCH_UP);
}
}
if total_weight < voters.threshold() {
return Err(cost::MALFORMED_CATCH_UP);
}
Ok(())
};
check_weight(
voters,
msg.prevotes.iter().map(|vote| &vote.id),
full_threshold,
)?;
check_weight(
voters,
msg.precommits.iter().map(|vote| &vote.id),
full_threshold,
)?;
fn check_signatures<'a, B, I>(
messages: I,
round: u64,
set_id: u64,
mut signatures_checked: usize,
) -> Result<usize, i32> where
B: BlockT,
I: Iterator<Item=(Message<B>, &'a AuthorityId, &'a AuthoritySignature)>,
{
use crate::communication::gossip::Misbehavior;
for (msg, id, sig) in messages {
signatures_checked += 1;
if let Err(()) = check_message_sig::<B>(
&msg,
id,
sig,
round,
set_id,
) {
debug!(target: "afg", "Bad catch up message signature {}", id);
telemetry!(CONSENSUS_DEBUG; "afg.bad_catch_up_msg_signature"; "id" => ?id);
let cost = Misbehavior::BadCatchUpMessage {
signatures_checked: signatures_checked as i32,
}.cost();
return Err(cost);
}
}
Ok(signatures_checked)
}
// check signatures on all contained prevotes.
let signatures_checked = check_signatures::<Block, _>(
msg.prevotes.iter().map(|vote| {
(grandpa::Message::Prevote(vote.prevote.clone()), &vote.id, &vote.signature)
}),
msg.round_number,
set_id.0,
0,
)?;
// check signatures on all contained precommits.
let _ = check_signatures::<Block, _>(
msg.precommits.iter().map(|vote| {
(grandpa::Message::Precommit(vote.precommit.clone()), &vote.id, &vote.signature)
}),
msg.round_number,
set_id.0,
signatures_checked,
)?;
Ok(())
}
/// An output sink for commit messages.
struct CommitsOut<Block: BlockT, N: Network<Block>> {
network: N,
@@ -26,6 +26,7 @@ use std::sync::Arc;
use keyring::AuthorityKeyring;
use parity_codec::Encode;
use crate::environment::SharedVoterSetState;
use super::gossip::{self, GossipValidator};
use super::{AuthorityId, VoterSet, Round, SetId};
@@ -92,6 +93,18 @@ impl super::Network<Block> for TestNetwork {
}
}
impl network_gossip::ValidatorContext<Block> for TestNetwork {
fn broadcast_topic(&mut self, _: Hash, _: bool) { }
fn broadcast_message(&mut self, _: Hash, _: Vec<u8>, _: bool) { }
fn send_message(&mut self, who: &network::PeerId, data: Vec<u8>) {
<Self as super::Network<Block>>::send_message(self, vec![who.clone()], data);
}
fn send_topic(&mut self, _: &network::PeerId, _: Hash, _: bool) { }
}
struct Tester {
net_handle: super::NetworkBridge<Block, TestNetwork>,
gossip_validator: Arc<GossipValidator<Block>>,
@@ -125,8 +138,38 @@ fn config() -> crate::Config {
}
}
// dummy voter set state
fn voter_set_state() -> SharedVoterSetState<Block> {
use crate::authorities::AuthoritySet;
use crate::environment::{CompletedRound, CompletedRounds, HasVoted, VoterSetState};
use grandpa::round::State as RoundState;
use substrate_primitives::H256;
let state = RoundState::genesis((H256::zero(), 0));
let base = state.prevote_ghost.unwrap();
let voters = AuthoritySet::genesis(Vec::new());
let set_state = VoterSetState::Live {
completed_rounds: CompletedRounds::new(
CompletedRound {
state,
number: 0,
votes: Vec::new(),
base,
},
0,
&voters,
),
current_round: HasVoted::No,
};
set_state.into()
}
// needs to run in a tokio runtime.
fn make_test_network() -> impl Future<Item=Tester,Error=()> {
fn make_test_network() -> (
impl Future<Item=Tester,Error=()>,
TestNetwork,
) {
let (tx, rx) = mpsc::unbounded();
let net = TestNetwork { sender: tx };
@@ -145,15 +188,18 @@ fn make_test_network() -> impl Future<Item=Tester,Error=()> {
let (bridge, startup_work) = super::NetworkBridge::new(
net.clone(),
config(),
None,
voter_set_state(),
Exit,
);
startup_work.map(move |()| Tester {
gossip_validator: bridge.validator.clone(),
net_handle: bridge,
events: rx,
})
(
startup_work.map(move |()| Tester {
gossip_validator: bridge.validator.clone(),
net_handle: bridge,
events: rx,
}),
net,
)
}
fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(AuthorityId, u64)> {
@@ -217,7 +263,7 @@ fn good_commit_leads_to_relay() {
let id = network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);
let test = make_test_network()
let test = make_test_network().0
.and_then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL);
@@ -228,7 +274,7 @@ fn good_commit_leads_to_relay() {
let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false);
{
let (action, _) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]);
let (action, ..) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]);
match action {
gossip::Action::ProcessAndDiscard(t, _) => assert_eq!(t, global_topic),
_ => panic!("wrong expected outcome from initial commit validation"),
@@ -257,8 +303,12 @@ fn good_commit_leads_to_relay() {
// when the commit comes in, we'll tell the callback it was good.
let handle_commit = commits_in.into_future()
.map(|(item, _)| {
let (_, _, mut callback) = item.unwrap();
(callback)(super::CommitProcessingOutcome::Good);
match item.unwrap() {
grandpa::voter::CommunicationIn::Commit(_, _, mut callback) => {
callback.run(grandpa::voter::CommitProcessingOutcome::good());
},
_ => panic!("commit expected"),
}
})
.map_err(|_| panic!("could not process commit"));
@@ -328,7 +378,7 @@ fn bad_commit_leads_to_report() {
let id = network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);
let test = make_test_network()
let test = make_test_network().0
.and_then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL);
@@ -339,7 +389,7 @@ fn bad_commit_leads_to_report() {
let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false);
{
let (action, _) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]);
let (action, ..) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]);
match action {
gossip::Action::ProcessAndDiscard(t, _) => assert_eq!(t, global_topic),
_ => panic!("wrong expected outcome from initial commit validation"),
@@ -368,8 +418,12 @@ fn bad_commit_leads_to_report() {
// when the commit comes in, we'll tell the callback it was good.
let handle_commit = commits_in.into_future()
.map(|(item, _)| {
let (_, _, mut callback) = item.unwrap();
(callback)(super::CommitProcessingOutcome::Bad);
match item.unwrap() {
grandpa::voter::CommunicationIn::Commit(_, _, mut callback) => {
callback.run(grandpa::voter::CommitProcessingOutcome::bad());
},
_ => panic!("commit expected"),
}
})
.map_err(|_| panic!("could not process commit"));
@@ -393,3 +447,61 @@ fn bad_commit_leads_to_report() {
current_thread::block_on_all(test).unwrap();
}
#[test]
fn peer_with_higher_view_leads_to_catch_up_request() {
let id = network::PeerId::random();
let (tester, mut net) = make_test_network();
let test = tester
.and_then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL);
Ok((tester, id))
})
.and_then(move |(tester, id)| {
// send neighbor message at round 10 and height 50
let result = tester.gossip_validator.validate(
&mut net,
&id,
&gossip::GossipMessage::<Block>::from(gossip::NeighborPacket {
set_id: SetId(0),
round: Round(10),
commit_finalized_height: 50,
}).encode(),
);
// neighbor packets are always discard
match result {
network_gossip::ValidationResult::Discard => {},
_ => panic!("wrong expected outcome from neighbor validation"),
}
// a catch up request should be sent to the peer for round - 1
tester.filter_network_events(move |event| match event {
Event::SendMessage(peers, message) => {
assert_eq!(
peers,
vec![id.clone()],
);
assert_eq!(
message,
gossip::GossipMessage::<Block>::CatchUpRequest(
gossip::CatchUpRequestMessage {
set_id: SetId(0),
round: Round(9),
}
).encode(),
);
true
},
_ => false,
})
.map_err(|_| panic!("could not watch for peer send message"))
.map(|_| ())
});
current_thread::block_on_all(test).unwrap();
}