mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 05:47:58 +00:00
Automatically unsubscribe storage listeners when they're dropped (RCP node memory leak fix) (#10454)
* Automatically unsubscribe storage listeners when they're dropped * Fix tests' compilation in `sc-client-api` * Add an extra test * Align to review comments; cleanups * Update client/api/src/notifications.rs * FMT Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Bastian Köcher <info@kchr.de>
This commit is contained in:
@@ -20,13 +20,20 @@
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
pin::Pin,
|
||||
sync::{Arc, Weak},
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
use fnv::{FnvHashMap, FnvHashSet};
|
||||
use futures::Stream;
|
||||
use parking_lot::Mutex;
|
||||
use prometheus_endpoint::{register, CounterVec, Opts, Registry, U64};
|
||||
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
|
||||
use sp_core::storage::{StorageData, StorageKey};
|
||||
use sp_core::{
|
||||
hexdisplay::HexDisplay,
|
||||
storage::{StorageData, StorageKey},
|
||||
};
|
||||
use sp_runtime::traits::Block as BlockT;
|
||||
|
||||
/// Storage change set
|
||||
@@ -34,8 +41,8 @@ use sp_runtime::traits::Block as BlockT;
|
||||
pub struct StorageChangeSet {
|
||||
changes: Arc<Vec<(StorageKey, Option<StorageData>)>>,
|
||||
child_changes: Arc<Vec<(StorageKey, Vec<(StorageKey, Option<StorageData>)>)>>,
|
||||
filter: Option<HashSet<StorageKey>>,
|
||||
child_filters: Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
|
||||
filter: Keys,
|
||||
child_filters: ChildKeys,
|
||||
}
|
||||
|
||||
impl StorageChangeSet {
|
||||
@@ -74,7 +81,46 @@ impl StorageChangeSet {
|
||||
}
|
||||
|
||||
/// Type that implements `futures::Stream` of storage change events.
|
||||
pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>;
|
||||
pub struct StorageEventStream<H> {
|
||||
rx: TracingUnboundedReceiver<(H, StorageChangeSet)>,
|
||||
storage_notifications: Weak<Mutex<StorageNotificationsImpl<H>>>,
|
||||
was_triggered: bool,
|
||||
id: u64,
|
||||
}
|
||||
|
||||
impl<H> Stream for StorageEventStream<H> {
|
||||
type Item = <TracingUnboundedReceiver<(H, StorageChangeSet)> as Stream>::Item;
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
let result = Stream::poll_next(Pin::new(&mut self.rx), cx);
|
||||
if result.is_ready() {
|
||||
self.was_triggered = true;
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl<H> Drop for StorageEventStream<H> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(storage_notifications) = self.storage_notifications.upgrade() {
|
||||
if let Some((keys, child_keys)) =
|
||||
storage_notifications.lock().remove_subscriber(self.id)
|
||||
{
|
||||
if !self.was_triggered {
|
||||
log::trace!(
|
||||
target: "storage_notifications",
|
||||
"Listener was never triggered: id={}, keys={:?}, child_keys={:?}",
|
||||
self.id,
|
||||
PrintKeys(&keys),
|
||||
PrintChildKeys(&child_keys),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type SubscriberId = u64;
|
||||
|
||||
@@ -82,7 +128,13 @@ type SubscribersGauge = CounterVec<U64>;
|
||||
|
||||
/// Manages storage listeners.
|
||||
#[derive(Debug)]
|
||||
pub struct StorageNotifications<Block: BlockT> {
|
||||
pub struct StorageNotifications<Block: BlockT>(Arc<Mutex<StorageNotificationsImpl<Block::Hash>>>);
|
||||
|
||||
type Keys = Option<HashSet<StorageKey>>;
|
||||
type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct StorageNotificationsImpl<Hash> {
|
||||
metrics: Option<SubscribersGauge>,
|
||||
next_id: SubscriberId,
|
||||
wildcard_listeners: FnvHashSet<SubscriberId>,
|
||||
@@ -93,15 +145,17 @@ pub struct StorageNotifications<Block: BlockT> {
|
||||
>,
|
||||
sinks: FnvHashMap<
|
||||
SubscriberId,
|
||||
(
|
||||
TracingUnboundedSender<(Block::Hash, StorageChangeSet)>,
|
||||
Option<HashSet<StorageKey>>,
|
||||
Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
|
||||
),
|
||||
(TracingUnboundedSender<(Hash, StorageChangeSet)>, Keys, ChildKeys),
|
||||
>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT> Default for StorageNotifications<Block> {
|
||||
fn default() -> Self {
|
||||
Self(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash> Default for StorageNotificationsImpl<Hash> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metrics: Default::default(),
|
||||
@@ -114,31 +168,39 @@ impl<Block: BlockT> Default for StorageNotifications<Block> {
|
||||
}
|
||||
}
|
||||
|
||||
struct PrintKeys<'a>(&'a Keys);
|
||||
impl<'a> std::fmt::Debug for PrintKeys<'a> {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
if let Some(keys) = self.0 {
|
||||
fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish()
|
||||
} else {
|
||||
write!(fmt, "None")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PrintChildKeys<'a>(&'a ChildKeys);
|
||||
impl<'a> std::fmt::Debug for PrintChildKeys<'a> {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
if let Some(map) = self.0 {
|
||||
fmt.debug_map()
|
||||
.entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values))))
|
||||
.finish()
|
||||
} else {
|
||||
write!(fmt, "None")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block: BlockT> StorageNotifications<Block> {
|
||||
/// Initialize a new StorageNotifications
|
||||
/// optionally pass a prometheus registry to send subscriber metrics to
|
||||
pub fn new(prometheus_registry: Option<Registry>) -> Self {
|
||||
let metrics = prometheus_registry.and_then(|r| {
|
||||
CounterVec::new(
|
||||
Opts::new(
|
||||
"substrate_storage_notification_subscribers",
|
||||
"Number of subscribers in storage notification sytem",
|
||||
),
|
||||
&["action"], // added | removed
|
||||
)
|
||||
.and_then(|g| register(g, &r))
|
||||
.ok()
|
||||
});
|
||||
|
||||
StorageNotifications {
|
||||
metrics,
|
||||
next_id: Default::default(),
|
||||
wildcard_listeners: Default::default(),
|
||||
listeners: Default::default(),
|
||||
child_listeners: Default::default(),
|
||||
sinks: Default::default(),
|
||||
}
|
||||
StorageNotifications(Arc::new(Mutex::new(StorageNotificationsImpl::new(
|
||||
prometheus_registry,
|
||||
))))
|
||||
}
|
||||
|
||||
/// Trigger notification to all listeners.
|
||||
///
|
||||
/// Note the changes are going to be filtered by listener's filter key.
|
||||
@@ -151,6 +213,54 @@ impl<Block: BlockT> StorageNotifications<Block> {
|
||||
Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
|
||||
>,
|
||||
) {
|
||||
self.0.lock().trigger(hash, changeset, child_changeset);
|
||||
}
|
||||
|
||||
/// Start listening for particular storage keys.
|
||||
pub fn listen(
|
||||
&mut self,
|
||||
filter_keys: Option<&[StorageKey]>,
|
||||
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
|
||||
) -> StorageEventStream<Block::Hash> {
|
||||
let (id, rx) = self.0.lock().listen(filter_keys, filter_child_keys);
|
||||
let storage_notifications = Arc::downgrade(&self.0);
|
||||
StorageEventStream { rx, storage_notifications, was_triggered: false, id }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash> StorageNotificationsImpl<Hash> {
|
||||
fn new(prometheus_registry: Option<Registry>) -> Self {
|
||||
let metrics = prometheus_registry.and_then(|r| {
|
||||
CounterVec::new(
|
||||
Opts::new(
|
||||
"substrate_storage_notification_subscribers",
|
||||
"Number of subscribers in storage notification sytem",
|
||||
),
|
||||
&["action"], // added | removed
|
||||
)
|
||||
.and_then(|g| register(g, &r))
|
||||
.ok()
|
||||
});
|
||||
|
||||
StorageNotificationsImpl {
|
||||
metrics,
|
||||
next_id: Default::default(),
|
||||
wildcard_listeners: Default::default(),
|
||||
listeners: Default::default(),
|
||||
child_listeners: Default::default(),
|
||||
sinks: Default::default(),
|
||||
}
|
||||
}
|
||||
fn trigger(
|
||||
&mut self,
|
||||
hash: &Hash,
|
||||
changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
|
||||
child_changeset: impl Iterator<
|
||||
Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
|
||||
>,
|
||||
) where
|
||||
Hash: Clone,
|
||||
{
|
||||
let has_wildcard = !self.wildcard_listeners.is_empty();
|
||||
|
||||
// early exit if no listeners
|
||||
@@ -244,7 +354,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
|
||||
|
||||
fn remove_subscriber_from(
|
||||
subscriber: &SubscriberId,
|
||||
filters: &Option<HashSet<StorageKey>>,
|
||||
filters: &Keys,
|
||||
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
|
||||
wildcards: &mut FnvHashSet<SubscriberId>,
|
||||
) {
|
||||
@@ -269,34 +379,35 @@ impl<Block: BlockT> StorageNotifications<Block> {
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_subscriber(&mut self, subscriber: SubscriberId) {
|
||||
if let Some((_, filters, child_filters)) = self.sinks.remove(&subscriber) {
|
||||
Self::remove_subscriber_from(
|
||||
&subscriber,
|
||||
&filters,
|
||||
&mut self.listeners,
|
||||
&mut self.wildcard_listeners,
|
||||
);
|
||||
if let Some(child_filters) = child_filters.as_ref() {
|
||||
for (c_key, filters) in child_filters {
|
||||
if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) {
|
||||
Self::remove_subscriber_from(
|
||||
&subscriber,
|
||||
&filters,
|
||||
&mut *listeners,
|
||||
&mut *wildcards,
|
||||
);
|
||||
fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> {
|
||||
let (_, filters, child_filters) = self.sinks.remove(&subscriber)?;
|
||||
Self::remove_subscriber_from(
|
||||
&subscriber,
|
||||
&filters,
|
||||
&mut self.listeners,
|
||||
&mut self.wildcard_listeners,
|
||||
);
|
||||
if let Some(child_filters) = child_filters.as_ref() {
|
||||
for (c_key, filters) in child_filters {
|
||||
if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) {
|
||||
Self::remove_subscriber_from(
|
||||
&subscriber,
|
||||
&filters,
|
||||
&mut *listeners,
|
||||
&mut *wildcards,
|
||||
);
|
||||
|
||||
if listeners.is_empty() && wildcards.is_empty() {
|
||||
self.child_listeners.remove(&c_key);
|
||||
}
|
||||
if listeners.is_empty() && wildcards.is_empty() {
|
||||
self.child_listeners.remove(&c_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(m) = self.metrics.as_ref() {
|
||||
m.with_label_values(&[&"removed"]).inc();
|
||||
}
|
||||
}
|
||||
if let Some(m) = self.metrics.as_ref() {
|
||||
m.with_label_values(&[&"removed"]).inc();
|
||||
}
|
||||
|
||||
Some((filters, child_filters))
|
||||
}
|
||||
|
||||
fn listen_from(
|
||||
@@ -304,7 +415,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
|
||||
filter_keys: &Option<impl AsRef<[StorageKey]>>,
|
||||
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
|
||||
wildcards: &mut FnvHashSet<SubscriberId>,
|
||||
) -> Option<HashSet<StorageKey>> {
|
||||
) -> Keys {
|
||||
match filter_keys {
|
||||
None => {
|
||||
wildcards.insert(current_id);
|
||||
@@ -325,12 +436,11 @@ impl<Block: BlockT> StorageNotifications<Block> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Start listening for particular storage keys.
|
||||
pub fn listen(
|
||||
fn listen(
|
||||
&mut self,
|
||||
filter_keys: Option<&[StorageKey]>,
|
||||
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
|
||||
) -> StorageEventStream<Block::Hash> {
|
||||
) -> (u64, TracingUnboundedReceiver<(Hash, StorageChangeSet)>) {
|
||||
self.next_id += 1;
|
||||
let current_id = self.next_id;
|
||||
|
||||
@@ -364,7 +474,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
|
||||
m.with_label_values(&[&"added"]).inc();
|
||||
}
|
||||
|
||||
rx
|
||||
(current_id, rx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -517,9 +627,9 @@ mod tests {
|
||||
let _recv3 = futures::executor::block_on_stream(notifications.listen(None, None));
|
||||
let _recv4 =
|
||||
futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter)));
|
||||
assert_eq!(notifications.listeners.len(), 2);
|
||||
assert_eq!(notifications.wildcard_listeners.len(), 2);
|
||||
assert_eq!(notifications.child_listeners.len(), 1);
|
||||
assert_eq!(notifications.0.lock().listeners.len(), 2);
|
||||
assert_eq!(notifications.0.lock().wildcard_listeners.len(), 2);
|
||||
assert_eq!(notifications.0.lock().child_listeners.len(), 1);
|
||||
}
|
||||
|
||||
// when
|
||||
@@ -528,9 +638,18 @@ mod tests {
|
||||
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset);
|
||||
|
||||
// then
|
||||
assert_eq!(notifications.listeners.len(), 0);
|
||||
assert_eq!(notifications.wildcard_listeners.len(), 0);
|
||||
assert_eq!(notifications.child_listeners.len(), 0);
|
||||
assert_eq!(notifications.0.lock().listeners.len(), 0);
|
||||
assert_eq!(notifications.0.lock().wildcard_listeners.len(), 0);
|
||||
assert_eq!(notifications.0.lock().child_listeners.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_cleanup_subscriber_if_stream_is_dropped() {
|
||||
let mut notifications = StorageNotifications::<Block>::default();
|
||||
let stream = notifications.listen(None, None);
|
||||
assert_eq!(notifications.0.lock().sinks.len(), 1);
|
||||
std::mem::drop(stream);
|
||||
assert_eq!(notifications.0.lock().sinks.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user