Fix leak in stream notifications (#5739)

This commit is contained in:
Benjamin Kampmann
2020-04-23 12:01:09 +02:00
committed by GitHub
parent ff9c88d21c
commit 624e95b1af
4 changed files with 67 additions and 17 deletions
+63 -15
View File
@@ -25,6 +25,7 @@ use fnv::{FnvHashSet, FnvHashMap};
use sp_core::storage::{StorageKey, StorageData};
use sp_runtime::traits::Block as BlockT;
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
use prometheus_endpoint::{Registry, CounterVec, Opts, U64, register};
/// Storage change set
#[derive(Debug)]
@@ -71,9 +72,12 @@ pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>
type SubscriberId = u64;
type SubscribersGauge = CounterVec<U64>;
/// Manages storage listeners.
#[derive(Debug)]
pub struct StorageNotifications<Block: BlockT> {
metrics: Option<SubscribersGauge>,
next_id: SubscriberId,
wildcard_listeners: FnvHashSet<SubscriberId>,
listeners: HashMap<StorageKey, FnvHashSet<SubscriberId>>,
@@ -90,7 +94,8 @@ pub struct StorageNotifications<Block: BlockT> {
impl<Block: BlockT> Default for StorageNotifications<Block> {
fn default() -> Self {
StorageNotifications {
Self {
metrics: Default::default(),
next_id: Default::default(),
wildcard_listeners: Default::default(),
listeners: Default::default(),
@@ -101,6 +106,29 @@ impl<Block: BlockT> Default for StorageNotifications<Block> {
}
impl<Block: BlockT> StorageNotifications<Block> {
/// Initialize a new StorageNotifications
/// optionally pass a prometheus registry to send subscriber metrics to
pub fn new(prometheus_registry: Option<Registry>) -> Self {
let metrics = prometheus_registry.and_then(|r|
CounterVec::new(
Opts::new(
"storage_notification_subscribers",
"Number of subscribers in storage notification sytem"
),
&["action"], //added | removed
).and_then(|g| register(g, &r))
.ok()
);
StorageNotifications {
metrics,
next_id: Default::default(),
wildcard_listeners: Default::default(),
listeners: Default::default(),
child_listeners: Default::default(),
sinks: Default::default(),
}
}
/// Trigger notification to all listeners.
///
/// Note the changes are going to be filtered by listener's filter key.
@@ -113,6 +141,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
Item=(Vec<u8>, impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>)
>,
) {
let has_wildcard = !self.wildcard_listeners.is_empty();
// early exit if no listeners
@@ -169,21 +198,32 @@ impl<Block: BlockT> StorageNotifications<Block> {
let changes = Arc::new(changes);
let child_changes = Arc::new(child_changes);
// Trigger the events
for subscriber in subscribers {
let should_remove = {
let &(ref sink, ref filter, ref child_filters) = self.sinks.get(&subscriber)
.expect("subscribers returned from self.listeners are always in self.sinks; qed");
sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(),
child_changes: child_changes.clone(),
filter: filter.clone(),
child_filters: child_filters.clone(),
})).is_err()
};
if should_remove {
self.remove_subscriber(subscriber);
}
let to_remove = self.sinks
.iter()
.filter_map(|(subscriber, &(ref sink, ref filter, ref child_filters))| {
let should_remove = {
if subscribers.contains(subscriber) {
sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(),
child_changes: child_changes.clone(),
filter: filter.clone(),
child_filters: child_filters.clone(),
})).is_err()
} else {
sink.is_closed()
}
};
if should_remove {
Some(subscriber.clone())
} else {
None
}
}).collect::<Vec<_>>();
for sub_id in to_remove {
self.remove_subscriber(sub_id);
}
}
@@ -241,6 +281,9 @@ impl<Block: BlockT> StorageNotifications<Block> {
}
}
}
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"removed"]).inc();
}
}
}
@@ -301,6 +344,11 @@ impl<Block: BlockT> StorageNotifications<Block> {
// insert sink
let (tx, rx) = tracing_unbounded("mpsc_storage_notification_items");
self.sinks.insert(current_id, (tx, keys, child_keys));
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"added"]).inc();
}
rx
}
}