Pump the gossip engine while waiting for the BEEFY runtime pallet (memory leak fix) (#11694)

* Pump the gossip engine while waiting for the BEEFY runtime pallet

This fixes a memory leak when the BEEFY gadget is turned on, but
the runtime doesn't actually use BEEFY.

* Implement `FusedFuture` for `GossipEngine`

* Fuse futures outside of loops
This commit is contained in:
Koute
2022-06-21 00:52:43 +09:00
committed by GitHub
parent 94a7e278fa
commit 9cee3c699d
2 changed files with 54 additions and 36 deletions
+40 -35
View File
@@ -25,11 +25,10 @@ use std::{
}; };
use codec::{Codec, Decode, Encode}; use codec::{Codec, Decode, Encode};
use futures::{future, FutureExt, StreamExt}; use futures::StreamExt;
use log::{debug, error, info, log_enabled, trace, warn}; use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;
use sc_client_api::{Backend, FinalityNotification, FinalityNotifications}; use sc_client_api::{Backend, FinalityNotification};
use sc_network_gossip::GossipEngine; use sc_network_gossip::GossipEngine;
use sp_api::{BlockId, ProvideRuntimeApi}; use sp_api::{BlockId, ProvideRuntimeApi};
@@ -80,7 +79,7 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
runtime: Arc<R>, runtime: Arc<R>,
key_store: BeefyKeystore, key_store: BeefyKeystore,
signed_commitment_sender: BeefySignedCommitmentSender<B>, signed_commitment_sender: BeefySignedCommitmentSender<B>,
gossip_engine: Arc<Mutex<GossipEngine<B>>>, gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>, gossip_validator: Arc<GossipValidator<B>>,
/// Min delta in block numbers between two blocks, BEEFY should vote on /// Min delta in block numbers between two blocks, BEEFY should vote on
min_block_delta: u32, min_block_delta: u32,
@@ -88,7 +87,6 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
rounds: Option<Rounds<Payload, B>>, rounds: Option<Rounds<Payload, B>>,
/// Buffer holding votes for blocks that the client hasn't seen finality for. /// Buffer holding votes for blocks that the client hasn't seen finality for.
pending_votes: BTreeMap<NumberFor<B>, Vec<VoteMessage<NumberFor<B>, AuthorityId, Signature>>>, pending_votes: BTreeMap<NumberFor<B>, Vec<VoteMessage<NumberFor<B>, AuthorityId, Signature>>>,
finality_notifications: FinalityNotifications<B>,
/// Best block we received a GRANDPA notification for /// Best block we received a GRANDPA notification for
best_grandpa_block_header: <B as Block>::Header, best_grandpa_block_header: <B as Block>::Header,
/// Best block a BEEFY voting round has been concluded for /// Best block a BEEFY voting round has been concluded for
@@ -143,14 +141,13 @@ where
runtime, runtime,
key_store, key_store,
signed_commitment_sender, signed_commitment_sender,
gossip_engine: Arc::new(Mutex::new(gossip_engine)), gossip_engine,
gossip_validator, gossip_validator,
// always target at least one block better than current best beefy // always target at least one block better than current best beefy
min_block_delta: min_block_delta.max(1), min_block_delta: min_block_delta.max(1),
metrics, metrics,
rounds: None, rounds: None,
pending_votes: BTreeMap::new(), pending_votes: BTreeMap::new(),
finality_notifications: client.finality_notification_stream(),
best_grandpa_block_header: last_finalized_header, best_grandpa_block_header: last_finalized_header,
best_beefy_block: None, best_beefy_block: None,
last_signed_id: 0, last_signed_id: 0,
@@ -471,15 +468,21 @@ where
true, true,
); );
self.gossip_engine.lock().gossip_message(topic::<B>(), encoded_message, false); self.gossip_engine.gossip_message(topic::<B>(), encoded_message, false);
} }
/// Wait for BEEFY runtime pallet to be available. /// Wait for BEEFY runtime pallet to be available.
async fn wait_for_runtime_pallet(&mut self) { async fn wait_for_runtime_pallet(&mut self) {
self.client let mut gossip_engine = &mut self.gossip_engine;
.finality_notification_stream() let mut finality_stream = self.client.finality_notification_stream().fuse();
.take_while(|notif| { loop {
let at = BlockId::hash(notif.header.hash()); futures::select! {
notif = finality_stream.next() => {
let notif = match notif {
Some(notif) => notif,
None => break
};
let at = BlockId::hash(notif.header.hash());
if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() { if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() {
if active.id() == GENESIS_AUTHORITY_SET_ID { if active.id() == GENESIS_AUTHORITY_SET_ID {
// When starting from genesis, there is no session boundary digest. // When starting from genesis, there is no session boundary digest.
@@ -490,18 +493,18 @@ where
// worker won't vote until it witnesses a session change. // worker won't vote until it witnesses a session change.
// Once we'll implement 'initial sync' (catch-up), the worker will be able to // Once we'll implement 'initial sync' (catch-up), the worker will be able to
// start voting right away. // start voting right away.
self.handle_finality_notification(notif); self.handle_finality_notification(&notif);
future::ready(false) break
} else { } else {
trace!(target: "beefy", "🥩 Finality notification: {:?}", notif); trace!(target: "beefy", "🥩 Finality notification: {:?}", notif);
debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available..."); debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available...");
future::ready(true)
} }
}) },
.for_each(|_| future::ready(())) _ = gossip_engine => {
.await; break
// get a new stream that provides _new_ notifications (from here on out) }
self.finality_notifications = self.client.finality_notification_stream(); }
}
} }
/// Main loop for BEEFY worker. /// Main loop for BEEFY worker.
@@ -512,16 +515,20 @@ where
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number()); info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number());
self.wait_for_runtime_pallet().await; self.wait_for_runtime_pallet().await;
let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::<B>()).filter_map( let mut finality_notifications = self.client.finality_notification_stream().fuse();
|notification| async move { let mut votes = Box::pin(
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification); self.gossip_engine
.messages_for(topic::<B>())
.filter_map(|notification| async move {
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification);
VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode( VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
&mut &notification.message[..], &mut &notification.message[..],
) )
.ok() .ok()
}, })
)); .fuse(),
);
loop { loop {
while self.sync_oracle.is_major_syncing() { while self.sync_oracle.is_major_syncing() {
@@ -529,18 +536,16 @@ where
futures_timer::Delay::new(Duration::from_secs(5)).await; futures_timer::Delay::new(Duration::from_secs(5)).await;
} }
let engine = self.gossip_engine.clone(); let mut gossip_engine = &mut self.gossip_engine;
let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx));
futures::select! { futures::select! {
notification = self.finality_notifications.next().fuse() => { notification = finality_notifications.next() => {
if let Some(notification) = notification { if let Some(notification) = notification {
self.handle_finality_notification(&notification); self.handle_finality_notification(&notification);
} else { } else {
return; return;
} }
}, },
vote = votes.next().fuse() => { vote = votes.next() => {
if let Some(vote) = vote { if let Some(vote) = vote {
let block_num = vote.commitment.block_number; let block_num = vote.commitment.block_number;
if block_num > *self.best_grandpa_block_header.number() { if block_num > *self.best_grandpa_block_header.number() {
@@ -563,7 +568,7 @@ where
return; return;
} }
}, },
_ = gossip_engine.fuse() => { _ = gossip_engine => {
error!(target: "beefy", "🥩 Gossip engine has terminated."); error!(target: "beefy", "🥩 Gossip engine has terminated.");
return; return;
} }
+14 -1
View File
@@ -53,6 +53,8 @@ pub struct GossipEngine<B: BlockT> {
message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>, message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
/// Buffered messages (see [`ForwardingState`]). /// Buffered messages (see [`ForwardingState`]).
forwarding_state: ForwardingState<B>, forwarding_state: ForwardingState<B>,
is_terminated: bool,
} }
/// A gossip engine receives messages from the network via the `network_event_stream` and forwards /// A gossip engine receives messages from the network via the `network_event_stream` and forwards
@@ -94,6 +96,8 @@ impl<B: BlockT> GossipEngine<B> {
network_event_stream, network_event_stream,
message_sinks: HashMap::new(), message_sinks: HashMap::new(),
forwarding_state: ForwardingState::Idle, forwarding_state: ForwardingState::Idle,
is_terminated: false,
} }
} }
@@ -214,7 +218,10 @@ impl<B: BlockT> Future for GossipEngine<B> {
Event::Dht(_) => {}, Event::Dht(_) => {},
}, },
// The network event stream closed. Do the same for [`GossipValidator`]. // The network event stream closed. Do the same for [`GossipValidator`].
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => {
self.is_terminated = true;
return Poll::Ready(())
},
Poll::Pending => break, Poll::Pending => break,
} }
}, },
@@ -288,6 +295,12 @@ impl<B: BlockT> Future for GossipEngine<B> {
} }
} }
impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
fn is_terminated(&self) -> bool {
self.is_terminated
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;