// This file is part of Substrate. // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . //! Gossip and politeness for polite-grandpa. //! //! This module implements the following message types: //! #### Neighbor Packet //! //! The neighbor packet is sent to only our neighbors. It contains this information //! //! - Current Round //! - Current voter set ID //! - Last finalized hash from commit messages. //! //! If a peer is at a given voter set, it is impolite to send messages from //! an earlier voter set. It is extremely impolite to send messages //! from a future voter set. "future-set" messages can be dropped and ignored. //! //! If a peer is at round r, is impolite to send messages about r-2 or earlier and extremely //! impolite to send messages about r+1 or later. "future-round" messages can //! be dropped and ignored. //! //! It is impolite to send a neighbor packet which moves backwards or does not progress //! protocol state. //! //! This is beneficial if it conveys some progress in the protocol state of the peer. //! //! #### Prevote / Precommit //! //! These are votes within a round. Noting that we receive these messages //! from our peers who are not necessarily voters, we have to account the benefit //! based on what they might have seen. //! //! #### Propose //! //! This is a broadcast by a known voter of the last-round estimate. //! //! #### Commit //! //! These are used to announce past agreement of finality. //! //! It is impolite to send commits which are earlier than the last commit //! sent. It is especially impolite to send commits which are invalid, or from //! a different Set ID than the receiving peer has indicated. //! //! Sending a commit is polite when it may finalize something that the receiving peer //! was not aware of. //! //! #### Catch Up //! //! These allow a peer to request another peer, which they perceive to be in a //! later round, to provide all the votes necessary to complete a given round //! `R`. //! //! It is impolite to send a catch up request for a round `R` to a peer whose //! announced view is behind `R`. It is also impolite to send a catch up request //! to a peer in a new different Set ID. //! //! The logic for issuing and tracking pending catch up requests is implemented //! in the `GossipValidator`. A catch up request is issued anytime we see a //! neighbor packet from a peer at a round `CATCH_UP_THRESHOLD` higher than at //! we are. //! //! ## Expiration //! //! We keep some amount of recent rounds' messages, but do not accept new ones from rounds //! older than our current_round - 1. //! //! ## Message Validation //! //! We only send polite messages to peers, use ahash::{AHashMap, AHashSet}; use log::{debug, trace}; use parity_scale_codec::{Decode, Encode}; use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64}; use rand::seq::SliceRandom; use sc_network::{PeerId, ReputationChange}; use sc_network_common::protocol::role::ObservedRole; use sc_network_gossip::{MessageIntent, ValidatorContext}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_finality_grandpa::AuthorityId; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; use super::{benefit, cost, Round, SetId, NEIGHBOR_REBROADCAST_PERIOD}; use crate::{environment, CatchUp, CompactCommit, SignedMessage}; use std::{ collections::{HashSet, VecDeque}, time::{Duration, Instant}, }; const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); const CATCH_UP_REQUEST_TIMEOUT: Duration = Duration::from_secs(45); const CATCH_UP_PROCESS_TIMEOUT: Duration = Duration::from_secs(30); /// Maximum number of rounds we are behind a peer before issuing a /// catch up request. const CATCH_UP_THRESHOLD: u64 = 2; /// The total round duration measured in periods of gossip duration: /// 2 gossip durations for prevote timer /// 2 gossip durations for precommit timer /// 1 gossip duration for precommits to spread const ROUND_DURATION: u32 = 5; /// The period, measured in rounds, since the latest round start, after which we will start /// propagating gossip messages to more nodes than just the lucky ones. const PROPAGATION_SOME: f32 = 1.5; /// The period, measured in rounds, since the latest round start, after which we will start /// propagating gossip messages to all the nodes we are connected to. const PROPAGATION_ALL: f32 = 3.0; /// Assuming a network of 3000 nodes, using a fanout of 4, after about 6 iterations /// of gossip a message has very likely reached all nodes on the network (`log4(3000)`). const LUCKY_PEERS: usize = 4; type Report = (PeerId, ReputationChange); /// An outcome of examining a message. #[derive(Debug, PartialEq, Clone, Copy)] enum Consider { /// Accept the message. Accept, /// Message is too early. Reject. RejectPast, /// Message is from the future. Reject. RejectFuture, /// Message cannot be evaluated. Reject. RejectOutOfScope, } /// A view of protocol state. #[derive(Debug)] struct View { round: Round, // the current round we are at. set_id: SetId, // the current voter set id. last_commit: Option, // commit-finalized block height, if any. last_update: Option, // last time we heard from peer, used for spamming detection. } impl Default for View { fn default() -> Self { View { round: Round(1), set_id: SetId(0), last_commit: None, last_update: None } } } impl View { /// Consider a round and set ID combination under a current view. fn consider_vote(&self, round: Round, set_id: SetId) -> Consider { // only from current set if set_id < self.set_id { return Consider::RejectPast } if set_id > self.set_id { return Consider::RejectFuture } // only r-1 ... r+1 if round.0 > self.round.0.saturating_add(1) { return Consider::RejectFuture } if round.0 < self.round.0.saturating_sub(1) { return Consider::RejectPast } Consider::Accept } /// Consider a set-id global message. Rounds are not taken into account, but are implicitly /// because we gate on finalization of a further block than a previous commit. fn consider_global(&self, set_id: SetId, number: N) -> Consider { // only from current set if set_id < self.set_id { return Consider::RejectPast } if set_id > self.set_id { return Consider::RejectFuture } // only commits which claim to prove a higher block number than // the one we're aware of. match self.last_commit { None => Consider::Accept, Some(ref num) => if num < &number { Consider::Accept } else { Consider::RejectPast }, } } } /// A local view of protocol state. Similar to `View` but we additionally track /// the round and set id at which the last commit was observed, and the instant /// at which the current round started. struct LocalView { round: Round, set_id: SetId, last_commit: Option<(N, Round, SetId)>, round_start: Instant, } impl LocalView { /// Creates a new `LocalView` at the given set id and round. fn new(set_id: SetId, round: Round) -> LocalView { LocalView { set_id, round, last_commit: None, round_start: Instant::now() } } /// Converts the local view to a `View` discarding round and set id /// information about the last commit. fn as_view(&self) -> View<&N> { View { round: self.round, set_id: self.set_id, last_commit: self.last_commit_height(), last_update: None, } } /// Update the set ID. implies a reset to round 1. fn update_set(&mut self, set_id: SetId) { if set_id != self.set_id { self.set_id = set_id; self.round = Round(1); self.round_start = Instant::now(); } } /// Updates the current round. fn update_round(&mut self, round: Round) { self.round = round; self.round_start = Instant::now(); } /// Returns the height of the block that the last observed commit finalizes. fn last_commit_height(&self) -> Option<&N> { self.last_commit.as_ref().map(|(number, _, _)| number) } } const KEEP_RECENT_ROUNDS: usize = 3; /// Tracks gossip topics that we are keeping messages for. We keep topics of: /// /// - the last `KEEP_RECENT_ROUNDS` complete GRANDPA rounds, /// /// - the topic for the current and next round, /// /// - and a global topic for commit and catch-up messages. struct KeepTopics { current_set: SetId, rounds: VecDeque<(Round, SetId)>, reverse_map: AHashMap, SetId)>, } impl KeepTopics { fn new() -> Self { KeepTopics { current_set: SetId(0), rounds: VecDeque::with_capacity(KEEP_RECENT_ROUNDS + 2), reverse_map: Default::default(), } } fn push(&mut self, round: Round, set_id: SetId) { self.current_set = std::cmp::max(self.current_set, set_id); // under normal operation the given round is already tracked (since we // track one round ahead). if we skip rounds (with a catch up) the given // round topic might not be tracked yet. if !self.rounds.contains(&(round, set_id)) { self.rounds.push_back((round, set_id)); } // we also accept messages for the next round self.rounds.push_back((Round(round.0.saturating_add(1)), set_id)); // the 2 is for the current and next round. while self.rounds.len() > KEEP_RECENT_ROUNDS + 2 { let _ = self.rounds.pop_front(); } let mut map = AHashMap::with_capacity(KEEP_RECENT_ROUNDS + 3); map.insert(super::global_topic::(self.current_set.0), (None, self.current_set)); for &(round, set) in &self.rounds { map.insert(super::round_topic::(round.0, set.0), (Some(round), set)); } self.reverse_map = map; } fn topic_info(&self, topic: &B::Hash) -> Option<(Option, SetId)> { self.reverse_map.get(topic).cloned() } } // topics to send to a neighbor based on their view. fn neighbor_topics(view: &View>) -> Vec { let s = view.set_id; let mut topics = vec![super::global_topic::(s.0), super::round_topic::(view.round.0, s.0)]; if view.round.0 != 0 { let r = Round(view.round.0 - 1); topics.push(super::round_topic::(r.0, s.0)) } topics } /// Grandpa gossip message type. /// This is the root type that gets encoded and sent on the network. #[derive(Debug, Encode, Decode)] pub(super) enum GossipMessage { /// Grandpa message with round and set info. Vote(VoteMessage), /// Grandpa commit message with round and set info. Commit(FullCommitMessage), /// A neighbor packet. Not repropagated. Neighbor(VersionedNeighborPacket>), /// Grandpa catch up request message with round and set info. Not repropagated. CatchUpRequest(CatchUpRequestMessage), /// Grandpa catch up message with round and set info. Not repropagated. CatchUp(FullCatchUpMessage), } impl From>> for GossipMessage { fn from(neighbor: NeighborPacket>) -> Self { GossipMessage::Neighbor(VersionedNeighborPacket::V1(neighbor)) } } /// Network level vote message with topic information. #[derive(Debug, Encode, Decode)] pub(super) struct VoteMessage { /// The round this message is from. pub(super) round: Round, /// The voter set ID this message is from. pub(super) set_id: SetId, /// The message itself. pub(super) message: SignedMessage, } /// Network level commit message with topic information. #[derive(Debug, Encode, Decode)] pub(super) struct FullCommitMessage { /// The round this message is from. pub(super) round: Round, /// The voter set ID this message is from. pub(super) set_id: SetId, /// The compact commit message. pub(super) message: CompactCommit, } /// V1 neighbor packet. Neighbor packets are sent from nodes to their peers /// and are not repropagated. These contain information about the node's state. #[derive(Debug, Encode, Decode, Clone)] pub(super) struct NeighborPacket { /// The round the node is currently at. pub(super) round: Round, /// The set ID the node is currently at. pub(super) set_id: SetId, /// The highest finalizing commit observed. pub(super) commit_finalized_height: N, } /// A versioned neighbor packet. #[derive(Debug, Encode, Decode)] pub(super) enum VersionedNeighborPacket { #[codec(index = 1)] V1(NeighborPacket), } impl VersionedNeighborPacket { fn into_neighbor_packet(self) -> NeighborPacket { match self { VersionedNeighborPacket::V1(p) => p, } } } /// A catch up request for a given round (or any further round) localized by set id. #[derive(Clone, Debug, Encode, Decode)] pub(super) struct CatchUpRequestMessage { /// The round that we want to catch up to. pub(super) round: Round, /// The voter set ID this message is from. pub(super) set_id: SetId, } /// Network level catch up message with topic information. #[derive(Debug, Encode, Decode)] pub(super) struct FullCatchUpMessage { /// The voter set ID this message is from. pub(super) set_id: SetId, /// The compact commit message. pub(super) message: CatchUp, } /// Misbehavior that peers can perform. /// /// `cost` gives a cost that can be used to perform cost/benefit analysis of a /// peer. #[derive(Clone, Copy, Debug, PartialEq)] pub(super) enum Misbehavior { // invalid neighbor message, considering the last one. InvalidViewChange, // duplicate neighbor message. DuplicateNeighborMessage, // could not decode neighbor message. bytes-length of the packet. UndecodablePacket(i32), // Bad catch up message (invalid signatures). BadCatchUpMessage { signatures_checked: i32 }, // Bad commit message BadCommitMessage { signatures_checked: i32, blocks_loaded: i32, equivocations_caught: i32 }, // A message received that's from the future relative to our view. // always misbehavior. FutureMessage, // A message received that cannot be evaluated relative to our view. // This happens before we have a view and have sent out neighbor packets. // always misbehavior. OutOfScopeMessage, } impl Misbehavior { pub(super) fn cost(&self) -> ReputationChange { use Misbehavior::*; match *self { InvalidViewChange => cost::INVALID_VIEW_CHANGE, DuplicateNeighborMessage => cost::DUPLICATE_NEIGHBOR_MESSAGE, UndecodablePacket(bytes) => ReputationChange::new( bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE), "Grandpa: Bad packet", ), BadCatchUpMessage { signatures_checked } => ReputationChange::new( cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked), "Grandpa: Bad cath-up message", ), BadCommitMessage { signatures_checked, blocks_loaded, equivocations_caught } => { let cost = cost::PER_SIGNATURE_CHECKED .saturating_mul(signatures_checked) .saturating_add(cost::PER_BLOCK_LOADED.saturating_mul(blocks_loaded)); let benefit = equivocations_caught.saturating_mul(benefit::PER_EQUIVOCATION); ReputationChange::new( (benefit as i32).saturating_add(cost as i32), "Grandpa: Bad commit", ) }, FutureMessage => cost::FUTURE_MESSAGE, OutOfScopeMessage => cost::OUT_OF_SCOPE_MESSAGE, } } } #[derive(Debug)] struct PeerInfo { view: View, roles: ObservedRole, } impl PeerInfo { fn new(roles: ObservedRole) -> Self { PeerInfo { view: View::default(), roles } } } /// The peers we're connected to in gossip. struct Peers { inner: AHashMap>, /// The randomly picked set of `LUCKY_PEERS` we'll gossip to in the first stage of round /// gossiping. first_stage_peers: AHashSet, /// The randomly picked set of peers we'll gossip to in the second stage of gossiping if the /// first stage didn't allow us to spread the voting data enough to conclude the round. This /// set should have size `sqrt(connected_peers)`. second_stage_peers: HashSet, /// The randomly picked set of `LUCKY_PEERS` light clients we'll gossip commit messages to. lucky_light_peers: HashSet, /// Neighbor packet rebroadcast period --- we reduce the reputation of peers sending duplicate /// packets too often. neighbor_rebroadcast_period: Duration, } impl Peers { fn new(neighbor_rebroadcast_period: Duration) -> Self { Peers { inner: Default::default(), first_stage_peers: Default::default(), second_stage_peers: Default::default(), lucky_light_peers: Default::default(), neighbor_rebroadcast_period, } } fn new_peer(&mut self, who: PeerId, role: ObservedRole) { match role { ObservedRole::Authority if self.first_stage_peers.len() < LUCKY_PEERS => { self.first_stage_peers.insert(who); }, ObservedRole::Authority if self.second_stage_peers.len() < LUCKY_PEERS => { self.second_stage_peers.insert(who); }, ObservedRole::Light if self.lucky_light_peers.len() < LUCKY_PEERS => { self.lucky_light_peers.insert(who); }, _ => {}, } self.inner.insert(who, PeerInfo::new(role)); } fn peer_disconnected(&mut self, who: &PeerId) { self.inner.remove(who); // This does not happen often enough compared to round duration, // so we don't reshuffle. self.first_stage_peers.remove(who); self.second_stage_peers.remove(who); self.lucky_light_peers.remove(who); } // returns a reference to the new view, if the peer is known. fn update_peer_state( &mut self, who: &PeerId, update: NeighborPacket, ) -> Result>, Misbehavior> { let peer = match self.inner.get_mut(who) { None => return Ok(None), Some(p) => p, }; let invalid_change = peer.view.set_id > update.set_id || peer.view.round > update.round && peer.view.set_id == update.set_id || peer.view.last_commit.as_ref() > Some(&update.commit_finalized_height); if invalid_change { return Err(Misbehavior::InvalidViewChange) } let now = Instant::now(); let duplicate_packet = (update.set_id, update.round, Some(&update.commit_finalized_height)) == (peer.view.set_id, peer.view.round, peer.view.last_commit.as_ref()); if duplicate_packet { if let Some(last_update) = peer.view.last_update { if now < last_update + self.neighbor_rebroadcast_period / 2 { return Err(Misbehavior::DuplicateNeighborMessage) } } } peer.view = View { round: update.round, set_id: update.set_id, last_commit: Some(update.commit_finalized_height), last_update: Some(now), }; trace!(target: "afg", "Peer {} updated view. Now at {:?}, {:?}", who, peer.view.round, peer.view.set_id); Ok(Some(&peer.view)) } fn update_commit_height(&mut self, who: &PeerId, new_height: N) -> Result<(), Misbehavior> { let peer = match self.inner.get_mut(who) { None => return Ok(()), Some(p) => p, }; // this doesn't allow a peer to send us unlimited commits with the // same height, because there is still a misbehavior condition based on // sending commits that are <= the best we are aware of. if peer.view.last_commit.as_ref() > Some(&new_height) { return Err(Misbehavior::InvalidViewChange) } peer.view.last_commit = Some(new_height); Ok(()) } fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo> { self.inner.get(who) } fn reshuffle(&mut self) { // we want to randomly select peers into three sets according to the following logic: // - first set: LUCKY_PEERS random peers where at least LUCKY_PEERS/2 are authorities // (unless // we're not connected to that many authorities) // - second set: max(LUCKY_PEERS, sqrt(peers)) peers where at least LUCKY_PEERS are // authorities. // - third set: LUCKY_PEERS random light client peers let shuffled_peers = { let mut peers = self.inner.iter().map(|(peer_id, info)| (*peer_id, info)).collect::>(); peers.shuffle(&mut rand::thread_rng()); peers }; let shuffled_authorities = shuffled_peers.iter().filter_map(|(peer_id, info)| { if matches!(info.roles, ObservedRole::Authority) { Some(peer_id) } else { None } }); let mut first_stage_peers = AHashSet::new(); let mut second_stage_peers = HashSet::new(); // we start by allocating authorities to the first stage set and when the minimum of // `LUCKY_PEERS / 2` is filled we start allocating to the second stage set. let half_lucky = LUCKY_PEERS / 2; let one_and_a_half_lucky = LUCKY_PEERS + half_lucky; for (n_authorities_added, peer_id) in shuffled_authorities.enumerate() { if n_authorities_added < half_lucky { first_stage_peers.insert(*peer_id); } else if n_authorities_added < one_and_a_half_lucky { second_stage_peers.insert(*peer_id); } else { break } } // fill up first and second sets with remaining peers (either full or authorities) // prioritizing filling the first set over the second. let n_second_stage_peers = LUCKY_PEERS.max((shuffled_peers.len() as f32).sqrt() as usize); for (peer_id, info) in &shuffled_peers { if info.roles.is_light() { continue } if first_stage_peers.len() < LUCKY_PEERS { first_stage_peers.insert(*peer_id); second_stage_peers.remove(peer_id); } else if second_stage_peers.len() < n_second_stage_peers { if !first_stage_peers.contains(peer_id) { second_stage_peers.insert(*peer_id); } } else { break } } // pick `LUCKY_PEERS` random light peers let lucky_light_peers = shuffled_peers .into_iter() .filter_map(|(peer_id, info)| if info.roles.is_light() { Some(peer_id) } else { None }) .take(LUCKY_PEERS) .collect(); self.first_stage_peers = first_stage_peers; self.second_stage_peers = second_stage_peers; self.lucky_light_peers = lucky_light_peers; } } #[derive(Debug, PartialEq)] pub(super) enum Action { // repropagate under given topic, to the given peers, applying cost/benefit to originator. Keep(H, ReputationChange), // discard and process. ProcessAndDiscard(H, ReputationChange), // discard, applying cost/benefit to originator. Discard(ReputationChange), } /// State of catch up request handling. #[derive(Debug)] enum PendingCatchUp { /// No pending catch up requests. None, /// Pending catch up request which has not been answered yet. Requesting { who: PeerId, request: CatchUpRequestMessage, instant: Instant }, /// Pending catch up request that was answered and is being processed. Processing { instant: Instant }, } /// Configuration for the round catch-up mechanism. enum CatchUpConfig { /// Catch requests are enabled, our node will issue them whenever it sees a /// neighbor packet for a round further than `CATCH_UP_THRESHOLD`. If /// `only_from_authorities` is set, the node will only send catch-up /// requests to other authorities it is connected to. This is useful if the /// GRANDPA observer protocol is live on the network, in which case full /// nodes (non-authorities) don't have the necessary round data to answer /// catch-up requests. Enabled { only_from_authorities: bool }, /// Catch-up requests are disabled, our node will never issue them. This is /// useful for the GRANDPA observer mode, where we are only interested in /// commit messages and don't need to follow the full round protocol. Disabled, } impl CatchUpConfig { fn enabled(only_from_authorities: bool) -> CatchUpConfig { CatchUpConfig::Enabled { only_from_authorities } } fn disabled() -> CatchUpConfig { CatchUpConfig::Disabled } fn request_allowed(&self, peer: &PeerInfo) -> bool { match self { CatchUpConfig::Disabled => false, CatchUpConfig::Enabled { only_from_authorities, .. } => match peer.roles { ObservedRole::Authority => true, ObservedRole::Light => false, ObservedRole::Full => !only_from_authorities, }, } } } struct Inner { local_view: Option>>, peers: Peers>, live_topics: KeepTopics, authorities: Vec, config: crate::Config, next_rebroadcast: Instant, pending_catch_up: PendingCatchUp, catch_up_config: CatchUpConfig, } type MaybeMessage = Option<(Vec, NeighborPacket>)>; impl Inner { fn new(config: crate::Config) -> Self { let catch_up_config = if config.observer_enabled { if config.local_role.is_authority() { // since the observer protocol is enabled, we will only issue // catch-up requests if we are an authority (and only to other // authorities). CatchUpConfig::enabled(true) } else { // otherwise, we are running the observer protocol and don't // care about catch-up requests. CatchUpConfig::disabled() } } else { // if the observer protocol isn't enabled and we're not a light client, then any full // node should be able to answer catch-up requests. CatchUpConfig::enabled(false) }; Inner { local_view: None, peers: Peers::new(NEIGHBOR_REBROADCAST_PERIOD), live_topics: KeepTopics::new(), next_rebroadcast: Instant::now() + REBROADCAST_AFTER, authorities: Vec::new(), pending_catch_up: PendingCatchUp::None, catch_up_config, config, } } /// Note a round in the current set has started. Does nothing if the last /// call to the function was with the same `round`. fn note_round(&mut self, round: Round) -> MaybeMessage { { let local_view = match self.local_view { None => return None, Some(ref mut v) => if v.round == round { // Do not send neighbor packets out if `round` has not changed --- // such behavior is punishable. return None } else { v }, }; let set_id = local_view.set_id; debug!(target: "afg", "Voter {} noting beginning of round {:?} to network.", self.config.name(), (round, set_id)); local_view.update_round(round); self.live_topics.push(round, set_id); self.peers.reshuffle(); } self.multicast_neighbor_packet() } /// Note that a voter set with given ID has started. Does nothing if the last /// call to the function was with the same `set_id`. fn note_set(&mut self, set_id: SetId, authorities: Vec) -> MaybeMessage { { let local_view = match self.local_view { ref mut x @ None => x.get_or_insert(LocalView::new(set_id, Round(1))), Some(ref mut v) => if v.set_id == set_id { let diff_authorities = self.authorities.iter().collect::>() != authorities.iter().collect::>(); if diff_authorities { debug!(target: "afg", "Gossip validator noted set {:?} twice with different authorities. \ Was the authority set hard forked?", set_id, ); self.authorities = authorities; } // Do not send neighbor packets out if the `set_id` has not changed --- // such behavior is punishable. return None } else { v }, }; local_view.update_set(set_id); self.live_topics.push(Round(1), set_id); self.authorities = authorities; } self.multicast_neighbor_packet() } /// Note that we've imported a commit finalizing a given block. Does nothing if the last /// call to the function was with the same or higher `finalized` number. /// `set_id` & `round` are the ones the commit message is from. fn note_commit_finalized( &mut self, round: Round, set_id: SetId, finalized: NumberFor, ) -> MaybeMessage { { match self.local_view { None => return None, Some(ref mut v) => if v.last_commit_height() < Some(&finalized) { v.last_commit = Some((finalized, round, set_id)); } else { return None }, }; } self.multicast_neighbor_packet() } fn consider_vote(&self, round: Round, set_id: SetId) -> Consider { self.local_view .as_ref() .map(LocalView::as_view) .map(|v| v.consider_vote(round, set_id)) .unwrap_or(Consider::RejectOutOfScope) } fn consider_global(&self, set_id: SetId, number: NumberFor) -> Consider { self.local_view .as_ref() .map(LocalView::as_view) .map(|v| v.consider_global(set_id, &number)) .unwrap_or(Consider::RejectOutOfScope) } fn cost_past_rejection( &self, _who: &PeerId, _round: Round, _set_id: SetId, ) -> ReputationChange { // hardcoded for now. cost::PAST_REJECTION } fn validate_round_message( &self, who: &PeerId, full: &VoteMessage, ) -> Action { match self.consider_vote(full.round, full.set_id) { Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()), Consider::RejectOutOfScope => return Action::Discard(Misbehavior::OutOfScopeMessage.cost()), Consider::RejectPast => return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)), Consider::Accept => {}, } // ensure authority is part of the set. if !self.authorities.contains(&full.message.id) { debug!(target: "afg", "Message from unknown voter: {}", full.message.id); telemetry!( self.config.telemetry; CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id, ); return Action::Discard(cost::UNKNOWN_VOTER) } if !sp_finality_grandpa::check_message_signature( &full.message.message, &full.message.id, &full.message.signature, full.round.0, full.set_id.0, ) { debug!(target: "afg", "Bad message signature {}", full.message.id); telemetry!( self.config.telemetry; CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id, ); return Action::Discard(cost::BAD_SIGNATURE) } let topic = super::round_topic::(full.round.0, full.set_id.0); Action::Keep(topic, benefit::ROUND_MESSAGE) } fn validate_commit_message( &mut self, who: &PeerId, full: &FullCommitMessage, ) -> Action { if let Err(misbehavior) = self.peers.update_commit_height(who, full.message.target_number) { return Action::Discard(misbehavior.cost()) } match self.consider_global(full.set_id, full.message.target_number) { Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()), Consider::RejectPast => return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)), Consider::RejectOutOfScope => return Action::Discard(Misbehavior::OutOfScopeMessage.cost()), Consider::Accept => {}, } if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() { debug!(target: "afg", "Malformed compact commit"); telemetry!( self.config.telemetry; CONSENSUS_DEBUG; "afg.malformed_compact_commit"; "precommits_len" => ?full.message.precommits.len(), "auth_data_len" => ?full.message.auth_data.len(), "precommits_is_empty" => ?full.message.precommits.is_empty(), ); return Action::Discard(cost::MALFORMED_COMMIT) } // always discard commits initially and rebroadcast after doing full // checking. let topic = super::global_topic::(full.set_id.0); Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_COMMIT) } fn validate_catch_up_message( &mut self, who: &PeerId, full: &FullCatchUpMessage, ) -> Action { match &self.pending_catch_up { PendingCatchUp::Requesting { who: peer, request, instant } => { if peer != who { return Action::Discard(Misbehavior::OutOfScopeMessage.cost()) } if request.set_id != full.set_id { return Action::Discard(cost::MALFORMED_CATCH_UP) } if request.round.0 > full.message.round_number { return Action::Discard(cost::MALFORMED_CATCH_UP) } if full.message.prevotes.is_empty() || full.message.precommits.is_empty() { return Action::Discard(cost::MALFORMED_CATCH_UP) } // move request to pending processing state, we won't push out // any catch up requests until we import this one (either with a // success or failure). self.pending_catch_up = PendingCatchUp::Processing { instant: *instant }; // always discard catch up messages, they're point-to-point let topic = super::global_topic::(full.set_id.0); Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_CATCH_UP) }, _ => Action::Discard(Misbehavior::OutOfScopeMessage.cost()), } } fn note_catch_up_message_processed(&mut self) { match &self.pending_catch_up { PendingCatchUp::Processing { .. } => { self.pending_catch_up = PendingCatchUp::None; }, state => debug!(target: "afg", "Noted processed catch up message when state was: {:?}", state, ), } } fn handle_catch_up_request( &mut self, who: &PeerId, request: CatchUpRequestMessage, set_state: &environment::SharedVoterSetState, ) -> (Option>, Action) { let local_view = match self.local_view { None => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())), Some(ref view) => view, }; if request.set_id != local_view.set_id { // NOTE: When we're close to a set change there is potentially a // race where the peer sent us the request before it observed that // we had transitioned to a new set. In this case we charge a lower // cost. if request.set_id.0.saturating_add(1) == local_view.set_id.0 && local_view.round.0.saturating_sub(CATCH_UP_THRESHOLD) == 0 { return (None, Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP)) } return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())) } match self.peers.peer(who) { None => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())), Some(peer) if peer.view.round >= request.round => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())), _ => {}, } let last_completed_round = set_state.read().last_completed_round(); if last_completed_round.number < request.round.0 { return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())) } trace!(target: "afg", "Replying to catch-up request for round {} from {} with round {}", request.round.0, who, last_completed_round.number, ); let mut prevotes = Vec::new(); let mut precommits = Vec::new(); // NOTE: the set of votes stored in `LastCompletedRound` is a minimal // set of votes, i.e. at most one equivocation is stored per voter. The // code below assumes this invariant is maintained when creating the // catch up reply since peers won't accept catch-up messages that have // too many equivocations (we exceed the fault-tolerance bound). for vote in last_completed_round.votes { match vote.message { finality_grandpa::Message::Prevote(prevote) => { prevotes.push(finality_grandpa::SignedPrevote { prevote, signature: vote.signature, id: vote.id, }); }, finality_grandpa::Message::Precommit(precommit) => { precommits.push(finality_grandpa::SignedPrecommit { precommit, signature: vote.signature, id: vote.id, }); }, _ => {}, } } let (base_hash, base_number) = last_completed_round.base; let catch_up = CatchUp:: { round_number: last_completed_round.number, prevotes, precommits, base_hash, base_number, }; let full_catch_up = GossipMessage::CatchUp::(FullCatchUpMessage { set_id: request.set_id, message: catch_up, }); (Some(full_catch_up), Action::Discard(cost::CATCH_UP_REPLY)) } fn try_catch_up(&mut self, who: &PeerId) -> (Option>, Option) { let mut catch_up = None; let mut report = None; // if the peer is on the same set and ahead of us by a margin bigger // than `CATCH_UP_THRESHOLD` then we should ask it for a catch up // message. we only send catch-up requests to authorities, observers // won't be able to reply since they don't follow the full GRANDPA // protocol and therefore might not have the vote data available. if let (Some(peer), Some(local_view)) = (self.peers.peer(who), &self.local_view) { if self.catch_up_config.request_allowed(peer) && peer.view.set_id == local_view.set_id && peer.view.round.0.saturating_sub(CATCH_UP_THRESHOLD) > local_view.round.0 { // send catch up request if allowed let round = peer.view.round.0 - 1; // peer.view.round is > 0 let request = CatchUpRequestMessage { set_id: peer.view.set_id, round: Round(round) }; let (catch_up_allowed, catch_up_report) = self.note_catch_up_request(who, &request); if catch_up_allowed { debug!(target: "afg", "Sending catch-up request for round {} to {}", round, who, ); catch_up = Some(GossipMessage::::CatchUpRequest(request)); } report = catch_up_report; } } (catch_up, report) } fn import_neighbor_message( &mut self, who: &PeerId, update: NeighborPacket>, ) -> (Vec, Action, Option>, Option) { let update_res = self.peers.update_peer_state(who, update); let (cost_benefit, topics) = match update_res { Ok(view) => (benefit::NEIGHBOR_MESSAGE, view.map(|view| neighbor_topics::(view))), Err(misbehavior) => (misbehavior.cost(), None), }; let (catch_up, report) = match update_res { Ok(_) => self.try_catch_up(who), _ => (None, None), }; let neighbor_topics = topics.unwrap_or_default(); // always discard neighbor messages, it's only valid for one hop. let action = Action::Discard(cost_benefit); (neighbor_topics, action, catch_up, report) } fn multicast_neighbor_packet(&self) -> MaybeMessage { self.local_view.as_ref().map(|local_view| { let packet = NeighborPacket { round: local_view.round, set_id: local_view.set_id, commit_finalized_height: *local_view.last_commit_height().unwrap_or(&Zero::zero()), }; let peers = self .peers .inner .iter() .filter_map(|(id, info)| { // light clients don't participate in the full GRANDPA voter protocol // and therefore don't need to be informed about view updates if info.roles.is_light() { None } else { Some(id) } }) .cloned() .collect(); (peers, packet) }) } fn note_catch_up_request( &mut self, who: &PeerId, catch_up_request: &CatchUpRequestMessage, ) -> (bool, Option) { let report = match &self.pending_catch_up { PendingCatchUp::Requesting { who: peer, instant, .. } => { if instant.elapsed() <= CATCH_UP_REQUEST_TIMEOUT { return (false, None) } else { // report peer for timeout Some((*peer, cost::CATCH_UP_REQUEST_TIMEOUT)) } }, PendingCatchUp::Processing { instant, .. } => { if instant.elapsed() < CATCH_UP_PROCESS_TIMEOUT { return (false, None) } else { None } }, _ => None, }; self.pending_catch_up = PendingCatchUp::Requesting { who: *who, request: catch_up_request.clone(), instant: Instant::now(), }; (true, report) } /// The initial logic for filtering round messages follows the given state /// transitions: /// /// - State 1: allowed to LUCKY_PEERS random peers (where at least LUCKY_PEERS/2 are /// authorities) /// - State 2: allowed to max(LUCKY_PEERS, sqrt(random peers)) (where at least LUCKY_PEERS are /// authorities) /// - State 3: allowed to all peers /// /// Transitions will be triggered on repropagation attempts by the underlying gossip layer. fn round_message_allowed(&self, who: &PeerId) -> bool { let round_duration = self.config.gossip_duration * ROUND_DURATION; let round_elapsed = match self.local_view { Some(ref local_view) => local_view.round_start.elapsed(), None => return false, }; if round_elapsed < round_duration.mul_f32(PROPAGATION_SOME) { self.peers.first_stage_peers.contains(who) } else if round_elapsed < round_duration.mul_f32(PROPAGATION_ALL) { self.peers.first_stage_peers.contains(who) || self.peers.second_stage_peers.contains(who) } else { self.peers.peer(who).map(|info| !info.roles.is_light()).unwrap_or(false) } } /// The initial logic for filtering global messages follows the given state /// transitions: /// /// - State 1: allowed to max(LUCKY_PEERS, sqrt(peers)) (where at least LUCKY_PEERS are /// authorities) /// - State 2: allowed to all peers /// /// We are more lenient with global messages since there should be a lot /// less global messages than round messages (just commits), and we want /// these to propagate to non-authorities fast enough so that they can /// observe finality. /// /// Transitions will be triggered on repropagation attempts by the /// underlying gossip layer, which should happen every 30 seconds. fn global_message_allowed(&self, who: &PeerId) -> bool { let round_duration = self.config.gossip_duration * ROUND_DURATION; let round_elapsed = match self.local_view { Some(ref local_view) => local_view.round_start.elapsed(), None => return false, }; if round_elapsed < round_duration.mul_f32(PROPAGATION_ALL) { self.peers.first_stage_peers.contains(who) || self.peers.second_stage_peers.contains(who) || self.peers.lucky_light_peers.contains(who) } else { true } } } // Prometheus metrics for [`GossipValidator`]. pub(crate) struct Metrics { messages_validated: CounterVec, } impl Metrics { pub(crate) fn register( registry: &prometheus_endpoint::Registry, ) -> Result { Ok(Self { messages_validated: register( CounterVec::new( Opts::new( "substrate_finality_grandpa_communication_gossip_validator_messages", "Number of messages validated by the finality grandpa gossip validator.", ), &["message", "action"], )?, registry, )?, }) } } /// A validator for GRANDPA gossip messages. pub(super) struct GossipValidator { inner: parking_lot::RwLock>, set_state: environment::SharedVoterSetState, report_sender: TracingUnboundedSender, metrics: Option, telemetry: Option, } impl GossipValidator { /// Create a new gossip-validator. The current set is initialized to 0. If /// `catch_up_enabled` is set to false then the validator will not issue any /// catch up requests (useful e.g. when running just the GRANDPA observer). pub(super) fn new( config: crate::Config, set_state: environment::SharedVoterSetState, prometheus_registry: Option<&Registry>, telemetry: Option, ) -> (GossipValidator, TracingUnboundedReceiver) { let metrics = match prometheus_registry.map(Metrics::register) { Some(Ok(metrics)) => Some(metrics), Some(Err(e)) => { debug!(target: "afg", "Failed to register metrics: {:?}", e); None }, None => None, }; let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator"); let val = GossipValidator { inner: parking_lot::RwLock::new(Inner::new(config)), set_state, report_sender: tx, metrics, telemetry, }; (val, rx) } /// Note a round in the current set has started. pub(super) fn note_round(&self, round: Round, send_neighbor: F) where F: FnOnce(Vec, NeighborPacket>), { let maybe_msg = self.inner.write().note_round(round); if let Some((to, msg)) = maybe_msg { send_neighbor(to, msg); } } /// Note that a voter set with given ID has started. Updates the current set to given /// value and initializes the round to 0. pub(super) fn note_set(&self, set_id: SetId, authorities: Vec, send_neighbor: F) where F: FnOnce(Vec, NeighborPacket>), { let maybe_msg = self.inner.write().note_set(set_id, authorities); if let Some((to, msg)) = maybe_msg { send_neighbor(to, msg); } } /// Note that we've imported a commit finalizing a given block. /// `set_id` & `round` are the ones the commit message is from and not necessarily /// the latest set ID & round started. pub(super) fn note_commit_finalized( &self, round: Round, set_id: SetId, finalized: NumberFor, send_neighbor: F, ) where F: FnOnce(Vec, NeighborPacket>), { let maybe_msg = self.inner.write().note_commit_finalized(round, set_id, finalized); if let Some((to, msg)) = maybe_msg { send_neighbor(to, msg); } } /// Note that we've processed a catch up message. pub(super) fn note_catch_up_message_processed(&self) { self.inner.write().note_catch_up_message_processed(); } fn report(&self, who: PeerId, cost_benefit: ReputationChange) { let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit }); } pub(super) fn do_validate( &self, who: &PeerId, mut data: &[u8], ) -> (Action, Vec, Option>) { let mut broadcast_topics = Vec::new(); let mut peer_reply = None; // Message name for Prometheus metric recording. let message_name; let action = { match GossipMessage::::decode(&mut data) { Ok(GossipMessage::Vote(ref message)) => { message_name = Some("vote"); self.inner.write().validate_round_message(who, message) }, Ok(GossipMessage::Commit(ref message)) => { message_name = Some("commit"); self.inner.write().validate_commit_message(who, message) }, Ok(GossipMessage::Neighbor(update)) => { message_name = Some("neighbor"); let (topics, action, catch_up, report) = self .inner .write() .import_neighbor_message(who, update.into_neighbor_packet()); if let Some((peer, cost_benefit)) = report { self.report(peer, cost_benefit); } broadcast_topics = topics; peer_reply = catch_up; action }, Ok(GossipMessage::CatchUp(ref message)) => { message_name = Some("catch_up"); self.inner.write().validate_catch_up_message(who, message) }, Ok(GossipMessage::CatchUpRequest(request)) => { message_name = Some("catch_up_request"); let (reply, action) = self.inner.write().handle_catch_up_request(who, request, &self.set_state); peer_reply = reply; action }, Err(e) => { message_name = None; debug!(target: "afg", "Error decoding message: {}", e); telemetry!( self.telemetry; CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => "", ); let len = std::cmp::min(i32::MAX as usize, data.len()) as i32; Action::Discard(Misbehavior::UndecodablePacket(len).cost()) }, } }; // Prometheus metric recording. if let (Some(metrics), Some(message_name)) = (&self.metrics, message_name) { let action_name = match action { Action::Keep(_, _) => "keep", Action::ProcessAndDiscard(_, _) => "process_and_discard", Action::Discard(_) => "discard", }; metrics.messages_validated.with_label_values(&[message_name, action_name]).inc(); } (action, broadcast_topics, peer_reply) } #[cfg(test)] fn inner(&self) -> &parking_lot::RwLock> { &self.inner } } impl sc_network_gossip::Validator for GossipValidator { fn new_peer( &self, context: &mut dyn ValidatorContext, who: &PeerId, roles: ObservedRole, ) { let packet = { let mut inner = self.inner.write(); inner.peers.new_peer(*who, roles); inner.local_view.as_ref().map(|v| NeighborPacket { round: v.round, set_id: v.set_id, commit_finalized_height: *v.last_commit_height().unwrap_or(&Zero::zero()), }) }; if let Some(packet) = packet { let packet_data = GossipMessage::::from(packet).encode(); context.send_message(who, packet_data); } } fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, who: &PeerId) { self.inner.write().peers.peer_disconnected(who); } fn validate( &self, context: &mut dyn ValidatorContext, who: &PeerId, data: &[u8], ) -> sc_network_gossip::ValidationResult { let (action, broadcast_topics, peer_reply) = self.do_validate(who, data); // not with lock held! if let Some(msg) = peer_reply { context.send_message(who, msg.encode()); } for topic in broadcast_topics { context.send_topic(who, topic, false); } match action { Action::Keep(topic, cb) => { self.report(*who, cb); context.broadcast_message(topic, data.to_vec(), false); sc_network_gossip::ValidationResult::ProcessAndKeep(topic) }, Action::ProcessAndDiscard(topic, cb) => { self.report(*who, cb); sc_network_gossip::ValidationResult::ProcessAndDiscard(topic) }, Action::Discard(cb) => { self.report(*who, cb); sc_network_gossip::ValidationResult::Discard }, } } fn message_allowed<'a>( &'a self, ) -> Box bool + 'a> { let (inner, do_rebroadcast) = { use parking_lot::RwLockWriteGuard; let mut inner = self.inner.write(); let now = Instant::now(); let do_rebroadcast = if now >= inner.next_rebroadcast { inner.next_rebroadcast = now + REBROADCAST_AFTER; true } else { false }; // downgrade to read-lock. (RwLockWriteGuard::downgrade(inner), do_rebroadcast) }; Box::new(move |who, intent, topic, mut data| { if let MessageIntent::PeriodicRebroadcast = intent { return do_rebroadcast } let peer = match inner.peers.peer(who) { None => return false, Some(x) => x, }; // if the topic is not something we're keeping at the moment, // do not send. let (maybe_round, set_id) = match inner.live_topics.topic_info(topic) { None => return false, Some(x) => x, }; if let MessageIntent::Broadcast = intent { if maybe_round.is_some() { if !inner.round_message_allowed(who) { // early return if the vote message isn't allowed at this stage. return false } } else if !inner.global_message_allowed(who) { // early return if the global message isn't allowed at this stage. return false } } // if the topic is not something the peer accepts, discard. if let Some(round) = maybe_round { return peer.view.consider_vote(round, set_id) == Consider::Accept } // global message. let local_view = match inner.local_view { Some(ref v) => v, None => return false, // cannot evaluate until we have a local view. }; match GossipMessage::::decode(&mut data) { Err(_) => false, Ok(GossipMessage::Commit(full)) => { // we only broadcast commit messages if they're for the same // set the peer is in and if the commit is better than the // last received by peer, additionally we make sure to only // broadcast our best commit. peer.view.consider_global(set_id, full.message.target_number) == Consider::Accept && Some(&full.message.target_number) == local_view.last_commit_height() }, Ok(GossipMessage::Neighbor(_)) => false, Ok(GossipMessage::CatchUpRequest(_)) => false, Ok(GossipMessage::CatchUp(_)) => false, Ok(GossipMessage::Vote(_)) => false, // should not be the case. } }) } fn message_expired<'a>(&'a self) -> Box bool + 'a> { let inner = self.inner.read(); Box::new(move |topic, mut data| { // if the topic is not one of the ones that we are keeping at the moment, // it is expired. match inner.live_topics.topic_info(&topic) { None => return true, // round messages don't require further checking. Some((Some(_), _)) => return false, Some((None, _)) => {}, }; let local_view = match inner.local_view { Some(ref v) => v, None => return true, // no local view means we can't evaluate or hold any topic. }; // global messages -- only keep the best commit. match GossipMessage::::decode(&mut data) { Err(_) => true, Ok(GossipMessage::Commit(full)) => match local_view.last_commit { Some((number, round, set_id)) => // we expire any commit message that doesn't target the same block // as our best commit or isn't from the same round and set id !(full.message.target_number == number && full.round == round && full.set_id == set_id), None => true, }, Ok(_) => true, } }) } } /// Report specifying a reputation change for a given peer. pub(super) struct PeerReport { pub who: PeerId, pub cost_benefit: ReputationChange, } #[cfg(test)] mod tests { use super::{super::NEIGHBOR_REBROADCAST_PERIOD, environment::SharedVoterSetState, *}; use crate::communication; use sc_network::config::Role; use sc_network_gossip::Validator as GossipValidatorT; use sp_core::{crypto::UncheckedFrom, H256}; use std::time::Instant; use substrate_test_runtime_client::runtime::{Block, Header}; // some random config (not really needed) fn config() -> crate::Config { crate::Config { gossip_duration: Duration::from_millis(10), justification_period: 256, keystore: None, name: None, local_role: Role::Authority, observer_enabled: true, telemetry: None, protocol_name: communication::grandpa_protocol_name::NAME.into(), } } // dummy voter set state fn voter_set_state() -> SharedVoterSetState { use crate::{authorities::AuthoritySet, environment::VoterSetState}; let base = (H256::zero(), 0); let voters = vec![(AuthorityId::unchecked_from([1; 32]), 1)]; let voters = AuthoritySet::genesis(voters).unwrap(); let set_state = VoterSetState::live(0, &voters, base); set_state.into() } #[test] fn view_vote_rules() { let view = View { round: Round(100), set_id: SetId(1), last_commit: Some(1000u64), last_update: None, }; assert_eq!(view.consider_vote(Round(98), SetId(1)), Consider::RejectPast); assert_eq!(view.consider_vote(Round(1), SetId(0)), Consider::RejectPast); assert_eq!(view.consider_vote(Round(1000), SetId(0)), Consider::RejectPast); assert_eq!(view.consider_vote(Round(99), SetId(1)), Consider::Accept); assert_eq!(view.consider_vote(Round(100), SetId(1)), Consider::Accept); assert_eq!(view.consider_vote(Round(101), SetId(1)), Consider::Accept); assert_eq!(view.consider_vote(Round(102), SetId(1)), Consider::RejectFuture); assert_eq!(view.consider_vote(Round(1), SetId(2)), Consider::RejectFuture); assert_eq!(view.consider_vote(Round(1000), SetId(2)), Consider::RejectFuture); } #[test] fn view_global_message_rules() { let view = View { round: Round(100), set_id: SetId(2), last_commit: Some(1000u64), last_update: None, }; assert_eq!(view.consider_global(SetId(3), 1), Consider::RejectFuture); assert_eq!(view.consider_global(SetId(3), 1000), Consider::RejectFuture); assert_eq!(view.consider_global(SetId(3), 10000), Consider::RejectFuture); assert_eq!(view.consider_global(SetId(1), 1), Consider::RejectPast); assert_eq!(view.consider_global(SetId(1), 1000), Consider::RejectPast); assert_eq!(view.consider_global(SetId(1), 10000), Consider::RejectPast); assert_eq!(view.consider_global(SetId(2), 1), Consider::RejectPast); assert_eq!(view.consider_global(SetId(2), 1000), Consider::RejectPast); assert_eq!(view.consider_global(SetId(2), 1001), Consider::Accept); assert_eq!(view.consider_global(SetId(2), 10000), Consider::Accept); } #[test] fn unknown_peer_cannot_be_updated() { let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD); let id = PeerId::random(); let update = NeighborPacket { round: Round(5), set_id: SetId(10), commit_finalized_height: 50 }; let res = peers.update_peer_state(&id, update.clone()); assert!(res.unwrap().is_none()); // connect & disconnect. peers.new_peer(id, ObservedRole::Authority); peers.peer_disconnected(&id); let res = peers.update_peer_state(&id, update.clone()); assert!(res.unwrap().is_none()); } #[test] fn update_peer_state() { let update1 = NeighborPacket { round: Round(5), set_id: SetId(10), commit_finalized_height: 50u32 }; let update2 = NeighborPacket { round: Round(6), set_id: SetId(10), commit_finalized_height: 60 }; let update3 = NeighborPacket { round: Round(2), set_id: SetId(11), commit_finalized_height: 61 }; let update4 = NeighborPacket { round: Round(3), set_id: SetId(11), commit_finalized_height: 80 }; // Use shorter rebroadcast period to safely roll the clock back in the last test // and don't hit the system boot time on systems with unsigned time. const SHORT_NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(1); let mut peers = Peers::new(SHORT_NEIGHBOR_REBROADCAST_PERIOD); let id = PeerId::random(); peers.new_peer(id, ObservedRole::Authority); let check_update = |peers: &mut Peers<_>, update: NeighborPacket<_>| { let view = peers.update_peer_state(&id, update.clone()).unwrap().unwrap(); assert_eq!(view.round, update.round); assert_eq!(view.set_id, update.set_id); assert_eq!(view.last_commit, Some(update.commit_finalized_height)); }; check_update(&mut peers, update1); check_update(&mut peers, update2); check_update(&mut peers, update3); check_update(&mut peers, update4.clone()); // Allow duplicate neighbor packets if enough time has passed. peers.inner.get_mut(&id).unwrap().view.last_update = Some(Instant::now() - SHORT_NEIGHBOR_REBROADCAST_PERIOD); check_update(&mut peers, update4); } #[test] fn invalid_view_change() { let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD); let id = PeerId::random(); peers.new_peer(id, ObservedRole::Authority); peers .update_peer_state( &id, NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 10 }, ) .unwrap() .unwrap(); let mut check_update = move |update: NeighborPacket<_>, misbehavior| { let err = peers.update_peer_state(&id, update.clone()).unwrap_err(); assert_eq!(err, misbehavior); }; // round moves backwards. check_update( NeighborPacket { round: Round(9), set_id: SetId(10), commit_finalized_height: 10 }, Misbehavior::InvalidViewChange, ); // set ID moves backwards. check_update( NeighborPacket { round: Round(10), set_id: SetId(9), commit_finalized_height: 10 }, Misbehavior::InvalidViewChange, ); // commit finalized height moves backwards. check_update( NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 9 }, Misbehavior::InvalidViewChange, ); // duplicate packet without grace period. check_update( NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 10 }, Misbehavior::DuplicateNeighborMessage, ); // commit finalized height moves backwards while round moves forward. check_update( NeighborPacket { round: Round(11), set_id: SetId(10), commit_finalized_height: 9 }, Misbehavior::InvalidViewChange, ); // commit finalized height moves backwards while set ID moves forward. check_update( NeighborPacket { round: Round(10), set_id: SetId(11), commit_finalized_height: 9 }, Misbehavior::InvalidViewChange, ); } #[test] fn messages_not_expired_immediately() { let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); let set_id = 1; val.note_set(SetId(set_id), Vec::new(), |_, _| {}); for round_num in 1u64..10 { val.note_round(Round(round_num), |_, _| {}); } { let mut is_expired = val.message_expired(); let last_kept_round = 10u64 - KEEP_RECENT_ROUNDS as u64 - 1; // messages from old rounds are expired. for round_num in 1u64..last_kept_round { let topic = communication::round_topic::(round_num, 1); assert!(is_expired(topic, &[1, 2, 3])); } // messages from not-too-old rounds are not expired. for round_num in last_kept_round..10 { let topic = communication::round_topic::(round_num, 1); assert!(!is_expired(topic, &[1, 2, 3])); } } } #[test] fn message_from_unknown_authority_discarded() { assert!(cost::UNKNOWN_VOTER != cost::BAD_SIGNATURE); let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); let set_id = 1; let auth = AuthorityId::unchecked_from([1u8; 32]); let peer = PeerId::random(); val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {}); val.note_round(Round(1), |_, _| {}); let inner = val.inner.read(); let unknown_voter = inner.validate_round_message( &peer, &VoteMessage { round: Round(1), set_id: SetId(set_id), message: SignedMessage::
{ message: finality_grandpa::Message::Prevote(finality_grandpa::Prevote { target_hash: Default::default(), target_number: 10, }), signature: UncheckedFrom::unchecked_from([1; 64]), id: UncheckedFrom::unchecked_from([2u8; 32]), }, }, ); let bad_sig = inner.validate_round_message( &peer, &VoteMessage { round: Round(1), set_id: SetId(set_id), message: SignedMessage::
{ message: finality_grandpa::Message::Prevote(finality_grandpa::Prevote { target_hash: Default::default(), target_number: 10, }), signature: UncheckedFrom::unchecked_from([1; 64]), id: auth.clone(), }, }, ); assert_eq!(unknown_voter, Action::Discard(cost::UNKNOWN_VOTER)); assert_eq!(bad_sig, Action::Discard(cost::BAD_SIGNATURE)); } #[test] fn unsolicited_catch_up_messages_discarded() { let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); let set_id = 1; let auth = AuthorityId::unchecked_from([1u8; 32]); let peer = PeerId::random(); val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {}); val.note_round(Round(1), |_, _| {}); let validate_catch_up = || { let mut inner = val.inner.write(); inner.validate_catch_up_message( &peer, &FullCatchUpMessage { set_id: SetId(set_id), message: finality_grandpa::CatchUp { round_number: 10, prevotes: Default::default(), precommits: Default::default(), base_hash: Default::default(), base_number: Default::default(), }, }, ) }; // the catch up is discarded because we have no pending request assert_eq!(validate_catch_up(), Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)); let noted = val.inner.write().note_catch_up_request( &peer, &CatchUpRequestMessage { set_id: SetId(set_id), round: Round(10) }, ); assert!(noted.0); // catch up is allowed because we have requested it, but it's rejected // because it's malformed (empty prevotes and precommits) assert_eq!(validate_catch_up(), Action::Discard(cost::MALFORMED_CATCH_UP)); } #[test] fn unanswerable_catch_up_requests_discarded() { // create voter set state with round 2 completed let set_state: SharedVoterSetState = { let mut completed_rounds = voter_set_state().read().completed_rounds(); completed_rounds.push(environment::CompletedRound { number: 2, state: finality_grandpa::round::State::genesis(Default::default()), base: Default::default(), votes: Default::default(), }); let mut current_rounds = environment::CurrentRounds::::new(); current_rounds.insert(3, environment::HasVoted::No); let set_state = environment::VoterSetState::::Live { completed_rounds, current_rounds }; set_state.into() }; let (val, _) = GossipValidator::::new(config(), set_state.clone(), None, None); let set_id = 1; let auth = AuthorityId::unchecked_from([1u8; 32]); let peer = PeerId::random(); val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {}); val.note_round(Round(3), |_, _| {}); // add the peer making the request to the validator, // otherwise it is discarded let mut inner = val.inner.write(); inner.peers.new_peer(peer, ObservedRole::Authority); let res = inner.handle_catch_up_request( &peer, CatchUpRequestMessage { set_id: SetId(set_id), round: Round(10) }, &set_state, ); // we're at round 3, a catch up request for round 10 is out of scope assert!(res.0.is_none()); assert_eq!(res.1, Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)); let res = inner.handle_catch_up_request( &peer, CatchUpRequestMessage { set_id: SetId(set_id), round: Round(2) }, &set_state, ); // a catch up request for round 2 should be answered successfully match res.0.unwrap() { GossipMessage::CatchUp(catch_up) => { assert_eq!(catch_up.set_id, SetId(set_id)); assert_eq!(catch_up.message.round_number, 2); assert_eq!(res.1, Action::Discard(cost::CATCH_UP_REPLY)); }, _ => panic!("expected catch up message"), }; } #[test] fn detects_honest_out_of_scope_catch_requests() { let set_state = voter_set_state(); let (val, _) = GossipValidator::::new(config(), set_state.clone(), None, None); // the validator starts at set id 2 val.note_set(SetId(2), Vec::new(), |_, _| {}); // add the peer making the request to the validator, // otherwise it is discarded let peer = PeerId::random(); val.inner.write().peers.new_peer(peer, ObservedRole::Authority); let send_request = |set_id, round| { let mut inner = val.inner.write(); inner.handle_catch_up_request( &peer, CatchUpRequestMessage { set_id: SetId(set_id), round: Round(round) }, &set_state, ) }; let assert_res = |res: (Option<_>, Action<_>), honest| { assert!(res.0.is_none()); assert_eq!( res.1, if honest { Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP) } else { Action::Discard(Misbehavior::OutOfScopeMessage.cost()) }, ); }; // the validator is at set id 2 and round 0. requests for set id 1 // should not be answered but they should be considered an honest // mistake assert_res(send_request(1, 1), true); assert_res(send_request(1, 10), true); // requests for set id 0 should be considered out of scope assert_res(send_request(0, 1), false); assert_res(send_request(0, 10), false); // after the validator progresses further than CATCH_UP_THRESHOLD in set // id 2, any request for set id 1 should no longer be considered an // honest mistake. val.note_round(Round(3), |_, _| {}); assert_res(send_request(1, 1), false); assert_res(send_request(1, 2), false); } #[test] fn issues_catch_up_request_on_neighbor_packet_import() { let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); // the validator starts at set id 1. val.note_set(SetId(1), Vec::new(), |_, _| {}); // add the peer making the request to the validator, // otherwise it is discarded. let peer = PeerId::random(); val.inner.write().peers.new_peer(peer, ObservedRole::Authority); let import_neighbor_message = |set_id, round| { let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message( &peer, NeighborPacket { round: Round(round), set_id: SetId(set_id), commit_finalized_height: 42, }, ); catch_up_request }; // importing a neighbor message from a peer in the same set in a later // round should lead to a catch up request for the previous round. match import_neighbor_message(1, 42) { Some(GossipMessage::CatchUpRequest(request)) => { assert_eq!(request.set_id, SetId(1)); assert_eq!(request.round, Round(41)); }, _ => panic!("expected catch up message"), } // we note that we're at round 41. val.note_round(Round(41), |_, _| {}); // if we import a neighbor message within CATCH_UP_THRESHOLD then we // won't request a catch up. match import_neighbor_message(1, 42) { None => {}, _ => panic!("expected no catch up message"), } // or if the peer is on a lower round. match import_neighbor_message(1, 40) { None => {}, _ => panic!("expected no catch up message"), } // we also don't request a catch up if the peer is in a different set. match import_neighbor_message(2, 42) { None => {}, _ => panic!("expected no catch up message"), } } #[test] fn doesnt_send_catch_up_requests_when_disabled() { // we create a gossip validator with catch up requests disabled. let config = { let mut c = config(); // if the observer protocol is enabled and we are not an authority, // then we don't issue any catch-up requests. c.local_role = Role::Full; c.observer_enabled = true; c }; let (val, _) = GossipValidator::::new(config, voter_set_state(), None, None); // the validator starts at set id 1. val.note_set(SetId(1), Vec::new(), |_, _| {}); // add the peer making the request to the validator, // otherwise it is discarded. let peer = PeerId::random(); val.inner.write().peers.new_peer(peer, ObservedRole::Authority); // importing a neighbor message from a peer in the same set in a later // round should lead to a catch up request but since they're disabled // we should get `None`. let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message( &peer, NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 }, ); match catch_up_request { None => {}, _ => panic!("expected no catch up message"), } } #[test] fn doesnt_send_catch_up_requests_to_non_authorities_when_observer_enabled() { let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); // the validator starts at set id 1. val.note_set(SetId(1), Vec::new(), |_, _| {}); // add the peers making the requests to the validator, // otherwise it is discarded. let peer_authority = PeerId::random(); let peer_full = PeerId::random(); val.inner.write().peers.new_peer(peer_authority, ObservedRole::Authority); val.inner.write().peers.new_peer(peer_full, ObservedRole::Full); let import_neighbor_message = |peer| { let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message( &peer, NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 }, ); catch_up_request }; // importing a neighbor message from a peer in the same set in a later // round should lead to a catch up request but since the node is not an // authority we should get `None`. if import_neighbor_message(peer_full).is_some() { panic!("expected no catch up message"); } // importing the same neighbor message from a peer who is an authority // should lead to a catch up request. match import_neighbor_message(peer_authority) { Some(GossipMessage::CatchUpRequest(request)) => { assert_eq!(request.set_id, SetId(1)); assert_eq!(request.round, Round(41)); }, _ => panic!("expected catch up message"), } } #[test] fn sends_catch_up_requests_to_non_authorities_when_observer_disabled() { let config = { let mut c = config(); // if the observer protocol is disable any full-node should be able // to answer catch-up requests. c.observer_enabled = false; c }; let (val, _) = GossipValidator::::new(config, voter_set_state(), None, None); // the validator starts at set id 1. val.note_set(SetId(1), Vec::new(), |_, _| {}); // add the peer making the requests to the validator, otherwise it is // discarded. let peer_full = PeerId::random(); val.inner.write().peers.new_peer(peer_full, ObservedRole::Full); let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message( &peer_full, NeighborPacket { round: Round(42), set_id: SetId(1), commit_finalized_height: 50 }, ); // importing a neighbor message from a peer in the same set in a later // round should lead to a catch up request, the node is not an // authority, but since the observer protocol is disabled we should // issue a catch-up request to it anyway. match catch_up_request { Some(GossipMessage::CatchUpRequest(request)) => { assert_eq!(request.set_id, SetId(1)); assert_eq!(request.round, Round(41)); }, _ => panic!("expected catch up message"), } } #[test] fn doesnt_expire_next_round_messages() { // NOTE: this is a regression test let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); // the validator starts at set id 1. val.note_set(SetId(1), Vec::new(), |_, _| {}); // we are at round 10 val.note_round(Round(9), |_, _| {}); val.note_round(Round(10), |_, _| {}); let mut is_expired = val.message_expired(); // we accept messages from rounds 9, 10 and 11 // therefore neither of those should be considered expired for round in &[9, 10, 11] { assert!(!is_expired(communication::round_topic::(*round, 1), &[])) } } #[test] fn progressively_gossips_to_more_peers_as_round_duration_increases() { let mut config = config(); config.gossip_duration = Duration::from_secs(300); // Set to high value to prevent test race let round_duration = config.gossip_duration * ROUND_DURATION; let (val, _) = GossipValidator::::new(config, voter_set_state(), None, None); // the validator start at set id 0 val.note_set(SetId(0), Vec::new(), |_, _| {}); // add 60 peers, 30 authorities and 30 full nodes let mut authorities = Vec::new(); authorities.resize_with(30, || PeerId::random()); let mut full_nodes = Vec::new(); full_nodes.resize_with(30, || PeerId::random()); for i in 0..30 { val.inner.write().peers.new_peer(authorities[i], ObservedRole::Authority); val.inner.write().peers.new_peer(full_nodes[i], ObservedRole::Full); } let test = |rounds_elapsed, peers| { // rewind n round durations val.inner.write().local_view.as_mut().unwrap().round_start = Instant::now() - Duration::from_millis( (round_duration.as_millis() as f32 * rounds_elapsed) as u64, ); val.inner.write().peers.reshuffle(); let mut message_allowed = val.message_allowed(); move || { let mut allowed = 0; for peer in peers { if message_allowed( peer, MessageIntent::Broadcast, &communication::round_topic::(1, 0), &[], ) { allowed += 1; } } allowed } }; fn trial usize>(mut test: F) -> usize { let mut results = Vec::new(); let n = 1000; for _ in 0..n { results.push(test()); } let n = results.len(); let sum: usize = results.iter().sum(); sum / n } let all_peers = authorities.iter().chain(full_nodes.iter()).cloned().collect(); // on the first attempt we will only gossip to 4 peers, either // authorities or full nodes, but we'll guarantee that half of those // are authorities assert!(trial(test(1.0, &authorities)) >= LUCKY_PEERS / 2); assert_eq!(trial(test(1.0, &all_peers)), LUCKY_PEERS); // after more than 1.5 round durations have elapsed we should gossip to // `sqrt(peers)` we're connected to, but we guarantee that at least 4 of // those peers are authorities (plus the `LUCKY_PEERS` from the previous // stage) assert!(trial(test(PROPAGATION_SOME * 1.1, &authorities)) >= LUCKY_PEERS); assert_eq!( trial(test(2.0, &all_peers)), LUCKY_PEERS + (all_peers.len() as f64).sqrt() as usize, ); // after 3 rounds durations we should gossip to all peers we are // connected to assert_eq!(trial(test(PROPAGATION_ALL * 1.1, &all_peers)), all_peers.len()); } #[test] fn never_gossips_round_messages_to_light_clients() { let config = config(); let round_duration = config.gossip_duration * ROUND_DURATION; let (val, _) = GossipValidator::::new(config, voter_set_state(), None, None); // the validator starts at set id 0 val.note_set(SetId(0), Vec::new(), |_, _| {}); // add a new light client as peer let light_peer = PeerId::random(); val.inner.write().peers.new_peer(light_peer, ObservedRole::Light); assert!(!val.message_allowed()( &light_peer, MessageIntent::Broadcast, &communication::round_topic::(1, 0), &[], )); // we reverse the round start time so that the elapsed time is higher // (which should lead to more peers getting the message) val.inner.write().local_view.as_mut().unwrap().round_start = Instant::now() - round_duration * 10; // even after the round has been going for 10 round durations we will never // gossip to light clients assert!(!val.message_allowed()( &light_peer, MessageIntent::Broadcast, &communication::round_topic::(1, 0), &[], )); // update the peer state and local state wrt commits val.inner .write() .peers .update_peer_state( &light_peer, NeighborPacket { round: Round(1), set_id: SetId(0), commit_finalized_height: 1 }, ) .unwrap(); val.note_commit_finalized(Round(1), SetId(0), 2, |_, _| {}); let commit = { let commit = finality_grandpa::CompactCommit { target_hash: H256::random(), target_number: 2, precommits: Vec::new(), auth_data: Vec::new(), }; communication::gossip::GossipMessage::::Commit( communication::gossip::FullCommitMessage { round: Round(2), set_id: SetId(0), message: commit, }, ) .encode() }; // global messages are gossiped to light clients though assert!(val.message_allowed()( &light_peer, MessageIntent::Broadcast, &communication::global_topic::(0), &commit, )); } #[test] fn only_gossip_commits_to_peers_on_same_set() { let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); // the validator starts at set id 1 val.note_set(SetId(1), Vec::new(), |_, _| {}); // add a new peer at set id 1 let peer1 = PeerId::random(); val.inner.write().peers.new_peer(peer1, ObservedRole::Authority); val.inner .write() .peers .update_peer_state( &peer1, NeighborPacket { round: Round(1), set_id: SetId(1), commit_finalized_height: 1 }, ) .unwrap(); // peer2 will default to set id 0 let peer2 = PeerId::random(); val.inner.write().peers.new_peer(peer2, ObservedRole::Authority); // create a commit for round 1 of set id 1 // targeting a block at height 2 let commit = { let commit = finality_grandpa::CompactCommit { target_hash: H256::random(), target_number: 2, precommits: Vec::new(), auth_data: Vec::new(), }; communication::gossip::GossipMessage::::Commit( communication::gossip::FullCommitMessage { round: Round(1), set_id: SetId(1), message: commit, }, ) .encode() }; // note the commit in the validator val.note_commit_finalized(Round(1), SetId(1), 2, |_, _| {}); let mut message_allowed = val.message_allowed(); // the commit should be allowed to peer 1 assert!(message_allowed( &peer1, MessageIntent::Broadcast, &communication::global_topic::(1), &commit, )); // but disallowed to peer 2 since the peer is on set id 0 // the commit should be allowed to peer 1 assert!(!message_allowed( &peer2, MessageIntent::Broadcast, &communication::global_topic::(1), &commit, )); } #[test] fn expire_commits_from_older_rounds() { let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); let commit = |round, set_id, target_number| { let commit = finality_grandpa::CompactCommit { target_hash: H256::random(), target_number, precommits: Vec::new(), auth_data: Vec::new(), }; communication::gossip::GossipMessage::::Commit( communication::gossip::FullCommitMessage { round: Round(round), set_id: SetId(set_id), message: commit, }, ) .encode() }; // note the beginning of a new set with id 1 val.note_set(SetId(1), Vec::new(), |_, _| {}); // note a commit for round 1 in the validator // finalizing a block at height 2 val.note_commit_finalized(Round(1), SetId(1), 2, |_, _| {}); let mut message_expired = val.message_expired(); // a commit message for round 1 that finalizes the same height as we // have observed previously should not be expired assert!(!message_expired(communication::global_topic::(1), &commit(1, 1, 2),)); // it should be expired if it is for a lower block assert!(message_expired(communication::global_topic::(1), &commit(1, 1, 1))); // or the same block height but from the previous round assert!(message_expired(communication::global_topic::(1), &commit(0, 1, 2))); } #[test] fn allow_noting_different_authorities_for_same_set() { let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); let a1 = vec![UncheckedFrom::unchecked_from([0; 32])]; val.note_set(SetId(1), a1.clone(), |_, _| {}); assert_eq!(val.inner().read().authorities, a1); let a2 = vec![UncheckedFrom::unchecked_from([1; 32]), UncheckedFrom::unchecked_from([2; 32])]; val.note_set(SetId(1), a2.clone(), |_, _| {}); assert_eq!(val.inner().read().authorities, a2); } }