mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 18:01:03 +00:00
BEEFY: optimize voter event loop for fewer 'active' wakeups (#12760)
* client/beefy: remove high-freq network events from main loop Network events are many and very frequent, remove the net-event-stream from the main voter loop and drastically reduce BEEFY voter task 'wakeups'. Instead have the `GossipValidator` track known peers as it already has callbacks for that coming from `GossipEngine`. Signed-off-by: acatangiu <adrian@parity.io>
This commit is contained in:
@@ -16,42 +16,6 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, VecDeque},
|
||||
fmt::Debug,
|
||||
marker::PhantomData,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use codec::{Codec, Decode, Encode};
|
||||
use futures::{stream::Fuse, FutureExt, StreamExt};
|
||||
use log::{debug, error, info, log_enabled, trace, warn};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
|
||||
use sc_network_common::{
|
||||
protocol::event::Event as NetEvent,
|
||||
service::{NetworkEventStream, NetworkRequest},
|
||||
};
|
||||
use sc_network_gossip::GossipEngine;
|
||||
|
||||
use sp_api::{BlockId, ProvideRuntimeApi};
|
||||
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
|
||||
use sp_consensus::SyncOracle;
|
||||
use sp_mmr_primitives::MmrApi;
|
||||
use sp_runtime::{
|
||||
generic::OpaqueDigestItemId,
|
||||
traits::{Block, Header, NumberFor, Zero},
|
||||
SaturatedConversion,
|
||||
};
|
||||
|
||||
use beefy_primitives::{
|
||||
crypto::{AuthorityId, Signature},
|
||||
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
|
||||
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
|
||||
};
|
||||
use sc_utils::notification::NotificationReceiver;
|
||||
|
||||
use crate::{
|
||||
communication::{
|
||||
gossip::{topic, GossipValidator},
|
||||
@@ -63,7 +27,34 @@ use crate::{
|
||||
metric_inc, metric_set,
|
||||
metrics::Metrics,
|
||||
round::Rounds,
|
||||
BeefyVoterLinks, KnownPeers,
|
||||
BeefyVoterLinks,
|
||||
};
|
||||
use beefy_primitives::{
|
||||
crypto::{AuthorityId, Signature},
|
||||
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
|
||||
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
|
||||
};
|
||||
use codec::{Codec, Decode, Encode};
|
||||
use futures::{stream::Fuse, FutureExt, StreamExt};
|
||||
use log::{debug, error, info, log_enabled, trace, warn};
|
||||
use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
|
||||
use sc_network_common::service::{NetworkEventStream, NetworkRequest};
|
||||
use sc_network_gossip::GossipEngine;
|
||||
use sc_utils::notification::NotificationReceiver;
|
||||
use sp_api::{BlockId, ProvideRuntimeApi};
|
||||
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
|
||||
use sp_consensus::SyncOracle;
|
||||
use sp_mmr_primitives::MmrApi;
|
||||
use sp_runtime::{
|
||||
generic::OpaqueDigestItemId,
|
||||
traits::{Block, Header, NumberFor, Zero},
|
||||
SaturatedConversion,
|
||||
};
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, VecDeque},
|
||||
fmt::Debug,
|
||||
marker::PhantomData,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
pub(crate) enum RoundAction {
|
||||
@@ -253,7 +244,6 @@ pub(crate) struct WorkerParams<B: Block, BE, P, R, N> {
|
||||
pub payload_provider: P,
|
||||
pub network: N,
|
||||
pub key_store: BeefyKeystore,
|
||||
pub known_peers: Arc<Mutex<KnownPeers<B>>>,
|
||||
pub gossip_engine: GossipEngine<B>,
|
||||
pub gossip_validator: Arc<GossipValidator<B>>,
|
||||
pub on_demand_justifications: OnDemandJustificationsEngine<B, R>,
|
||||
@@ -305,7 +295,6 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
|
||||
key_store: BeefyKeystore,
|
||||
|
||||
// communication
|
||||
known_peers: Arc<Mutex<KnownPeers<B>>>,
|
||||
gossip_engine: GossipEngine<B>,
|
||||
gossip_validator: Arc<GossipValidator<B>>,
|
||||
on_demand_justifications: OnDemandJustificationsEngine<B, R>,
|
||||
@@ -349,7 +338,6 @@ where
|
||||
gossip_engine,
|
||||
gossip_validator,
|
||||
on_demand_justifications,
|
||||
known_peers,
|
||||
links,
|
||||
metrics,
|
||||
persisted_state,
|
||||
@@ -359,7 +347,6 @@ where
|
||||
backend,
|
||||
payload_provider,
|
||||
network,
|
||||
known_peers,
|
||||
key_store,
|
||||
gossip_engine,
|
||||
gossip_validator,
|
||||
@@ -783,6 +770,29 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_new_state(&mut self) {
|
||||
// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
|
||||
if let Err(err) = self.try_pending_justif_and_votes() {
|
||||
debug!(target: "beefy", "🥩 {}", err);
|
||||
}
|
||||
|
||||
// Don't bother voting or requesting justifications during major sync.
|
||||
if !self.network.is_major_syncing() {
|
||||
// There were external events, 'state' is changed, author a vote if needed/possible.
|
||||
if let Err(err) = self.try_to_vote() {
|
||||
debug!(target: "beefy", "🥩 {}", err);
|
||||
}
|
||||
// If the current target is a mandatory block,
|
||||
// make sure there's also an on-demand justification request out for it.
|
||||
if let Some(block) = self.voting_oracle().mandatory_pending() {
|
||||
// This only starts new request if there isn't already an active one.
|
||||
self.on_demand_justifications.request(block);
|
||||
}
|
||||
} else {
|
||||
debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
|
||||
}
|
||||
}
|
||||
|
||||
/// Main loop for BEEFY worker.
|
||||
///
|
||||
/// Wait for BEEFY runtime pallet to be available, then start the main async loop
|
||||
@@ -794,7 +804,6 @@ where
|
||||
) {
|
||||
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block());
|
||||
|
||||
let mut network_events = self.network.event_stream("network-gossip").fuse();
|
||||
let mut votes = Box::pin(
|
||||
self.gossip_engine
|
||||
.messages_for(topic::<B>())
|
||||
@@ -810,25 +819,12 @@ where
|
||||
);
|
||||
|
||||
loop {
|
||||
// Don't bother voting or requesting justifications during major sync.
|
||||
if !self.network.is_major_syncing() {
|
||||
// If the current target is a mandatory block,
|
||||
// make sure there's also an on-demand justification request out for it.
|
||||
if let Some(block) = self.voting_oracle().mandatory_pending() {
|
||||
// This only starts new request if there isn't already an active one.
|
||||
self.on_demand_justifications.request(block);
|
||||
}
|
||||
// There were external events, 'state' is changed, author a vote if needed/possible.
|
||||
if let Err(err) = self.try_to_vote() {
|
||||
debug!(target: "beefy", "🥩 {}", err);
|
||||
}
|
||||
} else {
|
||||
debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
|
||||
}
|
||||
// Act on changed 'state'.
|
||||
self.process_new_state();
|
||||
|
||||
let mut gossip_engine = &mut self.gossip_engine;
|
||||
// Wait for, and handle external events.
|
||||
// The branches below only change 'state', actual voting happen afterwards,
|
||||
// The branches below only change 'state', actual voting happens afterwards,
|
||||
// based on the new resulting 'state'.
|
||||
futures::select_biased! {
|
||||
// Use `select_biased!` to prioritize order below.
|
||||
@@ -837,15 +833,6 @@ where
|
||||
error!(target: "beefy", "🥩 Gossip engine has terminated, closing worker.");
|
||||
return;
|
||||
},
|
||||
// Keep track of connected peers.
|
||||
net_event = network_events.next() => {
|
||||
if let Some(net_event) = net_event {
|
||||
self.handle_network_event(net_event);
|
||||
} else {
|
||||
error!(target: "beefy", "🥩 Network events stream terminated, closing worker.");
|
||||
return;
|
||||
}
|
||||
},
|
||||
// Process finality notifications first since these drive the voter.
|
||||
notification = finality_notifications.next() => {
|
||||
if let Some(notification) = notification {
|
||||
@@ -888,25 +875,6 @@ where
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
|
||||
if let Err(err) = self.try_pending_justif_and_votes() {
|
||||
debug!(target: "beefy", "🥩 {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update known peers based on network events.
|
||||
fn handle_network_event(&mut self, event: NetEvent) {
|
||||
match event {
|
||||
NetEvent::SyncConnected { remote } => {
|
||||
self.known_peers.lock().add_new(remote);
|
||||
},
|
||||
NetEvent::SyncDisconnected { remote } => {
|
||||
self.known_peers.lock().remove(&remote);
|
||||
},
|
||||
// We don't care about other events.
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -976,11 +944,11 @@ pub(crate) mod tests {
|
||||
create_beefy_keystore, get_beefy_streams, make_beefy_ids, two_validators::TestApi,
|
||||
BeefyPeer, BeefyTestNet,
|
||||
},
|
||||
BeefyRPCLinks,
|
||||
BeefyRPCLinks, KnownPeers,
|
||||
};
|
||||
|
||||
use beefy_primitives::{known_payloads, mmr::MmrRootProvider};
|
||||
use futures::{executor::block_on, future::poll_fn, task::Poll};
|
||||
use parking_lot::Mutex;
|
||||
use sc_client_api::{Backend as BackendT, HeaderBackend};
|
||||
use sc_network::NetworkService;
|
||||
use sc_network_test::TestNetFactory;
|
||||
@@ -1058,7 +1026,7 @@ pub(crate) mod tests {
|
||||
network.clone(),
|
||||
api.clone(),
|
||||
"/beefy/justifs/1".into(),
|
||||
known_peers.clone(),
|
||||
known_peers,
|
||||
);
|
||||
let at = BlockId::number(Zero::zero());
|
||||
let genesis_header = backend.blockchain().expect_header(at).unwrap();
|
||||
@@ -1074,7 +1042,6 @@ pub(crate) mod tests {
|
||||
backend,
|
||||
payload_provider,
|
||||
key_store: Some(keystore).into(),
|
||||
known_peers,
|
||||
links,
|
||||
gossip_engine,
|
||||
gossip_validator,
|
||||
|
||||
Reference in New Issue
Block a user