sc-consensus-beefy: improve beefy gossip validator (#13606)

* sc-consensus-beefy: improve beefy gossip validator

Old gossip validator was pretty dumb, being very permissive with
incoming votes - only condition it had was to be newer than best
finalized.

New filter conditions:
 - voter rounds are initialized (discarding votes until voter is
   actually active),
 - only votes for current active set id are accepted,
 - only votes for rounds in the current voting session are accepted,
 - only votes for GRANDPA finalized blocks are accepted,
 - when BEEFY voter reaches mandatory round, only votes for said
   mandatory round are accepted.

New validator uses the VoterOracle to easily implement above conditions
and only allow through votes that are immediately useful to the voter.

After every GRANDPA or BEEFY finality, the gossip validator filter is
updated.

* sc-consensus-beefy: remove votes enqueueing

Since gossip validator will simply disallow votes for future rounds,
and only allow votes that the voter can immediately process, there
is no need for the voter to enqueue votes.

It will see these "future" votes later in rebroadcasts, when voter
will also be able to process them. Only at that point does gossip
accept and consume them.

* sc-consensus-beefy: refactor persistent state

Move best-beefy and best-grandpa into VoterOracle instead
of passing them around as params.
VoterOracle ultimately needs to know best-beefy and/or best-grandpa
for most of its functions.

* sc-consensus-beefy: further restrict gossip validator

Assuming mandatory done in current session:
Instead of allowing votes for any round in the current session, only
accept votes for rounds equal or better than best BEEFY finalized.

* sc-consensus-beefy: add a couple of comments

* sc-consensus-beefy: fix tests involving multiple tasks

Finalize blocks one a time in tests where we want gossip to happen
in a certain round. Otherwise, some tasks may be left behind in
terms of gossip round numbers because once "scheduled" a task will
greedily process as much as possible.

This change should be in line with the real-world scenario where
voters run "in parallel" across nodes, the only points of
synchronization being the finality notifications.

* sc-consensus-beefy: address review comments

---------

Signed-off-by: acatangiu <adrian@parity.io>
This commit is contained in:
Adrian Catangiu
2023-03-16 11:02:39 +02:00
committed by GitHub
parent 3d6d2954ce
commit 3708b156d9
6 changed files with 384 additions and 573 deletions
@@ -31,11 +31,14 @@ use wasm_timer::Instant;
use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore, LOG_TARGET};
use sp_consensus_beefy::{
crypto::{Public, Signature},
VoteMessage,
ValidatorSetId, VoteMessage,
};
// Timeout for rebroadcasting messages.
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
#[cfg(not(test))]
const REBROADCAST_AFTER: Duration = Duration::from_secs(60);
#[cfg(test)]
const REBROADCAST_AFTER: Duration = Duration::from_secs(5);
/// Gossip engine messages topic
pub(crate) fn topic<B: Block>() -> B::Hash
@@ -45,45 +48,51 @@ where
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
}
#[derive(Debug)]
pub(crate) struct GossipVoteFilter<B: Block> {
pub start: NumberFor<B>,
pub end: NumberFor<B>,
pub validator_set_id: ValidatorSetId,
}
/// A type that represents hash of the message.
pub type MessageHash = [u8; 8];
struct KnownVotes<B: Block> {
last_done: Option<NumberFor<B>>,
struct VotesFilter<B: Block> {
filter: Option<GossipVoteFilter<B>>,
live: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
}
impl<B: Block> KnownVotes<B> {
impl<B: Block> VotesFilter<B> {
pub fn new() -> Self {
Self { last_done: None, live: BTreeMap::new() }
Self { filter: None, live: BTreeMap::new() }
}
/// Create new round votes set if not already present.
fn insert(&mut self, round: NumberFor<B>) {
self.live.entry(round).or_default();
/// Update filter to new `start` and `set_id`.
fn update(&mut self, filter: GossipVoteFilter<B>) {
self.live.retain(|&round, _| round >= filter.start && round <= filter.end);
self.filter = Some(filter);
}
/// Remove `round` and older from live set, update `last_done` accordingly.
fn conclude(&mut self, round: NumberFor<B>) {
self.live.retain(|&number, _| number > round);
self.last_done = self.last_done.max(Some(round));
}
/// Return true if `round` is newer than previously concluded rounds.
/// Return true if `round` is >= than `max(session_start, best_beefy)`,
/// and vote set id matches session set id.
///
/// Latest concluded round is still considered alive to allow proper gossiping for it.
fn is_live(&self, round: &NumberFor<B>) -> bool {
Some(*round) >= self.last_done
fn is_live(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
self.filter
.as_ref()
.map(|f| set_id == f.validator_set_id && round >= f.start && round <= f.end)
.unwrap_or(false)
}
/// Add new _known_ `hash` to the round's known votes.
fn add_known(&mut self, round: &NumberFor<B>, hash: MessageHash) {
self.live.get_mut(round).map(|known| known.insert(hash));
fn add_known(&mut self, round: NumberFor<B>, hash: MessageHash) {
self.live.entry(round).or_default().insert(hash);
}
/// Check if `hash` is already part of round's known votes.
fn is_known(&self, round: &NumberFor<B>, hash: &MessageHash) -> bool {
self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false)
fn is_known(&self, round: NumberFor<B>, hash: &MessageHash) -> bool {
self.live.get(&round).map(|known| known.contains(hash)).unwrap_or(false)
}
}
@@ -100,7 +109,7 @@ where
B: Block,
{
topic: B::Hash,
known_votes: RwLock<KnownVotes<B>>,
votes_filter: RwLock<VotesFilter<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
}
@@ -112,26 +121,18 @@ where
pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
GossipValidator {
topic: topic::<B>(),
known_votes: RwLock::new(KnownVotes::new()),
votes_filter: RwLock::new(VotesFilter::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
}
}
/// Note a voting round.
/// Update gossip validator filter.
///
/// Noting round will track gossiped votes for `round`.
pub(crate) fn note_round(&self, round: NumberFor<B>) {
debug!(target: LOG_TARGET, "🥩 About to note gossip round #{}", round);
self.known_votes.write().insert(round);
}
/// Conclude a voting round.
///
/// This can be called once round is complete so we stop gossiping for it.
pub(crate) fn conclude_round(&self, round: NumberFor<B>) {
debug!(target: LOG_TARGET, "🥩 About to drop gossip round #{}", round);
self.known_votes.write().conclude(round);
/// Only votes for `set_id` and rounds `start <= round <= end` will be accepted.
pub(crate) fn update_filter(&self, filter: GossipVoteFilter<B>) {
debug!(target: LOG_TARGET, "🥩 New gossip filter {:?}", filter);
self.votes_filter.write().update(filter);
}
}
@@ -152,25 +153,26 @@ where
if let Ok(msg) = VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
let msg_hash = twox_64(data);
let round = msg.commitment.block_number;
let set_id = msg.commitment.validator_set_id;
self.known_peers.lock().note_vote_for(*sender, round);
// Verify general usefulness of the message.
// We are going to discard old votes right away (without verification)
// Also we keep track of already received votes to avoid verifying duplicates.
{
let known_votes = self.known_votes.read();
let filter = self.votes_filter.read();
if !known_votes.is_live(&round) {
if !filter.is_live(round, set_id) {
return ValidationResult::Discard
}
if known_votes.is_known(&round, &msg_hash) {
if filter.is_known(round, &msg_hash) {
return ValidationResult::ProcessAndKeep(self.topic)
}
}
if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) {
self.known_votes.write().add_known(&round, msg_hash);
self.known_peers.lock().note_vote_for(*sender, round);
self.votes_filter.write().add_known(round, msg_hash);
return ValidationResult::ProcessAndKeep(self.topic)
} else {
// TODO: report peer
@@ -185,7 +187,7 @@ where
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
let known_votes = self.known_votes.read();
let filter = self.votes_filter.read();
Box::new(move |_topic, mut data| {
let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
@@ -193,7 +195,8 @@ where
};
let round = msg.commitment.block_number;
let expired = !known_votes.is_live(&round);
let set_id = msg.commitment.validator_set_id;
let expired = !filter.is_live(round, set_id);
trace!(target: LOG_TARGET, "🥩 Message for round #{} expired: {}", round, expired);
@@ -208,6 +211,7 @@ where
let now = Instant::now();
let mut next_rebroadcast = self.next_rebroadcast.lock();
if now >= *next_rebroadcast {
trace!(target: LOG_TARGET, "🥩 Gossip rebroadcast");
*next_rebroadcast = now + REBROADCAST_AFTER;
true
} else {
@@ -215,7 +219,7 @@ where
}
};
let known_votes = self.known_votes.read();
let filter = self.votes_filter.read();
Box::new(move |_who, intent, _topic, mut data| {
if let MessageIntent::PeriodicRebroadcast = intent {
return do_rebroadcast
@@ -227,7 +231,8 @@ where
};
let round = msg.commitment.block_number;
let allowed = known_votes.is_live(&round);
let set_id = msg.commitment.validator_set_id;
let allowed = filter.is_live(round, set_id);
trace!(target: LOG_TARGET, "🥩 Message for round #{} allowed: {}", round, allowed);
@@ -252,81 +257,35 @@ mod tests {
#[test]
fn known_votes_insert_remove() {
let mut kv = KnownVotes::<Block>::new();
let mut kv = VotesFilter::<Block>::new();
let msg_hash = twox_64(b"data");
kv.insert(1);
kv.insert(1);
kv.insert(2);
kv.add_known(1, msg_hash);
kv.add_known(1, msg_hash);
kv.add_known(2, msg_hash);
assert_eq!(kv.live.len(), 2);
let mut kv = KnownVotes::<Block>::new();
kv.insert(1);
kv.insert(2);
kv.insert(3);
kv.add_known(3, msg_hash);
assert!(kv.is_known(3, &msg_hash));
assert!(!kv.is_known(3, &twox_64(b"other")));
assert!(!kv.is_known(4, &msg_hash));
assert_eq!(kv.live.len(), 3);
assert!(kv.last_done.is_none());
kv.conclude(2);
assert!(kv.filter.is_none());
assert!(!kv.is_live(1, 1));
kv.update(GossipVoteFilter { start: 3, end: 10, validator_set_id: 1 });
assert_eq!(kv.live.len(), 1);
assert!(!kv.live.contains_key(&2));
assert_eq!(kv.last_done, Some(2));
assert!(kv.live.contains_key(&3));
assert!(!kv.is_live(2, 1));
assert!(kv.is_live(3, 1));
assert!(kv.is_live(4, 1));
assert!(!kv.is_live(4, 2));
kv.conclude(1);
assert_eq!(kv.last_done, Some(2));
kv.conclude(3);
assert_eq!(kv.last_done, Some(3));
kv.update(GossipVoteFilter { start: 5, end: 10, validator_set_id: 2 });
assert!(kv.live.is_empty());
}
#[test]
fn note_and_drop_round_works() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.note_round(1u64);
assert!(gv.known_votes.read().is_live(&1u64));
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
assert_eq!(gv.known_votes.read().live.len(), 4);
gv.conclude_round(7u64);
let votes = gv.known_votes.read();
// rounds 1 and 3 are outdated, don't gossip anymore
assert!(!votes.is_live(&1u64));
assert!(!votes.is_live(&3u64));
// latest concluded round is still gossiped
assert!(votes.is_live(&7u64));
// round 10 is alive and in-progress
assert!(votes.is_live(&10u64));
}
#[test]
fn note_same_round_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
assert_eq!(gv.known_votes.read().live.len(), 3);
// note round #7 again -> should not change anything
gv.note_round(7u64);
let votes = gv.known_votes.read();
assert_eq!(votes.live.len(), 3);
assert!(votes.is_live(&3u64));
assert!(votes.is_live(&7u64));
assert!(votes.is_live(&10u64));
}
struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
@@ -368,21 +327,18 @@ mod tests {
#[test]
fn should_avoid_verifying_signatures_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let mut context = TestContext;
let vote = dummy_vote(3);
gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);
// first time the cache should be populated
let res = gv.validate(&mut context, &sender, &vote.encode());
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
assert_eq!(
gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
gv.votes_filter.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
Some(1)
);
@@ -392,9 +348,11 @@ mod tests {
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
// next we should quickly reject if the round is not live
gv.conclude_round(7_u64);
gv.update_filter(GossipVoteFilter { start: 7, end: 10, validator_set_id: 0 });
assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number));
let number = vote.commitment.block_number;
let set_id = vote.commitment.validator_set_id;
assert!(!gv.votes_filter.read().is_live(number, set_id));
let res = gv.validate(&mut context, &sender, &vote.encode());
@@ -404,14 +362,13 @@ mod tests {
#[test]
fn messages_allowed_and_expired() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let topic = Default::default();
let intent = MessageIntent::Broadcast;
// note round 2 and 3, then conclude 2
gv.note_round(2u64);
gv.note_round(3u64);
gv.conclude_round(2u64);
// conclude 2
gv.update_filter(GossipVoteFilter { start: 2, end: 10, validator_set_id: 0 });
let mut allowed = gv.message_allowed();
let mut expired = gv.message_expired();
@@ -447,6 +404,7 @@ mod tests {
#[test]
fn messages_rebroadcast() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let topic = Default::default();