mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-18 03:41:02 +00:00
sc-beefy-consensus: Remove unneeded stream. (#4015)
The stream was just used to communicate from the validator the peer reports back to the gossip engine. Internally the gossip engine just forwards these reports to the networking engine. So, we can just do this directly. The reporting stream was also pumped [in the worker behind the engine](https://github.com/paritytech/polkadot-sdk/blob/9d6261892814fa27c97881c0321c008d7340b54b/substrate/client/consensus/beefy/src/worker.rs#L939). This means if there was a lot of data incoming over the engine, the reporting stream was almost never processed and thus, it could have started to grow and we have seen issues around this. Partly Closes: https://github.com/paritytech/polkadot-sdk/issues/3945
This commit is contained in:
@@ -18,21 +18,17 @@
|
||||
|
||||
use std::{collections::BTreeSet, sync::Arc, time::Duration};
|
||||
|
||||
use sc_network::{PeerId, ReputationChange};
|
||||
use sc_network::{NetworkPeers, PeerId, ReputationChange};
|
||||
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
|
||||
use sp_runtime::traits::{Block, Hash, Header, NumberFor};
|
||||
|
||||
use codec::{Decode, DecodeAll, Encode};
|
||||
use log::{debug, trace};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
use crate::{
|
||||
communication::{
|
||||
benefit, cost,
|
||||
peers::{KnownPeers, PeerReport},
|
||||
},
|
||||
communication::{benefit, cost, peers::KnownPeers},
|
||||
justification::{
|
||||
proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof,
|
||||
},
|
||||
@@ -223,7 +219,7 @@ impl<B: Block> Filter<B> {
|
||||
/// rejected/expired.
|
||||
///
|
||||
///All messaging is handled in a single BEEFY global topic.
|
||||
pub(crate) struct GossipValidator<B>
|
||||
pub(crate) struct GossipValidator<B, N>
|
||||
where
|
||||
B: Block,
|
||||
{
|
||||
@@ -232,26 +228,22 @@ where
|
||||
gossip_filter: RwLock<Filter<B>>,
|
||||
next_rebroadcast: Mutex<Instant>,
|
||||
known_peers: Arc<Mutex<KnownPeers<B>>>,
|
||||
report_sender: TracingUnboundedSender<PeerReport>,
|
||||
network: Arc<N>,
|
||||
}
|
||||
|
||||
impl<B> GossipValidator<B>
|
||||
impl<B, N> GossipValidator<B, N>
|
||||
where
|
||||
B: Block,
|
||||
{
|
||||
pub(crate) fn new(
|
||||
known_peers: Arc<Mutex<KnownPeers<B>>>,
|
||||
) -> (GossipValidator<B>, TracingUnboundedReceiver<PeerReport>) {
|
||||
let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000);
|
||||
let val = GossipValidator {
|
||||
pub(crate) fn new(known_peers: Arc<Mutex<KnownPeers<B>>>, network: Arc<N>) -> Self {
|
||||
Self {
|
||||
votes_topic: votes_topic::<B>(),
|
||||
justifs_topic: proofs_topic::<B>(),
|
||||
gossip_filter: RwLock::new(Filter::new()),
|
||||
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
|
||||
known_peers,
|
||||
report_sender: tx,
|
||||
};
|
||||
(val, rx)
|
||||
network,
|
||||
}
|
||||
}
|
||||
|
||||
/// Update gossip validator filter.
|
||||
@@ -265,9 +257,15 @@ where
|
||||
);
|
||||
self.gossip_filter.write().update(filter);
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, N> GossipValidator<B, N>
|
||||
where
|
||||
B: Block,
|
||||
N: NetworkPeers,
|
||||
{
|
||||
fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
|
||||
let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
|
||||
self.network.report_peer(who, cost_benefit);
|
||||
}
|
||||
|
||||
fn validate_vote(
|
||||
@@ -370,9 +368,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> Validator<B> for GossipValidator<B>
|
||||
impl<B, N> Validator<B> for GossipValidator<B, N>
|
||||
where
|
||||
B: Block,
|
||||
N: NetworkPeers + Send + Sync,
|
||||
{
|
||||
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, who: &PeerId) {
|
||||
self.known_peers.lock().remove(who);
|
||||
@@ -486,7 +485,7 @@ where
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::keystore::BeefyKeystore;
|
||||
use crate::{communication::peers::PeerReport, keystore::BeefyKeystore};
|
||||
use sc_network_test::Block;
|
||||
use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE;
|
||||
use sp_consensus_beefy::{
|
||||
@@ -495,20 +494,109 @@ pub(crate) mod tests {
|
||||
};
|
||||
use sp_keystore::{testing::MemoryKeystore, Keystore};
|
||||
|
||||
pub(crate) struct TestNetwork {
|
||||
report_sender: futures::channel::mpsc::UnboundedSender<PeerReport>,
|
||||
}
|
||||
|
||||
impl TestNetwork {
|
||||
pub fn new() -> (Self, futures::channel::mpsc::UnboundedReceiver<PeerReport>) {
|
||||
let (tx, rx) = futures::channel::mpsc::unbounded();
|
||||
|
||||
(Self { report_sender: tx }, rx)
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkPeers for TestNetwork {
|
||||
fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn set_authorized_only(&self, _: bool) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
|
||||
let _ = self.report_sender.unbounded_send(PeerReport { who: peer_id, cost_benefit });
|
||||
}
|
||||
|
||||
fn peer_reputation(&self, _: &PeerId) -> i32 {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn accept_unreserved_peers(&self) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn deny_unreserved_peers(&self) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn add_reserved_peer(
|
||||
&self,
|
||||
_: sc_network::config::MultiaddrWithPeerId,
|
||||
) -> Result<(), String> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn remove_reserved_peer(&self, _: PeerId) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn set_reserved_peers(
|
||||
&self,
|
||||
_: sc_network::ProtocolName,
|
||||
_: std::collections::HashSet<sc_network::Multiaddr>,
|
||||
) -> Result<(), String> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn add_peers_to_reserved_set(
|
||||
&self,
|
||||
_: sc_network::ProtocolName,
|
||||
_: std::collections::HashSet<sc_network::Multiaddr>,
|
||||
) -> Result<(), String> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn remove_peers_from_reserved_set(
|
||||
&self,
|
||||
_: sc_network::ProtocolName,
|
||||
_: Vec<PeerId>,
|
||||
) -> Result<(), String> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn sync_num_connected(&self) -> usize {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
struct TestContext;
|
||||
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
|
||||
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
|
||||
todo!()
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {}
|
||||
|
||||
fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec<u8>) {
|
||||
todo!()
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn send_topic(&mut self, _who: &sc_network::PeerId, _topic: B::Hash, _force: bool) {
|
||||
todo!()
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -560,8 +648,13 @@ pub(crate) mod tests {
|
||||
fn should_validate_messages() {
|
||||
let keys = vec![Keyring::<AuthorityId>::Alice.public()];
|
||||
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
|
||||
let (gv, mut report_stream) =
|
||||
GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
|
||||
|
||||
let (network, mut report_stream) = TestNetwork::new();
|
||||
|
||||
let gv = GossipValidator::<Block, _>::new(
|
||||
Arc::new(Mutex::new(KnownPeers::new())),
|
||||
Arc::new(network),
|
||||
);
|
||||
let sender = PeerId::random();
|
||||
let mut context = TestContext;
|
||||
|
||||
@@ -574,7 +667,7 @@ pub(crate) mod tests {
|
||||
let mut expected_report = PeerReport { who: sender, cost_benefit: expected_cost };
|
||||
let res = gv.validate(&mut context, &sender, bad_encoding);
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// verify votes validation
|
||||
|
||||
@@ -585,14 +678,14 @@ pub(crate) mod tests {
|
||||
let res = gv.validate(&mut context, &sender, &encoded);
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
// nothing reported
|
||||
assert!(report_stream.try_recv().is_err());
|
||||
assert!(report_stream.try_next().is_err());
|
||||
|
||||
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
|
||||
// nothing in cache first time
|
||||
let res = gv.validate(&mut context, &sender, &encoded);
|
||||
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
|
||||
expected_report.cost_benefit = benefit::VOTE_MESSAGE;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// reject vote, voter not in validator set
|
||||
let mut bad_vote = vote.clone();
|
||||
@@ -601,7 +694,7 @@ pub(crate) mod tests {
|
||||
let res = gv.validate(&mut context, &sender, &bad_vote);
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
expected_report.cost_benefit = cost::UNKNOWN_VOTER;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// reject if the round is not GRANDPA finalized
|
||||
gv.update_filter(GossipFilterCfg { start: 1, end: 2, validator_set: &validator_set });
|
||||
@@ -611,7 +704,7 @@ pub(crate) mod tests {
|
||||
let res = gv.validate(&mut context, &sender, &encoded);
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
expected_report.cost_benefit = cost::FUTURE_MESSAGE;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// reject if the round is not live anymore
|
||||
gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set });
|
||||
@@ -621,7 +714,7 @@ pub(crate) mod tests {
|
||||
let res = gv.validate(&mut context, &sender, &encoded);
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// now verify proofs validation
|
||||
|
||||
@@ -631,7 +724,7 @@ pub(crate) mod tests {
|
||||
let res = gv.validate(&mut context, &sender, &encoded_proof);
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// accept next proof with good set_id
|
||||
let proof = dummy_proof(7, &validator_set);
|
||||
@@ -639,7 +732,7 @@ pub(crate) mod tests {
|
||||
let res = gv.validate(&mut context, &sender, &encoded_proof);
|
||||
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
|
||||
expected_report.cost_benefit = benefit::VALIDATED_PROOF;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// accept future proof with good set_id
|
||||
let proof = dummy_proof(20, &validator_set);
|
||||
@@ -647,7 +740,7 @@ pub(crate) mod tests {
|
||||
let res = gv.validate(&mut context, &sender, &encoded_proof);
|
||||
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
|
||||
expected_report.cost_benefit = benefit::VALIDATED_PROOF;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// reject proof, future set_id
|
||||
let bad_validator_set = ValidatorSet::<AuthorityId>::new(keys, 1).unwrap();
|
||||
@@ -656,7 +749,7 @@ pub(crate) mod tests {
|
||||
let res = gv.validate(&mut context, &sender, &encoded_proof);
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
expected_report.cost_benefit = cost::FUTURE_MESSAGE;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
|
||||
// reject proof, bad signatures (Bob instead of Alice)
|
||||
let bad_validator_set =
|
||||
@@ -667,14 +760,17 @@ pub(crate) mod tests {
|
||||
assert!(matches!(res, ValidationResult::Discard));
|
||||
expected_report.cost_benefit = cost::INVALID_PROOF;
|
||||
expected_report.cost_benefit.value += cost::PER_SIGNATURE_CHECKED;
|
||||
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
|
||||
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn messages_allowed_and_expired() {
|
||||
let keys = vec![Keyring::Alice.public()];
|
||||
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
|
||||
let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
|
||||
let gv = GossipValidator::<Block, _>::new(
|
||||
Arc::new(Mutex::new(KnownPeers::new())),
|
||||
Arc::new(TestNetwork::new().0),
|
||||
);
|
||||
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
|
||||
let sender = sc_network::PeerId::random();
|
||||
let topic = Default::default();
|
||||
@@ -751,7 +847,10 @@ pub(crate) mod tests {
|
||||
fn messages_rebroadcast() {
|
||||
let keys = vec![Keyring::Alice.public()];
|
||||
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
|
||||
let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
|
||||
let gv = GossipValidator::<Block, _>::new(
|
||||
Arc::new(Mutex::new(KnownPeers::new())),
|
||||
Arc::new(TestNetwork::new().0),
|
||||
);
|
||||
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
|
||||
let sender = sc_network::PeerId::random();
|
||||
let topic = Default::default();
|
||||
|
||||
Reference in New Issue
Block a user