mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-08 19:28:01 +00:00
sc-consensus-beefy: improve gossip logic (#1852)
- Remove cached messages used for deduplication in `GossipValidator` since they're already deduplicated in upper layer `NetworkGossip`. - Add cache for "justified rounds" to quickly discard any further (even if potentially different) justifications at the gossip level, once a valid one (for a respective round) is submitted to the worker. - Add short-circuit in worker `finalize()` method to not attempt to finalize same block multiple times (for example when we get justifications for same block from multiple components like block-import, gossip or on-demand). - Change a test which had A LOT of latency in syncing blocks for some weird reason and would only run after ~150seconds. It now runs instantly. Fixes https://github.com/paritytech/polkadot-sdk/issues/1728
This commit is contained in:
@@ -16,11 +16,10 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{collections::BTreeMap, sync::Arc, time::Duration};
|
||||
use std::{collections::BTreeSet, sync::Arc, time::Duration};
|
||||
|
||||
use sc_network::{PeerId, ReputationChange};
|
||||
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
|
||||
use sp_core::hashing::twox_64;
|
||||
use sp_runtime::traits::{Block, Hash, Header, NumberFor};
|
||||
|
||||
use codec::{Decode, DecodeAll, Encode};
|
||||
@@ -115,9 +114,6 @@ where
|
||||
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy-justifications")
|
||||
}
|
||||
|
||||
/// A type that represents hash of the message.
|
||||
pub type MessageHash = [u8; 8];
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct GossipFilterCfg<'a, B: Block> {
|
||||
pub start: NumberFor<B>,
|
||||
@@ -133,18 +129,21 @@ struct FilterInner<B: Block> {
|
||||
}
|
||||
|
||||
struct Filter<B: Block> {
|
||||
// specifies live rounds
|
||||
inner: Option<FilterInner<B>>,
|
||||
live_votes: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
|
||||
// cache of seen valid justifications in active rounds
|
||||
rounds_with_valid_proofs: BTreeSet<NumberFor<B>>,
|
||||
}
|
||||
|
||||
impl<B: Block> Filter<B> {
|
||||
pub fn new() -> Self {
|
||||
Self { inner: None, live_votes: BTreeMap::new() }
|
||||
Self { inner: None, rounds_with_valid_proofs: BTreeSet::new() }
|
||||
}
|
||||
|
||||
/// Update filter to new `start` and `set_id`.
|
||||
fn update(&mut self, cfg: GossipFilterCfg<B>) {
|
||||
self.live_votes.retain(|&round, _| round >= cfg.start && round <= cfg.end);
|
||||
self.rounds_with_valid_proofs
|
||||
.retain(|&round| round >= cfg.start && round <= cfg.end);
|
||||
// only clone+overwrite big validator_set if set_id changed
|
||||
match self.inner.as_mut() {
|
||||
Some(f) if f.validator_set.id() == cfg.validator_set.id() => {
|
||||
@@ -203,14 +202,14 @@ impl<B: Block> Filter<B> {
|
||||
.unwrap_or(Consider::RejectOutOfScope)
|
||||
}
|
||||
|
||||
/// Add new _known_ `hash` to the round's known votes.
|
||||
fn add_known_vote(&mut self, round: NumberFor<B>, hash: MessageHash) {
|
||||
self.live_votes.entry(round).or_default().insert(hash);
|
||||
/// Add new _known_ `round` to the set of seen valid justifications.
|
||||
fn mark_round_as_proven(&mut self, round: NumberFor<B>) {
|
||||
self.rounds_with_valid_proofs.insert(round);
|
||||
}
|
||||
|
||||
/// Check if `hash` is already part of round's known votes.
|
||||
fn is_known_vote(&self, round: NumberFor<B>, hash: &MessageHash) -> bool {
|
||||
self.live_votes.get(&round).map(|known| known.contains(hash)).unwrap_or(false)
|
||||
/// Check if `round` is already part of seen valid justifications.
|
||||
fn is_already_proven(&self, round: NumberFor<B>) -> bool {
|
||||
self.rounds_with_valid_proofs.contains(&round)
|
||||
}
|
||||
|
||||
fn validator_set(&self) -> Option<&ValidatorSet<AuthorityId>> {
|
||||
@@ -273,16 +272,13 @@ where
|
||||
&self,
|
||||
vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
|
||||
sender: &PeerId,
|
||||
data: &[u8],
|
||||
) -> Action<B::Hash> {
|
||||
let msg_hash = twox_64(data);
|
||||
let round = vote.commitment.block_number;
|
||||
let set_id = vote.commitment.validator_set_id;
|
||||
self.known_peers.lock().note_vote_for(*sender, round);
|
||||
|
||||
// Verify general usefulness of the message.
|
||||
// We are going to discard old votes right away (without verification)
|
||||
// Also we keep track of already received votes to avoid verifying duplicates.
|
||||
// We are going to discard old votes right away (without verification).
|
||||
{
|
||||
let filter = self.gossip_filter.read();
|
||||
|
||||
@@ -293,10 +289,6 @@ where
|
||||
Consider::Accept => {},
|
||||
}
|
||||
|
||||
if filter.is_known_vote(round, &msg_hash) {
|
||||
return Action::Keep(self.votes_topic, benefit::KNOWN_VOTE_MESSAGE)
|
||||
}
|
||||
|
||||
// ensure authority is part of the set.
|
||||
if !filter
|
||||
.validator_set()
|
||||
@@ -309,7 +301,6 @@ where
|
||||
}
|
||||
|
||||
if BeefyKeystore::verify(&vote.id, &vote.signature, &vote.commitment.encode()) {
|
||||
self.gossip_filter.write().add_known_vote(round, msg_hash);
|
||||
Action::Keep(self.votes_topic, benefit::VOTE_MESSAGE)
|
||||
} else {
|
||||
debug!(
|
||||
@@ -328,34 +319,46 @@ where
|
||||
let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
|
||||
self.known_peers.lock().note_vote_for(*sender, round);
|
||||
|
||||
let guard = self.gossip_filter.read();
|
||||
// Verify general usefulness of the justification.
|
||||
match guard.consider_finality_proof(round, set_id) {
|
||||
Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
|
||||
Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
|
||||
Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
|
||||
Consider::Accept => {},
|
||||
let action = {
|
||||
let guard = self.gossip_filter.read();
|
||||
|
||||
// Verify general usefulness of the justification.
|
||||
match guard.consider_finality_proof(round, set_id) {
|
||||
Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
|
||||
Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
|
||||
Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
|
||||
Consider::Accept => {},
|
||||
}
|
||||
|
||||
if guard.is_already_proven(round) {
|
||||
return Action::Discard(benefit::NOT_INTERESTED)
|
||||
}
|
||||
|
||||
// Verify justification signatures.
|
||||
guard
|
||||
.validator_set()
|
||||
.map(|validator_set| {
|
||||
if let Err((_, signatures_checked)) =
|
||||
verify_with_validator_set::<B>(round, validator_set, &proof)
|
||||
{
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender
|
||||
);
|
||||
let mut cost = cost::INVALID_PROOF;
|
||||
cost.value +=
|
||||
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
|
||||
Action::Discard(cost)
|
||||
} else {
|
||||
Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF)
|
||||
}
|
||||
})
|
||||
.unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE))
|
||||
};
|
||||
if matches!(action, Action::Keep(_, _)) {
|
||||
self.gossip_filter.write().mark_round_as_proven(round);
|
||||
}
|
||||
// Verify justification signatures.
|
||||
guard
|
||||
.validator_set()
|
||||
.map(|validator_set| {
|
||||
if let Err((_, signatures_checked)) =
|
||||
verify_with_validator_set::<B>(round, validator_set, &proof)
|
||||
{
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender
|
||||
);
|
||||
let mut cost = cost::INVALID_PROOF;
|
||||
cost.value +=
|
||||
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
|
||||
Action::Discard(cost)
|
||||
} else {
|
||||
Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF)
|
||||
}
|
||||
})
|
||||
.unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE))
|
||||
action
|
||||
}
|
||||
}
|
||||
|
||||
@@ -375,7 +378,7 @@ where
|
||||
) -> ValidationResult<B::Hash> {
|
||||
let raw = data;
|
||||
let action = match GossipMessage::<B>::decode_all(&mut data) {
|
||||
Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, raw),
|
||||
Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender),
|
||||
Ok(GossipMessage::FinalityProof(proof)) => self.validate_finality_proof(proof, sender),
|
||||
Err(e) => {
|
||||
debug!(target: LOG_TARGET, "Error decoding message: {}", e);
|
||||
@@ -483,41 +486,6 @@ pub(crate) mod tests {
|
||||
};
|
||||
use sp_keystore::{testing::MemoryKeystore, Keystore};
|
||||
|
||||
#[test]
|
||||
fn known_votes_insert_remove() {
|
||||
let mut filter = Filter::<Block>::new();
|
||||
let msg_hash = twox_64(b"data");
|
||||
let keys = vec![Keyring::Alice.public()];
|
||||
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 1).unwrap();
|
||||
|
||||
filter.add_known_vote(1, msg_hash);
|
||||
filter.add_known_vote(1, msg_hash);
|
||||
filter.add_known_vote(2, msg_hash);
|
||||
assert_eq!(filter.live_votes.len(), 2);
|
||||
|
||||
filter.add_known_vote(3, msg_hash);
|
||||
assert!(filter.is_known_vote(3, &msg_hash));
|
||||
assert!(!filter.is_known_vote(3, &twox_64(b"other")));
|
||||
assert!(!filter.is_known_vote(4, &msg_hash));
|
||||
assert_eq!(filter.live_votes.len(), 3);
|
||||
|
||||
assert!(filter.inner.is_none());
|
||||
assert_eq!(filter.consider_vote(1, 1), Consider::RejectOutOfScope);
|
||||
|
||||
filter.update(GossipFilterCfg { start: 3, end: 10, validator_set: &validator_set });
|
||||
assert_eq!(filter.live_votes.len(), 1);
|
||||
assert!(filter.live_votes.contains_key(&3));
|
||||
assert_eq!(filter.consider_vote(2, 1), Consider::RejectPast);
|
||||
assert_eq!(filter.consider_vote(3, 1), Consider::Accept);
|
||||
assert_eq!(filter.consider_vote(4, 1), Consider::Accept);
|
||||
assert_eq!(filter.consider_vote(20, 1), Consider::RejectFuture);
|
||||
assert_eq!(filter.consider_vote(4, 2), Consider::RejectFuture);
|
||||
|
||||
let validator_set = ValidatorSet::<AuthorityId>::new(keys, 2).unwrap();
|
||||
filter.update(GossipFilterCfg { start: 5, end: 10, validator_set: &validator_set });
|
||||
assert!(filter.live_votes.is_empty());
|
||||
}
|
||||
|
||||
struct TestContext;
|
||||
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
|
||||
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
|
||||
@@ -610,20 +578,6 @@ pub(crate) mod tests {
|
||||
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
|
||||
expected_report.cost_benefit = benefit::VOTE_MESSAGE;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(
|
||||
gv.gossip_filter
|
||||
.read()
|
||||
.live_votes
|
||||
.get(&vote.commitment.block_number)
|
||||
.map(|x| x.len()),
|
||||
Some(1)
|
||||
);
|
||||
|
||||
// second time we should hit the cache
|
||||
let res = gv.validate(&mut context, &sender, &encoded);
|
||||
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
|
||||
expected_report.cost_benefit = benefit::KNOWN_VOTE_MESSAGE;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
|
||||
// reject vote, voter not in validator set
|
||||
let mut bad_vote = vote.clone();
|
||||
@@ -692,7 +646,7 @@ pub(crate) mod tests {
|
||||
// reject proof, bad signatures (Bob instead of Alice)
|
||||
let bad_validator_set =
|
||||
ValidatorSet::<AuthorityId>::new(vec![Keyring::Bob.public()], 0).unwrap();
|
||||
let proof = dummy_proof(20, &bad_validator_set);
|
||||
let proof = dummy_proof(21, &bad_validator_set);
|
||||
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
|
||||
let res = gv.validate(&mut context, &sender, &encoded_proof);
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
|
||||
@@ -102,7 +102,7 @@ mod cost {
|
||||
mod benefit {
|
||||
use sc_network::ReputationChange as Rep;
|
||||
pub(super) const VOTE_MESSAGE: Rep = Rep::new(100, "BEEFY: Round vote message");
|
||||
pub(super) const KNOWN_VOTE_MESSAGE: Rep = Rep::new(50, "BEEFY: Known vote");
|
||||
pub(super) const NOT_INTERESTED: Rep = Rep::new(10, "BEEFY: Not interested in round");
|
||||
pub(super) const VALIDATED_PROOF: Rep = Rep::new(100, "BEEFY: Justification");
|
||||
}
|
||||
|
||||
|
||||
@@ -1302,7 +1302,7 @@ async fn gossipped_finality_proofs() {
|
||||
// Only Alice and Bob are running the voter -> finality threshold not reached
|
||||
let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob];
|
||||
let validator_set = ValidatorSet::new(make_beefy_ids(&validators), 0).unwrap();
|
||||
let session_len = 30;
|
||||
let session_len = 10;
|
||||
let min_block_delta = 1;
|
||||
|
||||
let mut net = BeefyTestNet::new(3);
|
||||
@@ -1332,14 +1332,8 @@ async fn gossipped_finality_proofs() {
|
||||
|
||||
let net = Arc::new(Mutex::new(net));
|
||||
|
||||
// Pump net + Charlie gossip to see peers.
|
||||
let timeout = Box::pin(tokio::time::sleep(Duration::from_millis(200)));
|
||||
let gossip_engine_pump = &mut charlie_gossip_engine;
|
||||
let pump_with_timeout = future::select(gossip_engine_pump, timeout);
|
||||
run_until(pump_with_timeout, &net).await;
|
||||
|
||||
// push 10 blocks
|
||||
let hashes = net.lock().generate_blocks_and_sync(10, session_len, &validator_set, true).await;
|
||||
// push 42 blocks
|
||||
let hashes = net.lock().generate_blocks_and_sync(42, session_len, &validator_set, true).await;
|
||||
|
||||
let peers = peers.into_iter().enumerate();
|
||||
|
||||
|
||||
@@ -604,6 +604,11 @@ where
|
||||
VersionedFinalityProof::V1(ref sc) => sc.commitment.block_number,
|
||||
};
|
||||
|
||||
if block_num <= self.persisted_state.voting_oracle.best_beefy_block {
|
||||
// we've already finalized this round before, short-circuit.
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
// Finalize inner round and update voting_oracle state.
|
||||
self.persisted_state.voting_oracle.finalize(block_num)?;
|
||||
|
||||
@@ -629,7 +634,7 @@ where
|
||||
self.backend
|
||||
.append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode()))
|
||||
}) {
|
||||
error!(
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"🥩 Error {:?} on appending justification: {:?}", e, finality_proof
|
||||
);
|
||||
@@ -648,7 +653,7 @@ where
|
||||
}
|
||||
|
||||
/// Handle previously buffered justifications, that now land in the voting interval.
|
||||
fn try_pending_justififactions(&mut self) -> Result<(), Error> {
|
||||
fn try_pending_justifications(&mut self) -> Result<(), Error> {
|
||||
// Interval of blocks for which we can process justifications and votes right now.
|
||||
let (start, end) = self.voting_oracle().accepted_interval()?;
|
||||
// Process pending justifications.
|
||||
@@ -782,7 +787,7 @@ where
|
||||
|
||||
fn process_new_state(&mut self) {
|
||||
// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
|
||||
if let Err(err) = self.try_pending_justififactions() {
|
||||
if let Err(err) = self.try_pending_justifications() {
|
||||
debug!(target: LOG_TARGET, "🥩 {}", err);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user