// This file is part of Substrate. // Copyright (C) Parity Technologies (UK) Ltd. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . use super::*; use sp_core::hexdisplay::HexDisplay; use fnv::{FnvHashMap, FnvHashSet}; use prometheus_endpoint::{register, CounterVec, Opts, U64}; use sc_utils::{ id_sequence::SeqID as SubscriberId, pubsub::{Dispatch, Subscribe, Unsubscribe}, }; type SubscribersGauge = CounterVec; /// A command to subscribe with the specified filters. /// /// Used by the implementation of [`Subscribe`] trait for [`Registry]. pub(super) struct SubscribeOp<'a> { pub filter_keys: Option<&'a [StorageKey]>, pub filter_child_keys: Option<&'a [(StorageKey, Option>)]>, } #[derive(Debug, Default)] pub(super) struct Registry { pub(super) metrics: Option, pub(super) wildcard_listeners: FnvHashSet, pub(super) listeners: HashMap>, pub(super) child_listeners: HashMap< StorageKey, (HashMap>, FnvHashSet), >, pub(super) sinks: FnvHashMap, } #[derive(Debug)] pub(super) struct SubscriberSink { subs_id: SubscriberId, keys: Keys, child_keys: ChildKeys, was_triggered: bool, } impl Drop for SubscriberSink { fn drop(&mut self) { if !self.was_triggered { log::trace!( target: "storage_notifications", "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", self.subs_id, PrintKeys(&self.keys), PrintChildKeys(&self.child_keys), ); } } } impl SubscriberSink { fn new(subs_id: SubscriberId, keys: Keys, child_keys: ChildKeys) -> Self { Self { subs_id, keys, child_keys, was_triggered: false } } } impl Registry { pub(super) fn new(prometheus_registry: Option) -> Self { let metrics = prometheus_registry.and_then(|r| { CounterVec::new( Opts::new( "substrate_storage_notification_subscribers", "Number of subscribers in storage notification sytem", ), &["action"], // added | removed ) .and_then(|g| register(g, &r)) .ok() }); Registry { metrics, ..Default::default() } } } impl Unsubscribe for Registry { fn unsubscribe(&mut self, subs_id: SubscriberId) { self.remove_subscriber(subs_id); } } impl<'a> Subscribe> for Registry { fn subscribe(&mut self, subs_op: SubscribeOp<'a>, subs_id: SubscriberId) { let SubscribeOp { filter_keys, filter_child_keys } = subs_op; let keys = Self::listen_from( subs_id, filter_keys.as_ref(), &mut self.listeners, &mut self.wildcard_listeners, ); let child_keys = filter_child_keys.map(|filter_child_keys| { filter_child_keys .iter() .map(|(c_key, o_keys)| { let (c_listeners, c_wildcards) = self.child_listeners.entry(c_key.clone()).or_default(); ( c_key.clone(), Self::listen_from( subs_id, o_keys.as_ref(), &mut *c_listeners, &mut *c_wildcards, ), ) }) .collect() }); if let Some(m) = self.metrics.as_ref() { m.with_label_values(&["added"]).inc(); } if self .sinks .insert(subs_id, SubscriberSink::new(subs_id, keys, child_keys)) .is_some() { log::warn!("The `subscribe`-method has been passed a non-unique subs_id (in `sc-client-api::notifications`)"); } } } impl<'a, Hash, CS, CCS, CCSI> Dispatch<(&'a Hash, CS, CCS)> for Registry where Hash: Clone, CS: Iterator, Option>)>, CCS: Iterator, CCSI)>, CCSI: Iterator, Option>)>, { type Item = StorageNotification; type Ret = (); fn dispatch(&mut self, message: (&'a Hash, CS, CCS), dispatch: F) -> Self::Ret where F: FnMut(&SubscriberId, Self::Item), { let (hash, changeset, child_changeset) = message; self.trigger(hash, changeset, child_changeset, dispatch); } } impl Registry { pub(super) fn trigger( &mut self, hash: &Hash, changeset: impl Iterator, Option>)>, child_changeset: impl Iterator< Item = (Vec, impl Iterator, Option>)>), >, mut dispatch: F, ) where Hash: Clone, F: FnMut(&SubscriberId, StorageNotification), { let has_wildcard = !self.wildcard_listeners.is_empty(); // early exit if no listeners if !has_wildcard && self.listeners.is_empty() && self.child_listeners.is_empty() { return } let mut subscribers = self.wildcard_listeners.clone(); let mut changes = Vec::new(); let mut child_changes = Vec::new(); // Collect subscribers and changes for (k, v) in changeset { let k = StorageKey(k); let listeners = self.listeners.get(&k); if let Some(listeners) = listeners { subscribers.extend(listeners.iter()); } if has_wildcard || listeners.is_some() { changes.push((k, v.map(StorageData))); } } for (sk, changeset) in child_changeset { let sk = StorageKey(sk); if let Some((cl, cw)) = self.child_listeners.get(&sk) { let mut changes = Vec::new(); for (k, v) in changeset { let k = StorageKey(k); let listeners = cl.get(&k); if let Some(listeners) = listeners { subscribers.extend(listeners.iter()); } subscribers.extend(cw.iter()); if !cw.is_empty() || listeners.is_some() { changes.push((k, v.map(StorageData))); } } if !changes.is_empty() { child_changes.push((sk, changes)); } } } // Don't send empty notifications if changes.is_empty() && child_changes.is_empty() { return } let changes = Arc::<[_]>::from(changes); let child_changes = Arc::<[_]>::from(child_changes); // Trigger the events self.sinks.iter_mut().for_each(|(subs_id, sink)| { if subscribers.contains(subs_id) { sink.was_triggered = true; let storage_change_set = StorageChangeSet { changes: changes.clone(), child_changes: child_changes.clone(), filter: sink.keys.clone(), child_filters: sink.child_keys.clone(), }; let notification = StorageNotification { block: hash.clone(), changes: storage_change_set }; dispatch(subs_id, notification); } }); } } impl Registry { fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> { let sink = self.sinks.remove(&subscriber)?; Self::remove_subscriber_from( subscriber, &sink.keys, &mut self.listeners, &mut self.wildcard_listeners, ); if let Some(child_filters) = &sink.child_keys { for (c_key, filters) in child_filters { if let Some((listeners, wildcards)) = self.child_listeners.get_mut(c_key) { Self::remove_subscriber_from( subscriber, filters, &mut *listeners, &mut *wildcards, ); if listeners.is_empty() && wildcards.is_empty() { self.child_listeners.remove(c_key); } } } } if let Some(m) = self.metrics.as_ref() { m.with_label_values(&["removed"]).inc(); } Some((sink.keys.clone(), sink.child_keys.clone())) } fn remove_subscriber_from( subscriber: SubscriberId, filters: &Keys, listeners: &mut HashMap>, wildcards: &mut FnvHashSet, ) { match filters { None => { wildcards.remove(&subscriber); }, Some(filters) => for key in filters.iter() { let remove_key = match listeners.get_mut(key) { Some(ref mut set) => { set.remove(&subscriber); set.is_empty() }, None => false, }; if remove_key { listeners.remove(key); } }, } } fn listen_from( current_id: SubscriberId, filter_keys: Option>, listeners: &mut HashMap>, wildcards: &mut FnvHashSet, ) -> Keys { match filter_keys { None => { wildcards.insert(current_id); None }, Some(keys) => Some( keys.as_ref() .iter() .map(|key| { listeners.entry(key.clone()).or_default().insert(current_id); key.clone() }) .collect(), ), } } } pub(super) struct PrintKeys<'a>(pub &'a Keys); impl<'a> std::fmt::Debug for PrintKeys<'a> { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { if let Some(keys) = self.0 { fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish() } else { write!(fmt, "None") } } } pub(super) struct PrintChildKeys<'a>(pub &'a ChildKeys); impl<'a> std::fmt::Debug for PrintChildKeys<'a> { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { if let Some(map) = self.0 { fmt.debug_map() .entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values)))) .finish() } else { write!(fmt, "None") } } }