Punish peers for duplicate GRANDPA neighbor messages (#12462)

* Decrease peer reputation for duplicate GRANDPA neighbor messages.

* Fix comparison

* Fix update_peer_state() validity condition

* Add negative test

* Rework update_peer_state() validity condition, add tests

* update_peer_state() validity condition: invert comparison

* Split InvalidViewChange and DuplicateNeighborMessage misbehaviors

* Enforce rate-limiting of duplicate GRANDPA neighbor packets

* Update client/finality-grandpa/src/communication/gossip.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Make rolling clock back in a test safer

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Dmitrii Markin
2022-10-13 12:24:31 +03:00
committed by GitHub
parent b38c4165a4
commit b0b2b6799c
3 changed files with 124 additions and 52 deletions
@@ -35,7 +35,8 @@
//! impolite to send messages about r+1 or later. "future-round" messages can
//! be dropped and ignored.
//!
//! It is impolite to send a neighbor packet which moves backwards in protocol state.
//! It is impolite to send a neighbor packet which moves backwards or does not progress
//! protocol state.
//!
//! This is beneficial if it conveys some progress in the protocol state of the peer.
//!
@@ -97,7 +98,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound
use sp_finality_grandpa::AuthorityId;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use super::{benefit, cost, Round, SetId};
use super::{benefit, cost, Round, SetId, NEIGHBOR_REBROADCAST_PERIOD};
use crate::{environment, CatchUp, CompactCommit, SignedMessage};
use std::{
@@ -151,11 +152,12 @@ struct View<N> {
round: Round, // the current round we are at.
set_id: SetId, // the current voter set id.
last_commit: Option<N>, // commit-finalized block height, if any.
last_update: Option<Instant>, // last time we heard from peer, used for spamming detection.
}
impl<N> Default for View<N> {
fn default() -> Self {
View { round: Round(1), set_id: SetId(0), last_commit: None }
View { round: Round(1), set_id: SetId(0), last_commit: None, last_update: None }
}
}
@@ -225,7 +227,12 @@ impl<N> LocalView<N> {
/// Converts the local view to a `View` discarding round and set id
/// information about the last commit.
fn as_view(&self) -> View<&N> {
View { round: self.round, set_id: self.set_id, last_commit: self.last_commit_height() }
View {
round: self.round,
set_id: self.set_id,
last_commit: self.last_commit_height(),
last_update: None,
}
}
/// Update the set ID. implies a reset to round 1.
@@ -417,6 +424,8 @@ pub(super) struct FullCatchUpMessage<Block: BlockT> {
pub(super) enum Misbehavior {
// invalid neighbor message, considering the last one.
InvalidViewChange,
// duplicate neighbor message.
DuplicateNeighborMessage,
// could not decode neighbor message. bytes-length of the packet.
UndecodablePacket(i32),
// Bad catch up message (invalid signatures).
@@ -438,6 +447,7 @@ impl Misbehavior {
match *self {
InvalidViewChange => cost::INVALID_VIEW_CHANGE,
DuplicateNeighborMessage => cost::DUPLICATE_NEIGHBOR_MESSAGE,
UndecodablePacket(bytes) => ReputationChange::new(
bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
"Grandpa: Bad packet",
@@ -488,20 +498,22 @@ struct Peers<N> {
second_stage_peers: HashSet<PeerId>,
/// The randomly picked set of `LUCKY_PEERS` light clients we'll gossip commit messages to.
lucky_light_peers: HashSet<PeerId>,
/// Neighbor packet rebroadcast period --- we reduce the reputation of peers sending duplicate
/// packets too often.
neighbor_rebroadcast_period: Duration,
}
impl<N> Default for Peers<N> {
fn default() -> Self {
impl<N: Ord> Peers<N> {
fn new(neighbor_rebroadcast_period: Duration) -> Self {
Peers {
inner: Default::default(),
first_stage_peers: Default::default(),
second_stage_peers: Default::default(),
lucky_light_peers: Default::default(),
neighbor_rebroadcast_period,
}
}
}
impl<N: Ord> Peers<N> {
fn new_peer(&mut self, who: PeerId, role: ObservedRole) {
match role {
ObservedRole::Authority if self.first_stage_peers.len() < LUCKY_PEERS => {
@@ -547,10 +559,23 @@ impl<N: Ord> Peers<N> {
return Err(Misbehavior::InvalidViewChange)
}
let now = Instant::now();
let duplicate_packet = (update.set_id, update.round, Some(&update.commit_finalized_height)) ==
(peer.view.set_id, peer.view.round, peer.view.last_commit.as_ref());
if duplicate_packet {
if let Some(last_update) = peer.view.last_update {
if now < last_update + self.neighbor_rebroadcast_period / 2 {
return Err(Misbehavior::DuplicateNeighborMessage)
}
}
}
peer.view = View {
round: update.round,
set_id: update.set_id,
last_commit: Some(update.commit_finalized_height),
last_update: Some(now),
};
trace!(target: "afg", "Peer {} updated view. Now at {:?}, {:?}",
@@ -748,7 +773,7 @@ impl<Block: BlockT> Inner<Block> {
Inner {
local_view: None,
peers: Peers::default(),
peers: Peers::new(NEIGHBOR_REBROADCAST_PERIOD),
live_topics: KeepTopics::new(),
next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
authorities: Vec::new(),
@@ -758,13 +783,16 @@ impl<Block: BlockT> Inner<Block> {
}
}
/// Note a round in the current set has started.
/// Note a round in the current set has started. Does nothing if the last
/// call to the function was with the same `round`.
fn note_round(&mut self, round: Round) -> MaybeMessage<Block> {
{
let local_view = match self.local_view {
None => return None,
Some(ref mut v) =>
if v.round == round {
// Do not send neighbor packets out if `round` has not changed ---
// such behavior is punishable.
return None
} else {
v
@@ -803,6 +831,8 @@ impl<Block: BlockT> Inner<Block> {
);
self.authorities = authorities;
}
// Do not send neighbor packets out if the `set_id` has not changed ---
// such behavior is punishable.
return None
} else {
v
@@ -816,7 +846,9 @@ impl<Block: BlockT> Inner<Block> {
self.multicast_neighbor_packet()
}
/// Note that we've imported a commit finalizing a given block.
/// Note that we've imported a commit finalizing a given block. Does nothing if the last
/// call to the function was with the same or higher `finalized` number.
/// `set_id` & `round` are the ones the commit message is from.
fn note_commit_finalized(
&mut self,
round: Round,
@@ -1357,6 +1389,8 @@ impl<Block: BlockT> GossipValidator<Block> {
}
/// Note that we've imported a commit finalizing a given block.
/// `set_id` & `round` are the ones the commit message is from and not necessarily
/// the latest set ID & round started.
pub(super) fn note_commit_finalized<F>(
&self,
round: Round,
@@ -1647,11 +1681,12 @@ pub(super) struct PeerReport {
#[cfg(test)]
mod tests {
use super::{environment::SharedVoterSetState, *};
use super::{super::NEIGHBOR_REBROADCAST_PERIOD, environment::SharedVoterSetState, *};
use crate::communication;
use sc_network::config::Role;
use sc_network_gossip::Validator as GossipValidatorT;
use sp_core::{crypto::UncheckedFrom, H256};
use std::time::Instant;
use substrate_test_runtime_client::runtime::{Block, Header};
// some random config (not really needed)
@@ -1684,7 +1719,12 @@ mod tests {
#[test]
fn view_vote_rules() {
let view = View { round: Round(100), set_id: SetId(1), last_commit: Some(1000u64) };
let view = View {
round: Round(100),
set_id: SetId(1),
last_commit: Some(1000u64),
last_update: None,
};
assert_eq!(view.consider_vote(Round(98), SetId(1)), Consider::RejectPast);
assert_eq!(view.consider_vote(Round(1), SetId(0)), Consider::RejectPast);
@@ -1701,7 +1741,12 @@ mod tests {
#[test]
fn view_global_message_rules() {
let view = View { round: Round(100), set_id: SetId(2), last_commit: Some(1000u64) };
let view = View {
round: Round(100),
set_id: SetId(2),
last_commit: Some(1000u64),
last_update: None,
};
assert_eq!(view.consider_global(SetId(3), 1), Consider::RejectFuture);
assert_eq!(view.consider_global(SetId(3), 1000), Consider::RejectFuture);
@@ -1719,7 +1764,7 @@ mod tests {
#[test]
fn unknown_peer_cannot_be_updated() {
let mut peers = Peers::default();
let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
let id = PeerId::random();
let update =
@@ -1750,27 +1795,35 @@ mod tests {
let update4 =
NeighborPacket { round: Round(3), set_id: SetId(11), commit_finalized_height: 80 };
let mut peers = Peers::default();
// Use shorter rebroadcast period to safely roll the clock back in the last test
// and don't hit the system boot time on systems with unsigned time.
const SHORT_NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(1);
let mut peers = Peers::new(SHORT_NEIGHBOR_REBROADCAST_PERIOD);
let id = PeerId::random();
peers.new_peer(id, ObservedRole::Authority);
let mut check_update = move |update: NeighborPacket<_>| {
let check_update = |peers: &mut Peers<_>, update: NeighborPacket<_>| {
let view = peers.update_peer_state(&id, update.clone()).unwrap().unwrap();
assert_eq!(view.round, update.round);
assert_eq!(view.set_id, update.set_id);
assert_eq!(view.last_commit, Some(update.commit_finalized_height));
};
check_update(update1);
check_update(update2);
check_update(update3);
check_update(update4);
check_update(&mut peers, update1);
check_update(&mut peers, update2);
check_update(&mut peers, update3);
check_update(&mut peers, update4.clone());
// Allow duplicate neighbor packets if enough time has passed.
peers.inner.get_mut(&id).unwrap().view.last_update =
Some(Instant::now() - SHORT_NEIGHBOR_REBROADCAST_PERIOD);
check_update(&mut peers, update4);
}
#[test]
fn invalid_view_change() {
let mut peers = Peers::default();
let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
let id = PeerId::random();
peers.new_peer(id, ObservedRole::Authority);
@@ -1783,29 +1836,41 @@ mod tests {
.unwrap()
.unwrap();
let mut check_update = move |update: NeighborPacket<_>| {
let mut check_update = move |update: NeighborPacket<_>, misbehavior| {
let err = peers.update_peer_state(&id, update.clone()).unwrap_err();
assert_eq!(err, Misbehavior::InvalidViewChange);
assert_eq!(err, misbehavior);
};
// round moves backwards.
check_update(NeighborPacket {
round: Round(9),
set_id: SetId(10),
commit_finalized_height: 10,
});
// commit finalized height moves backwards.
check_update(NeighborPacket {
round: Round(10),
set_id: SetId(10),
commit_finalized_height: 9,
});
check_update(
NeighborPacket { round: Round(9), set_id: SetId(10), commit_finalized_height: 10 },
Misbehavior::InvalidViewChange,
);
// set ID moves backwards.
check_update(NeighborPacket {
round: Round(10),
set_id: SetId(9),
commit_finalized_height: 10,
});
check_update(
NeighborPacket { round: Round(10), set_id: SetId(9), commit_finalized_height: 10 },
Misbehavior::InvalidViewChange,
);
// commit finalized height moves backwards.
check_update(
NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 9 },
Misbehavior::InvalidViewChange,
);
// duplicate packet without grace period.
check_update(
NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 10 },
Misbehavior::DuplicateNeighborMessage,
);
// commit finalized height moves backwards while round moves forward.
check_update(
NeighborPacket { round: Round(11), set_id: SetId(10), commit_finalized_height: 9 },
Misbehavior::InvalidViewChange,
);
// commit finalized height moves backwards while set ID moves forward.
check_update(
NeighborPacket { round: Round(10), set_id: SetId(11), commit_finalized_height: 9 },
Misbehavior::InvalidViewChange,
);
}
#[test]
@@ -37,6 +37,7 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use finality_grandpa::{
@@ -68,6 +69,9 @@ mod periodic;
#[cfg(test)]
pub(crate) mod tests;
// How often to rebroadcast neighbor packets, in cases where no new packets are created.
pub(crate) const NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(2 * 60);
pub mod grandpa_protocol_name {
use sc_chain_spec::ChainSpec;
use sc_network_common::protocol::ProtocolName;
@@ -103,6 +107,8 @@ mod cost {
pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "Grandpa: Unknown voter");
pub(super) const INVALID_VIEW_CHANGE: Rep = Rep::new(-500, "Grandpa: Invalid view change");
pub(super) const DUPLICATE_NEIGHBOR_MESSAGE: Rep =
Rep::new(-500, "Grandpa: Duplicate neighbor message without grace period");
pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
pub(super) const PER_BLOCK_LOADED: i32 = -10;
@@ -279,7 +285,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
let (neighbor_packet_worker, neighbor_packet_sender) =
periodic::NeighborPacketWorker::new();
periodic::NeighborPacketWorker::new(NEIGHBOR_REBROADCAST_PERIOD);
NetworkBridge {
service,
@@ -32,9 +32,6 @@ use super::gossip::{GossipMessage, NeighborPacket};
use sc_network::PeerId;
use sp_runtime::traits::{Block as BlockT, NumberFor};
// How often to rebroadcast, in cases where no new packets are created.
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
/// A sender used to send neighbor packets to a background job.
#[derive(Clone)]
pub(super) struct NeighborPacketSender<B: BlockT>(
@@ -60,6 +57,7 @@ impl<B: BlockT> NeighborPacketSender<B> {
/// implementation). Periodically it sends out the last packet in cases where no new ones arrive.
pub(super) struct NeighborPacketWorker<B: BlockT> {
last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
rebroadcast_period: Duration,
delay: Delay,
rx: TracingUnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
}
@@ -67,13 +65,16 @@ pub(super) struct NeighborPacketWorker<B: BlockT> {
impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}
impl<B: BlockT> NeighborPacketWorker<B> {
pub(super) fn new() -> (Self, NeighborPacketSender<B>) {
pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender<B>) {
let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>(
"mpsc_grandpa_neighbor_packet_worker",
);
let delay = Delay::new(REBROADCAST_AFTER);
let delay = Delay::new(rebroadcast_period);
(NeighborPacketWorker { last: None, delay, rx }, NeighborPacketSender(tx))
(
NeighborPacketWorker { last: None, rebroadcast_period, delay, rx },
NeighborPacketSender(tx),
)
}
}
@@ -85,7 +86,7 @@ impl<B: BlockT> Stream for NeighborPacketWorker<B> {
match this.rx.poll_next_unpin(cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some((to, packet))) => {
this.delay.reset(REBROADCAST_AFTER);
this.delay.reset(this.rebroadcast_period);
this.last = Some((to.clone(), packet.clone()));
return Poll::Ready(Some((to, GossipMessage::<B>::from(packet))))
@@ -98,7 +99,7 @@ impl<B: BlockT> Stream for NeighborPacketWorker<B> {
// Getting this far here implies that the timer fired.
this.delay.reset(REBROADCAST_AFTER);
this.delay.reset(this.rebroadcast_period);
// Make sure the underlying task is scheduled for wake-up.
//