diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 1e44edc899..da474bdcfb 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -9067,6 +9067,7 @@ dependencies = [ "futures 0.3.16", "futures-timer", "lazy_static", + "log 0.4.14", "parking_lot 0.12.0", "prometheus", "tokio-test", diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs index 949cc42c1c..11195e1def 100644 --- a/substrate/client/api/src/client.rs +++ b/substrate/client/api/src/client.rs @@ -307,9 +307,9 @@ pub struct FinalityNotification { /// Finalized block header. pub header: Block::Header, /// Path from the old finalized to new finalized parent (implicitly finalized blocks). - pub tree_route: Arc>, + pub tree_route: Arc<[Block::Hash]>, /// Stale branches heads. - pub stale_heads: Arc>, + pub stale_heads: Arc<[Block::Hash]>, } impl TryFrom> for ChainEvent { @@ -336,8 +336,8 @@ impl From> for FinalityNotification { FinalityNotification { hash, header: summary.header, - tree_route: Arc::new(summary.finalized), - stale_heads: Arc::new(summary.stale_heads), + tree_route: Arc::from(summary.finalized), + stale_heads: Arc::from(summary.stale_heads), } } } diff --git a/substrate/client/api/src/notifications.rs b/substrate/client/api/src/notifications.rs index 8199bc6b97..36798abc5b 100644 --- a/substrate/client/api/src/notifications.rs +++ b/substrate/client/api/src/notifications.rs @@ -21,30 +21,54 @@ use std::{ collections::{HashMap, HashSet}, pin::Pin, - sync::{Arc, Weak}, + sync::Arc, task::Poll, }; -use fnv::{FnvHashMap, FnvHashSet}; use futures::Stream; -use parking_lot::Mutex; -use prometheus_endpoint::{register, CounterVec, Opts, Registry, U64}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use sp_core::{ - hexdisplay::HexDisplay, - storage::{StorageData, StorageKey}, -}; + +use prometheus_endpoint::Registry as PrometheusRegistry; + +use sc_utils::pubsub::{Hub, Receiver}; +use sp_core::storage::{StorageData, StorageKey}; use sp_runtime::traits::Block as BlockT; +mod registry; + +use registry::Registry; + +#[cfg(test)] +mod tests; + +/// A type of a message delivered to the subscribers +#[derive(Debug)] +pub struct StorageNotification { + /// The hash of the block + pub block: Hash, + + /// The set of changes + pub changes: StorageChangeSet, +} + /// Storage change set #[derive(Debug)] pub struct StorageChangeSet { - changes: Arc)>>, - child_changes: Arc)>)>>, + changes: Arc<[(StorageKey, Option)]>, + child_changes: Arc<[(StorageKey, Vec<(StorageKey, Option)>)]>, filter: Keys, child_filters: ChildKeys, } +/// Manages storage listeners. +#[derive(Debug)] +pub struct StorageNotifications(Hub, Registry>); + +/// Type that implements `futures::Stream` of storage change events. +pub struct StorageEventStream(Receiver, Registry>); + +type Keys = Option>; +type ChildKeys = Option>>>; + impl StorageChangeSet { /// Convert the change set into iterator over storage items. pub fn iter<'a>( @@ -80,125 +104,24 @@ impl StorageChangeSet { } } -/// Type that implements `futures::Stream` of storage change events. -pub struct StorageEventStream { - rx: TracingUnboundedReceiver<(H, StorageChangeSet)>, - storage_notifications: Weak>>, - was_triggered: bool, - id: u64, -} - impl Stream for StorageEventStream { - type Item = as Stream>::Item; + type Item = StorageNotification; fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let result = Stream::poll_next(Pin::new(&mut self.rx), cx); - if result.is_ready() { - self.was_triggered = true; - } - result - } -} - -impl Drop for StorageEventStream { - fn drop(&mut self) { - if let Some(storage_notifications) = self.storage_notifications.upgrade() { - if let Some((keys, child_keys)) = - storage_notifications.lock().remove_subscriber(self.id) - { - if !self.was_triggered { - log::trace!( - target: "storage_notifications", - "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", - self.id, - PrintKeys(&keys), - PrintChildKeys(&child_keys), - ); - } - } - } - } -} - -type SubscriberId = u64; - -type SubscribersGauge = CounterVec; - -/// Manages storage listeners. -#[derive(Debug)] -pub struct StorageNotifications(Arc>>); - -type Keys = Option>; -type ChildKeys = Option>>>; - -#[derive(Debug)] -struct StorageNotificationsImpl { - metrics: Option, - next_id: SubscriberId, - wildcard_listeners: FnvHashSet, - listeners: HashMap>, - child_listeners: HashMap< - StorageKey, - (HashMap>, FnvHashSet), - >, - sinks: FnvHashMap< - SubscriberId, - (TracingUnboundedSender<(Hash, StorageChangeSet)>, Keys, ChildKeys), - >, -} - -impl Default for StorageNotifications { - fn default() -> Self { - Self(Default::default()) - } -} - -impl Default for StorageNotificationsImpl { - fn default() -> Self { - Self { - metrics: Default::default(), - next_id: Default::default(), - wildcard_listeners: Default::default(), - listeners: Default::default(), - child_listeners: Default::default(), - sinks: Default::default(), - } - } -} - -struct PrintKeys<'a>(&'a Keys); -impl<'a> std::fmt::Debug for PrintKeys<'a> { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - if let Some(keys) = self.0 { - fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish() - } else { - write!(fmt, "None") - } - } -} - -struct PrintChildKeys<'a>(&'a ChildKeys); -impl<'a> std::fmt::Debug for PrintChildKeys<'a> { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - if let Some(map) = self.0 { - fmt.debug_map() - .entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values)))) - .finish() - } else { - write!(fmt, "None") - } + Stream::poll_next(Pin::new(&mut self.get_mut().0), cx) } } impl StorageNotifications { /// Initialize a new StorageNotifications /// optionally pass a prometheus registry to send subscriber metrics to - pub fn new(prometheus_registry: Option) -> Self { - StorageNotifications(Arc::new(Mutex::new(StorageNotificationsImpl::new( - prometheus_registry, - )))) + pub fn new(prometheus_registry: Option) -> Self { + let registry = Registry::new(prometheus_registry); + let hub = Hub::new_with_registry("mpsc_storage_notification_items", registry); + + StorageNotifications(hub) } /// Trigger notification to all listeners. @@ -206,467 +129,24 @@ impl StorageNotifications { /// Note the changes are going to be filtered by listener's filter key. /// In fact no event might be sent if clients are not interested in the changes. pub fn trigger( - &mut self, + &self, hash: &Block::Hash, changeset: impl Iterator, Option>)>, child_changeset: impl Iterator< Item = (Vec, impl Iterator, Option>)>), >, ) { - self.0.lock().trigger(hash, changeset, child_changeset); + self.0.send((hash, changeset, child_changeset)) } /// Start listening for particular storage keys. pub fn listen( - &mut self, + &self, filter_keys: Option<&[StorageKey]>, filter_child_keys: Option<&[(StorageKey, Option>)]>, ) -> StorageEventStream { - let (id, rx) = self.0.lock().listen(filter_keys, filter_child_keys); - let storage_notifications = Arc::downgrade(&self.0); - StorageEventStream { rx, storage_notifications, was_triggered: false, id } - } -} - -impl StorageNotificationsImpl { - fn new(prometheus_registry: Option) -> Self { - let metrics = prometheus_registry.and_then(|r| { - CounterVec::new( - Opts::new( - "substrate_storage_notification_subscribers", - "Number of subscribers in storage notification sytem", - ), - &["action"], // added | removed - ) - .and_then(|g| register(g, &r)) - .ok() - }); - - StorageNotificationsImpl { - metrics, - next_id: Default::default(), - wildcard_listeners: Default::default(), - listeners: Default::default(), - child_listeners: Default::default(), - sinks: Default::default(), - } - } - fn trigger( - &mut self, - hash: &Hash, - changeset: impl Iterator, Option>)>, - child_changeset: impl Iterator< - Item = (Vec, impl Iterator, Option>)>), - >, - ) where - Hash: Clone, - { - let has_wildcard = !self.wildcard_listeners.is_empty(); - - // early exit if no listeners - if !has_wildcard && self.listeners.is_empty() && self.child_listeners.is_empty() { - return - } - - let mut subscribers = self.wildcard_listeners.clone(); - let mut changes = Vec::new(); - let mut child_changes = Vec::new(); - - // Collect subscribers and changes - for (k, v) in changeset { - let k = StorageKey(k); - let listeners = self.listeners.get(&k); - - if let Some(ref listeners) = listeners { - subscribers.extend(listeners.iter()); - } - - if has_wildcard || listeners.is_some() { - changes.push((k, v.map(StorageData))); - } - } - for (sk, changeset) in child_changeset { - let sk = StorageKey(sk); - if let Some((cl, cw)) = self.child_listeners.get(&sk) { - let mut changes = Vec::new(); - for (k, v) in changeset { - let k = StorageKey(k); - let listeners = cl.get(&k); - - if let Some(ref listeners) = listeners { - subscribers.extend(listeners.iter()); - } - - subscribers.extend(cw.iter()); - - if !cw.is_empty() || listeners.is_some() { - changes.push((k, v.map(StorageData))); - } - } - if !changes.is_empty() { - child_changes.push((sk, changes)); - } - } - } - - // Don't send empty notifications - if changes.is_empty() && child_changes.is_empty() { - return - } - - let changes = Arc::new(changes); - let child_changes = Arc::new(child_changes); - // Trigger the events - - let to_remove = self - .sinks - .iter() - .filter_map(|(subscriber, &(ref sink, ref filter, ref child_filters))| { - let should_remove = { - if subscribers.contains(subscriber) { - sink.unbounded_send(( - hash.clone(), - StorageChangeSet { - changes: changes.clone(), - child_changes: child_changes.clone(), - filter: filter.clone(), - child_filters: child_filters.clone(), - }, - )) - .is_err() - } else { - sink.is_closed() - } - }; - - if should_remove { - Some(subscriber.clone()) - } else { - None - } - }) - .collect::>(); - - for sub_id in to_remove { - self.remove_subscriber(sub_id); - } - } - - fn remove_subscriber_from( - subscriber: &SubscriberId, - filters: &Keys, - listeners: &mut HashMap>, - wildcards: &mut FnvHashSet, - ) { - match filters { - None => { - wildcards.remove(subscriber); - }, - Some(filters) => - for key in filters.iter() { - let remove_key = match listeners.get_mut(key) { - Some(ref mut set) => { - set.remove(subscriber); - set.is_empty() - }, - None => false, - }; - - if remove_key { - listeners.remove(key); - } - }, - } - } - - fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> { - let (_, filters, child_filters) = self.sinks.remove(&subscriber)?; - Self::remove_subscriber_from( - &subscriber, - &filters, - &mut self.listeners, - &mut self.wildcard_listeners, - ); - if let Some(child_filters) = child_filters.as_ref() { - for (c_key, filters) in child_filters { - if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) { - Self::remove_subscriber_from( - &subscriber, - &filters, - &mut *listeners, - &mut *wildcards, - ); - - if listeners.is_empty() && wildcards.is_empty() { - self.child_listeners.remove(&c_key); - } - } - } - } - if let Some(m) = self.metrics.as_ref() { - m.with_label_values(&[&"removed"]).inc(); - } - - Some((filters, child_filters)) - } - - fn listen_from( - current_id: SubscriberId, - filter_keys: &Option>, - listeners: &mut HashMap>, - wildcards: &mut FnvHashSet, - ) -> Keys { - match filter_keys { - None => { - wildcards.insert(current_id); - None - }, - Some(keys) => Some( - keys.as_ref() - .iter() - .map(|key| { - listeners - .entry(key.clone()) - .or_insert_with(Default::default) - .insert(current_id); - key.clone() - }) - .collect(), - ), - } - } - - fn listen( - &mut self, - filter_keys: Option<&[StorageKey]>, - filter_child_keys: Option<&[(StorageKey, Option>)]>, - ) -> (u64, TracingUnboundedReceiver<(Hash, StorageChangeSet)>) { - self.next_id += 1; - let current_id = self.next_id; - - // add subscriber for every key - let keys = Self::listen_from( - current_id, - &filter_keys, - &mut self.listeners, - &mut self.wildcard_listeners, - ); - let child_keys = filter_child_keys.map(|filter_child_keys| { - filter_child_keys - .iter() - .map(|(c_key, o_keys)| { - let (c_listeners, c_wildcards) = - self.child_listeners.entry(c_key.clone()).or_insert_with(Default::default); - - ( - c_key.clone(), - Self::listen_from(current_id, o_keys, &mut *c_listeners, &mut *c_wildcards), - ) - }) - .collect() - }); - - // insert sink - let (tx, rx) = tracing_unbounded("mpsc_storage_notification_items"); - self.sinks.insert(current_id, (tx, keys, child_keys)); - - if let Some(m) = self.metrics.as_ref() { - m.with_label_values(&[&"added"]).inc(); - } - - (current_id, rx) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper, H256 as Hash}; - use std::iter::{empty, Empty}; - - type TestChangeSet = ( - Vec<(StorageKey, Option)>, - Vec<(StorageKey, Vec<(StorageKey, Option)>)>, - ); - - #[cfg(test)] - impl From for StorageChangeSet { - fn from(changes: TestChangeSet) -> Self { - // warning hardcoded child trie wildcard to test upon - let child_filters = Some( - [(StorageKey(vec![4]), None), (StorageKey(vec![5]), None)] - .iter() - .cloned() - .collect(), - ); - StorageChangeSet { - changes: Arc::new(changes.0), - child_changes: Arc::new(changes.1), - filter: None, - child_filters, - } - } - } - - #[cfg(test)] - impl PartialEq for StorageChangeSet { - fn eq(&self, other: &Self) -> bool { - self.iter().eq(other.iter()) - } - } - - type Block = RawBlock>; - - #[test] - fn triggering_change_should_notify_wildcard_listeners() { - // given - let mut notifications = StorageNotifications::::default(); - let child_filter = [(StorageKey(vec![4]), None)]; - let mut recv = - futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter[..]))); - - // when - let changeset = vec![(vec![2], Some(vec![3])), (vec![3], None)]; - let c_changeset_1 = vec![(vec![5], Some(vec![4])), (vec![6], None)]; - let c_changeset = vec![(vec![4], c_changeset_1)]; - notifications.trigger( - &Hash::from_low_u64_be(1), - changeset.into_iter(), - c_changeset.into_iter().map(|(a, b)| (a, b.into_iter())), - ); - - // then - assert_eq!( - recv.next().unwrap(), - ( - Hash::from_low_u64_be(1), - ( - vec![ - (StorageKey(vec![2]), Some(StorageData(vec![3]))), - (StorageKey(vec![3]), None), - ], - vec![( - StorageKey(vec![4]), - vec![ - (StorageKey(vec![5]), Some(StorageData(vec![4]))), - (StorageKey(vec![6]), None), - ] - )] - ) - .into() - ) - ); - } - - #[test] - fn should_only_notify_interested_listeners() { - // given - let mut notifications = StorageNotifications::::default(); - let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; - let mut recv1 = futures::executor::block_on_stream( - notifications.listen(Some(&[StorageKey(vec![1])]), None), - ); - let mut recv2 = futures::executor::block_on_stream( - notifications.listen(Some(&[StorageKey(vec![2])]), None), - ); - let mut recv3 = futures::executor::block_on_stream( - notifications.listen(Some(&[]), Some(&child_filter)), - ); - - // when - let changeset = vec![(vec![2], Some(vec![3])), (vec![1], None)]; - let c_changeset_1 = vec![(vec![5], Some(vec![4])), (vec![6], None)]; - - let c_changeset = vec![(vec![4], c_changeset_1)]; - notifications.trigger( - &Hash::from_low_u64_be(1), - changeset.into_iter(), - c_changeset.into_iter().map(|(a, b)| (a, b.into_iter())), - ); - - // then - assert_eq!( - recv1.next().unwrap(), - (Hash::from_low_u64_be(1), (vec![(StorageKey(vec![1]), None),], vec![]).into()) - ); - assert_eq!( - recv2.next().unwrap(), - ( - Hash::from_low_u64_be(1), - (vec![(StorageKey(vec![2]), Some(StorageData(vec![3]))),], vec![]).into() - ) - ); - assert_eq!( - recv3.next().unwrap(), - ( - Hash::from_low_u64_be(1), - ( - vec![], - vec![( - StorageKey(vec![4]), - vec![(StorageKey(vec![5]), Some(StorageData(vec![4])))] - ),] - ) - .into() - ) - ); - } - - #[test] - fn should_cleanup_subscribers_if_dropped() { - // given - let mut notifications = StorageNotifications::::default(); - { - let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; - let _recv1 = futures::executor::block_on_stream( - notifications.listen(Some(&[StorageKey(vec![1])]), None), - ); - let _recv2 = futures::executor::block_on_stream( - notifications.listen(Some(&[StorageKey(vec![2])]), None), - ); - let _recv3 = futures::executor::block_on_stream(notifications.listen(None, None)); - let _recv4 = - futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter))); - assert_eq!(notifications.0.lock().listeners.len(), 2); - assert_eq!(notifications.0.lock().wildcard_listeners.len(), 2); - assert_eq!(notifications.0.lock().child_listeners.len(), 1); - } - - // when - let changeset = vec![(vec![2], Some(vec![3])), (vec![1], None)]; - let c_changeset = empty::<(_, Empty<_>)>(); - notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset); - - // then - assert_eq!(notifications.0.lock().listeners.len(), 0); - assert_eq!(notifications.0.lock().wildcard_listeners.len(), 0); - assert_eq!(notifications.0.lock().child_listeners.len(), 0); - } - - #[test] - fn should_cleanup_subscriber_if_stream_is_dropped() { - let mut notifications = StorageNotifications::::default(); - let stream = notifications.listen(None, None); - assert_eq!(notifications.0.lock().sinks.len(), 1); - std::mem::drop(stream); - assert_eq!(notifications.0.lock().sinks.len(), 0); - } - - #[test] - fn should_not_send_empty_notifications() { - // given - let mut recv = { - let mut notifications = StorageNotifications::::default(); - let recv = futures::executor::block_on_stream(notifications.listen(None, None)); - - // when - let changeset = vec![]; - let c_changeset = empty::<(_, Empty<_>)>(); - notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset); - recv - }; - - // then - assert_eq!(recv.next(), None); + let receiver = self.0.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }); + + StorageEventStream(receiver) } } diff --git a/substrate/client/api/src/notifications/registry.rs b/substrate/client/api/src/notifications/registry.rs new file mode 100644 index 0000000000..b34d5a6b67 --- /dev/null +++ b/substrate/client/api/src/notifications/registry.rs @@ -0,0 +1,365 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::*; + +use sp_core::hexdisplay::HexDisplay; + +use fnv::{FnvHashMap, FnvHashSet}; +use prometheus_endpoint::{register, CounterVec, Opts, U64}; + +use sc_utils::{ + id_sequence::SeqID as SubscriberId, + pubsub::{Dispatch, Subscribe, Unsubscribe}, +}; + +type SubscribersGauge = CounterVec; + +/// A command to subscribe with the specified filters. +/// +/// Used by the implementation of [`Subscribe`] trait for [`Registry]. +pub(super) struct SubscribeOp<'a> { + pub filter_keys: Option<&'a [StorageKey]>, + pub filter_child_keys: Option<&'a [(StorageKey, Option>)]>, +} + +#[derive(Debug, Default)] +pub(super) struct Registry { + pub(super) metrics: Option, + pub(super) wildcard_listeners: FnvHashSet, + pub(super) listeners: HashMap>, + pub(super) child_listeners: HashMap< + StorageKey, + (HashMap>, FnvHashSet), + >, + pub(super) sinks: FnvHashMap, +} + +#[derive(Debug)] +pub(super) struct SubscriberSink { + subs_id: SubscriberId, + keys: Keys, + child_keys: ChildKeys, + was_triggered: bool, +} + +impl Drop for SubscriberSink { + fn drop(&mut self) { + if !self.was_triggered { + log::trace!( + target: "storage_notifications", + "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", + self.subs_id, + PrintKeys(&self.keys), + PrintChildKeys(&self.child_keys), + ); + } + } +} + +impl SubscriberSink { + fn new(subs_id: SubscriberId, keys: Keys, child_keys: ChildKeys) -> Self { + Self { subs_id, keys, child_keys, was_triggered: false } + } +} + +impl Registry { + pub(super) fn new(prometheus_registry: Option) -> Self { + let metrics = prometheus_registry.and_then(|r| { + CounterVec::new( + Opts::new( + "substrate_storage_notification_subscribers", + "Number of subscribers in storage notification sytem", + ), + &["action"], // added | removed + ) + .and_then(|g| register(g, &r)) + .ok() + }); + + Registry { metrics, ..Default::default() } + } +} + +impl Unsubscribe for Registry { + fn unsubscribe(&mut self, subs_id: SubscriberId) { + self.remove_subscriber(subs_id); + } +} + +impl<'a> Subscribe> for Registry { + fn subscribe(&mut self, subs_op: SubscribeOp<'a>, subs_id: SubscriberId) { + let SubscribeOp { filter_keys, filter_child_keys } = subs_op; + + let keys = Self::listen_from( + subs_id, + filter_keys.as_ref(), + &mut self.listeners, + &mut self.wildcard_listeners, + ); + + let child_keys = filter_child_keys.map(|filter_child_keys| { + filter_child_keys + .iter() + .map(|(c_key, o_keys)| { + let (c_listeners, c_wildcards) = + self.child_listeners.entry(c_key.clone()).or_default(); + + ( + c_key.clone(), + Self::listen_from( + subs_id, + o_keys.as_ref(), + &mut *c_listeners, + &mut *c_wildcards, + ), + ) + }) + .collect() + }); + + if let Some(m) = self.metrics.as_ref() { + m.with_label_values(&[&"added"]).inc(); + } + + if self + .sinks + .insert(subs_id, SubscriberSink::new(subs_id, keys, child_keys)) + .is_some() + { + log::warn!("The `subscribe`-method has been passed a non-unique subs_id (in `sc-client-api::notifications`)"); + } + } +} + +impl<'a, Hash, CS, CCS, CCSI> Dispatch<(&'a Hash, CS, CCS)> for Registry +where + Hash: Clone, + CS: Iterator, Option>)>, + CCS: Iterator, CCSI)>, + CCSI: Iterator, Option>)>, +{ + type Item = StorageNotification; + type Ret = (); + + fn dispatch(&mut self, message: (&'a Hash, CS, CCS), dispatch: F) -> Self::Ret + where + F: FnMut(&SubscriberId, Self::Item), + { + let (hash, changeset, child_changeset) = message; + self.trigger(hash, changeset, child_changeset, dispatch); + } +} + +impl Registry { + pub(super) fn trigger( + &mut self, + hash: &Hash, + changeset: impl Iterator, Option>)>, + child_changeset: impl Iterator< + Item = (Vec, impl Iterator, Option>)>), + >, + mut dispatch: F, + ) where + Hash: Clone, + F: FnMut(&SubscriberId, StorageNotification), + { + let has_wildcard = !self.wildcard_listeners.is_empty(); + + // early exit if no listeners + if !has_wildcard && self.listeners.is_empty() && self.child_listeners.is_empty() { + return + } + + let mut subscribers = self.wildcard_listeners.clone(); + let mut changes = Vec::new(); + let mut child_changes = Vec::new(); + + // Collect subscribers and changes + for (k, v) in changeset { + let k = StorageKey(k); + let listeners = self.listeners.get(&k); + + if let Some(ref listeners) = listeners { + subscribers.extend(listeners.iter()); + } + + if has_wildcard || listeners.is_some() { + changes.push((k, v.map(StorageData))); + } + } + for (sk, changeset) in child_changeset { + let sk = StorageKey(sk); + if let Some((cl, cw)) = self.child_listeners.get(&sk) { + let mut changes = Vec::new(); + for (k, v) in changeset { + let k = StorageKey(k); + let listeners = cl.get(&k); + + if let Some(ref listeners) = listeners { + subscribers.extend(listeners.iter()); + } + + subscribers.extend(cw.iter()); + + if !cw.is_empty() || listeners.is_some() { + changes.push((k, v.map(StorageData))); + } + } + if !changes.is_empty() { + child_changes.push((sk, changes)); + } + } + } + + // Don't send empty notifications + if changes.is_empty() && child_changes.is_empty() { + return + } + + let changes = Arc::<[_]>::from(changes); + let child_changes = Arc::<[_]>::from(child_changes); + + // Trigger the events + self.sinks.iter_mut().for_each(|(subs_id, sink)| { + if subscribers.contains(subs_id) { + sink.was_triggered = true; + + let storage_change_set = StorageChangeSet { + changes: changes.clone(), + child_changes: child_changes.clone(), + filter: sink.keys.clone(), + child_filters: sink.child_keys.clone(), + }; + + let notification = + StorageNotification { block: hash.clone(), changes: storage_change_set }; + + dispatch(subs_id, notification); + } + }); + } +} + +impl Registry { + fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> { + let sink = self.sinks.remove(&subscriber)?; + + Self::remove_subscriber_from( + subscriber, + &sink.keys, + &mut self.listeners, + &mut self.wildcard_listeners, + ); + if let Some(child_filters) = &sink.child_keys { + for (c_key, filters) in child_filters { + if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) { + Self::remove_subscriber_from( + subscriber, + &filters, + &mut *listeners, + &mut *wildcards, + ); + + if listeners.is_empty() && wildcards.is_empty() { + self.child_listeners.remove(&c_key); + } + } + } + } + if let Some(m) = self.metrics.as_ref() { + m.with_label_values(&[&"removed"]).inc(); + } + + Some((sink.keys.clone(), sink.child_keys.clone())) + } + + fn remove_subscriber_from( + subscriber: SubscriberId, + filters: &Keys, + listeners: &mut HashMap>, + wildcards: &mut FnvHashSet, + ) { + match filters { + None => { + wildcards.remove(&subscriber); + }, + Some(filters) => + for key in filters.iter() { + let remove_key = match listeners.get_mut(key) { + Some(ref mut set) => { + set.remove(&subscriber); + set.is_empty() + }, + None => false, + }; + + if remove_key { + listeners.remove(key); + } + }, + } + } + + fn listen_from( + current_id: SubscriberId, + filter_keys: Option>, + listeners: &mut HashMap>, + wildcards: &mut FnvHashSet, + ) -> Keys { + match filter_keys { + None => { + wildcards.insert(current_id); + None + }, + Some(keys) => Some( + keys.as_ref() + .iter() + .map(|key| { + listeners.entry(key.clone()).or_default().insert(current_id); + key.clone() + }) + .collect(), + ), + } + } +} + +pub(super) struct PrintKeys<'a>(pub &'a Keys); +impl<'a> std::fmt::Debug for PrintKeys<'a> { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + if let Some(keys) = self.0 { + fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish() + } else { + write!(fmt, "None") + } + } +} + +pub(super) struct PrintChildKeys<'a>(pub &'a ChildKeys); +impl<'a> std::fmt::Debug for PrintChildKeys<'a> { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + if let Some(map) = self.0 { + fmt.debug_map() + .entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values)))) + .finish() + } else { + write!(fmt, "None") + } + } +} diff --git a/substrate/client/api/src/notifications/tests.rs b/substrate/client/api/src/notifications/tests.rs new file mode 100644 index 0000000000..2c728de742 --- /dev/null +++ b/substrate/client/api/src/notifications/tests.rs @@ -0,0 +1,221 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::*; + +use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper, H256 as Hash}; +use std::iter::{empty, Empty}; + +type TestChangeSet = ( + Vec<(StorageKey, Option)>, + Vec<(StorageKey, Vec<(StorageKey, Option)>)>, +); + +impl From for StorageChangeSet { + fn from(changes: TestChangeSet) -> Self { + // warning hardcoded child trie wildcard to test upon + let child_filters = Some( + [(StorageKey(vec![4]), None), (StorageKey(vec![5]), None)] + .iter() + .cloned() + .collect(), + ); + StorageChangeSet { + changes: From::from(changes.0), + child_changes: From::from(changes.1), + filter: None, + child_filters, + } + } +} + +impl PartialEq for StorageChangeSet { + fn eq(&self, other: &Self) -> bool { + self.iter().eq(other.iter()) + } +} + +type Block = RawBlock>; + +#[test] +fn triggering_change_should_notify_wildcard_listeners() { + // given + let notifications = StorageNotifications::::new(None); + let child_filter = [(StorageKey(vec![4]), None)]; + let mut recv = + futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter[..]))); + + // when + let changeset = vec![(vec![2], Some(vec![3])), (vec![3], None)]; + let c_changeset_1 = vec![(vec![5], Some(vec![4])), (vec![6], None)]; + let c_changeset = vec![(vec![4], c_changeset_1)]; + notifications.trigger( + &Hash::from_low_u64_be(1), + changeset.into_iter(), + c_changeset.into_iter().map(|(a, b)| (a, b.into_iter())), + ); + + // then + assert_eq!( + recv.next().map(StorageNotification::into_fields).unwrap(), + ( + Hash::from_low_u64_be(1), + ( + vec![ + (StorageKey(vec![2]), Some(StorageData(vec![3]))), + (StorageKey(vec![3]), None), + ], + vec![( + StorageKey(vec![4]), + vec![ + (StorageKey(vec![5]), Some(StorageData(vec![4]))), + (StorageKey(vec![6]), None), + ] + )] + ) + .into() + ) + ); +} + +#[test] +fn should_only_notify_interested_listeners() { + // given + let notifications = StorageNotifications::::new(None); + let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; + let mut recv1 = futures::executor::block_on_stream( + notifications.listen(Some(&[StorageKey(vec![1])]), None), + ); + let mut recv2 = futures::executor::block_on_stream( + notifications.listen(Some(&[StorageKey(vec![2])]), None), + ); + let mut recv3 = + futures::executor::block_on_stream(notifications.listen(Some(&[]), Some(&child_filter))); + + // when + let changeset = vec![(vec![2], Some(vec![3])), (vec![1], None)]; + let c_changeset_1 = vec![(vec![5], Some(vec![4])), (vec![6], None)]; + + let c_changeset = vec![(vec![4], c_changeset_1)]; + notifications.trigger( + &Hash::from_low_u64_be(1), + changeset.into_iter(), + c_changeset.into_iter().map(|(a, b)| (a, b.into_iter())), + ); + + // then + assert_eq!( + recv1.next().map(StorageNotification::into_fields).unwrap(), + (Hash::from_low_u64_be(1), (vec![(StorageKey(vec![1]), None),], vec![]).into()) + ); + assert_eq!( + recv2.next().map(StorageNotification::into_fields).unwrap(), + ( + Hash::from_low_u64_be(1), + (vec![(StorageKey(vec![2]), Some(StorageData(vec![3]))),], vec![]).into() + ) + ); + assert_eq!( + recv3.next().map(StorageNotification::into_fields).unwrap(), + ( + Hash::from_low_u64_be(1), + ( + vec![], + vec![( + StorageKey(vec![4]), + vec![(StorageKey(vec![5]), Some(StorageData(vec![4])))] + ),] + ) + .into() + ) + ); +} + +#[test] +fn should_cleanup_subscribers_if_dropped() { + // given + let notifications = StorageNotifications::::new(None); + { + let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; + let _recv1 = futures::executor::block_on_stream( + notifications.listen(Some(&[StorageKey(vec![1])]), None), + ); + let _recv2 = futures::executor::block_on_stream( + notifications.listen(Some(&[StorageKey(vec![2])]), None), + ); + let _recv3 = futures::executor::block_on_stream(notifications.listen(None, None)); + let _recv4 = + futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter))); + assert_eq!(notifications.map_registry(|r| r.listeners.len()), 2); + assert_eq!(notifications.map_registry(|r| r.wildcard_listeners.len()), 2); + assert_eq!(notifications.map_registry(|r| r.child_listeners.len()), 1); + } + + // when + let changeset = vec![(vec![2], Some(vec![3])), (vec![1], None)]; + let c_changeset = empty::<(_, Empty<_>)>(); + notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset); + + // then + assert_eq!(notifications.map_registry(|r| r.listeners.len()), 0); + assert_eq!(notifications.map_registry(|r| r.wildcard_listeners.len()), 0); + assert_eq!(notifications.map_registry(|r| r.child_listeners.len()), 0); +} + +#[test] +fn should_cleanup_subscriber_if_stream_is_dropped() { + let notifications = StorageNotifications::::new(None); + let stream = notifications.listen(None, None); + assert_eq!(notifications.map_registry(|r| r.sinks.len()), 1); + std::mem::drop(stream); + assert_eq!(notifications.map_registry(|r| r.sinks.len()), 0); +} + +#[test] +fn should_not_send_empty_notifications() { + // given + let mut recv = { + let notifications = StorageNotifications::::new(None); + let recv = futures::executor::block_on_stream(notifications.listen(None, None)); + + // when + let changeset = vec![]; + let c_changeset = empty::<(_, Empty<_>)>(); + notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset); + recv + }; + + // then + assert_eq!(recv.next().map(StorageNotification::into_fields), None); +} + +impl StorageNotifications { + fn map_registry(&self, map: MapF) -> Ret + where + MapF: FnOnce(&Registry) -> Ret, + { + self.0.map_registry_for_tests(map) + } +} + +impl StorageNotification { + fn into_fields(self) -> (H, StorageChangeSet) { + let Self { block, changes } = self; + (block, changes) + } +} diff --git a/substrate/client/rpc/src/state/state_full.rs b/substrate/client/rpc/src/state/state_full.rs index b8132094fe..1a35760bd6 100644 --- a/substrate/client/rpc/src/state/state_full.rs +++ b/substrate/client/rpc/src/state/state_full.rs @@ -51,7 +51,7 @@ use super::{ }; use sc_client_api::{ Backend, BlockBackend, BlockchainEvents, CallExecutor, ExecutorProvider, ProofProvider, - StorageProvider, + StorageNotification, StorageProvider, }; use std::marker::PhantomData; @@ -466,7 +466,7 @@ where ); self.subscriptions.add(subscriber, |sink| { - let stream = stream.map(|(block, changes)| { + let stream = stream.map(|StorageNotification { block, changes }| { Ok(Ok::<_, rpc::Error>(StorageChangeSet { block, changes: changes diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 8497c34f5a..004d4b711e 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -107,7 +107,7 @@ where { backend: Arc, executor: E, - storage_notifications: Mutex>, + storage_notifications: StorageNotifications, import_notification_sinks: NotificationSinks>, finality_notification_sinks: NotificationSinks>, // Collects auxiliary operations to be performed atomically together with @@ -393,7 +393,7 @@ where Ok(Client { backend, executor, - storage_notifications: Mutex::new(StorageNotifications::new(prometheus_registry)), + storage_notifications: StorageNotifications::new(prometheus_registry), import_notification_sinks: Default::default(), finality_notification_sinks: Default::default(), import_actions: Default::default(), @@ -986,7 +986,7 @@ where if let Some(storage_changes) = storage_changes { // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? - self.storage_notifications.lock().trigger( + self.storage_notifications.trigger( ¬ification.hash, storage_changes.0.into_iter(), storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), @@ -1951,7 +1951,7 @@ where filter_keys: Option<&[StorageKey]>, child_filter_keys: Option<&[(StorageKey, Option>)]>, ) -> sp_blockchain::Result> { - Ok(self.storage_notifications.lock().listen(filter_keys, child_filter_keys)) + Ok(self.storage_notifications.listen(filter_keys, child_filter_keys)) } } diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs index 7c90cd79cc..d7b3ddc996 100644 --- a/substrate/client/transaction-pool/api/src/lib.rs +++ b/substrate/client/transaction-pool/api/src/lib.rs @@ -292,7 +292,7 @@ pub enum ChainEvent { /// Hash of just finalized block. hash: B::Hash, /// Path from old finalized to new finalized parent. - tree_route: Arc>, + tree_route: Arc<[B::Hash]>, }, } diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs index 21a87f6e00..08ecdbc86c 100644 --- a/substrate/client/transaction-pool/tests/pool.rs +++ b/substrate/client/transaction-pool/tests/pool.rs @@ -387,7 +387,7 @@ fn should_push_watchers_during_maintenance() { let header_hash = header.hash(); block_on(pool.maintain(block_event(header))); - let event = ChainEvent::Finalized { hash: header_hash.clone(), tree_route: Arc::new(vec![]) }; + let event = ChainEvent::Finalized { hash: header_hash.clone(), tree_route: Arc::from(vec![]) }; block_on(pool.maintain(event)); // then @@ -445,7 +445,7 @@ fn finalization() { let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; block_on(pool.maintain(event)); - let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::new(vec![]) }; + let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; block_on(pool.maintain(event)); let mut stream = futures::executor::block_on_stream(watcher); @@ -493,7 +493,7 @@ fn fork_aware_finalization() { b1 = header.hash(); block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); - let event = ChainEvent::Finalized { hash: b1, tree_route: Arc::new(vec![]) }; + let event = ChainEvent::Finalized { hash: b1, tree_route: Arc::from(vec![]) }; block_on(pool.maintain(event)); } @@ -537,7 +537,7 @@ fn fork_aware_finalization() { block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 2); - let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::new(vec![]) }; + let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; block_on(pool.maintain(event)); } @@ -554,7 +554,7 @@ fn fork_aware_finalization() { d1 = header.hash(); block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 2); - let event = ChainEvent::Finalized { hash: d1, tree_route: Arc::new(vec![]) }; + let event = ChainEvent::Finalized { hash: d1, tree_route: Arc::from(vec![]) }; block_on(pool.maintain(event)); } @@ -567,7 +567,7 @@ fn fork_aware_finalization() { let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); - block_on(pool.maintain(ChainEvent::Finalized { hash: e1, tree_route: Arc::new(vec![]) })); + block_on(pool.maintain(ChainEvent::Finalized { hash: e1, tree_route: Arc::from(vec![]) })); } for (canon_watcher, h) in canon_watchers { @@ -637,7 +637,7 @@ fn prune_and_retract_tx_at_same_time() { block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); - let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::new(vec![]) }; + let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; block_on(pool.maintain(event)); header.hash() diff --git a/substrate/client/utils/Cargo.toml b/substrate/client/utils/Cargo.toml index c5acff2454..70d6bcd8b0 100644 --- a/substrate/client/utils/Cargo.toml +++ b/substrate/client/utils/Cargo.toml @@ -15,6 +15,7 @@ lazy_static = "1.4.0" parking_lot = "0.12.0" prometheus = { version = "0.13.0", default-features = false } futures-timer = "3.0.2" +log = "0.4" [features] default = ["metered"] diff --git a/substrate/client/utils/src/id_sequence.rs b/substrate/client/utils/src/id_sequence.rs new file mode 100644 index 0000000000..2ccb4fc3f3 --- /dev/null +++ b/substrate/client/utils/src/id_sequence.rs @@ -0,0 +1,54 @@ +// This file is part of Substrate. + +// Copyright (C) 2020-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Produce opaque sequential IDs. + +/// A Sequence of IDs. +#[derive(Debug, Default)] +// The `Clone` trait is intentionally not defined on this type. +pub struct IDSequence { + next_id: u64, +} + +/// A Sequential ID. +/// +/// Its integer value is intentionally not public: it is supposed to be instantiated from within +/// this module only. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct SeqID(u64); + +impl std::fmt::Display for SeqID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl IDSequence { + /// Create a new ID-sequence. + pub fn new() -> Self { + Default::default() + } + + /// Obtain another ID from this sequence. + pub fn next_id(&mut self) -> SeqID { + let id = SeqID(self.next_id); + self.next_id += 1; + + id + } +} diff --git a/substrate/client/utils/src/lib.rs b/substrate/client/utils/src/lib.rs index b3fb8400b1..0e47330335 100644 --- a/substrate/client/utils/src/lib.rs +++ b/substrate/client/utils/src/lib.rs @@ -36,7 +36,9 @@ //! | entity | Name of channel passed to `tracing_unbounded` | //! | action | One of `send`/`received`/`dropped` | +pub mod id_sequence; pub mod metrics; pub mod mpsc; pub mod notification; +pub mod pubsub; pub mod status_sinks; diff --git a/substrate/client/utils/src/notification.rs b/substrate/client/utils/src/notification.rs index 21d01c5f99..ff527c343f 100644 --- a/substrate/client/utils/src/notification.rs +++ b/substrate/client/utils/src/notification.rs @@ -26,15 +26,19 @@ //! //! See [`sc-service::builder::RpcExtensionBuilder`] for more details. -use std::{marker::PhantomData, sync::Arc}; +use futures::stream::{FusedStream, Stream}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; -use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use crate::pubsub::{Hub, Receiver}; -use parking_lot::Mutex; +mod registry; +use registry::Registry; -/// Collection of channel sending endpoints shared with the receiver side -/// so they can register themselves. -type SharedSenders = Arc>>>; +#[cfg(test)] +mod tests; /// Trait used to define the "tracing key" string used to tag /// and identify the mpsc channels. @@ -44,108 +48,75 @@ pub trait TracingKeyStr { const TRACING_KEY: &'static str; } -/// The sending half of the notifications channel(s). +/// The receiving half of the notifications channel. /// -/// Used to send notifications from the BEEFY gadget side. +/// The [`NotificationStream`] entity stores the [`Hub`] so it can be +/// used to add more subscriptions. #[derive(Clone)] -pub struct NotificationSender { - subscribers: SharedSenders, +pub struct NotificationStream { + hub: Hub, + _pd: std::marker::PhantomData, } -impl NotificationSender { - /// The `subscribers` should be shared with a corresponding `NotificationStream`. - fn new(subscribers: SharedSenders) -> Self { - Self { subscribers } +/// The receiving half of the notifications channel(s). +#[derive(Debug)] +pub struct NotificationReceiver { + receiver: Receiver, +} + +/// The sending half of the notifications channel(s). +pub struct NotificationSender { + hub: Hub, +} + +impl NotificationStream { + /// Creates a new pair of receiver and sender of `Payload` notifications. + pub fn channel() -> (NotificationSender, Self) { + let hub = Hub::new(TK::TRACING_KEY); + let sender = NotificationSender { hub: hub.clone() }; + let receiver = NotificationStream { hub, _pd: Default::default() }; + (sender, receiver) } + /// Subscribe to a channel through which the generic payload can be received. + pub fn subscribe(&self) -> NotificationReceiver { + let receiver = self.hub.subscribe(()); + NotificationReceiver { receiver } + } +} + +impl NotificationSender { /// Send out a notification to all subscribers that a new payload is available for a /// block. pub fn notify( &self, payload: impl FnOnce() -> Result, - ) -> Result<(), Error> { - let mut subscribers = self.subscribers.lock(); - - // do an initial prune on closed subscriptions - subscribers.retain(|n| !n.is_closed()); - - if !subscribers.is_empty() { - let payload = payload()?; - subscribers.retain(|n| n.unbounded_send(payload.clone()).is_ok()); - } - - Ok(()) + ) -> Result<(), Error> + where + Payload: Clone, + { + self.hub.send(payload) } } -/// The receiving half of the notifications channel. -/// -/// The `NotificationStream` entity stores the `SharedSenders` so it can be -/// used to add more subscriptions. -#[derive(Clone)] -pub struct NotificationStream { - subscribers: SharedSenders, - _trace_key: PhantomData, -} - -impl NotificationStream { - /// Creates a new pair of receiver and sender of `Payload` notifications. - pub fn channel() -> (NotificationSender, Self) { - let subscribers = Arc::new(Mutex::new(vec![])); - let receiver = NotificationStream::new(subscribers.clone()); - let sender = NotificationSender::new(subscribers); - (sender, receiver) - } - - /// Create a new receiver of `Payload` notifications. - /// - /// The `subscribers` should be shared with a corresponding `NotificationSender`. - fn new(subscribers: SharedSenders) -> Self { - Self { subscribers, _trace_key: PhantomData } - } - - /// Subscribe to a channel through which the generic payload can be received. - pub fn subscribe(&self) -> TracingUnboundedReceiver { - let (sender, receiver) = tracing_unbounded(TK::TRACING_KEY); - self.subscribers.lock().push(sender); - receiver +impl Clone for NotificationSender { + fn clone(&self) -> Self { + Self { hub: self.hub.clone() } } } -#[cfg(test)] -mod tests { - use super::*; - use futures::StreamExt; +impl Unpin for NotificationReceiver {} - #[derive(Clone)] - pub struct DummyTracingKey; - impl TracingKeyStr for DummyTracingKey { - const TRACING_KEY: &'static str = "test_notification_stream"; - } +impl Stream for NotificationReceiver { + type Item = Payload; - type StringStream = NotificationStream; - - #[test] - fn notification_channel_simple() { - let (sender, stream) = StringStream::channel(); - - let test_payload = String::from("test payload"); - let closure_payload = test_payload.clone(); - - // Create a future to receive a single notification - // from the stream and verify its payload. - let future = stream.subscribe().take(1).for_each(move |payload| { - let test_payload = closure_payload.clone(); - async move { - assert_eq!(payload, test_payload); - } - }); - - // Send notification. - let r: std::result::Result<(), ()> = sender.notify(|| Ok(test_payload)); - r.unwrap(); - - // Run receiver future. - tokio_test::block_on(future); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().receiver).poll_next(cx) + } +} + +impl FusedStream for NotificationReceiver { + fn is_terminated(&self) -> bool { + self.receiver.is_terminated() } } diff --git a/substrate/client/utils/src/notification/registry.rs b/substrate/client/utils/src/notification/registry.rs new file mode 100644 index 0000000000..ebe41452a5 --- /dev/null +++ b/substrate/client/utils/src/notification/registry.rs @@ -0,0 +1,63 @@ +// This file is part of Substrate. + +// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::collections::HashSet; + +use crate::{ + id_sequence::SeqID, + pubsub::{Dispatch, Subscribe, Unsubscribe}, +}; + +/// The shared structure to keep track on subscribers. +#[derive(Debug, Default)] +pub(super) struct Registry { + pub(super) subscribers: HashSet, +} + +impl Subscribe<()> for Registry { + fn subscribe(&mut self, _subs_key: (), subs_id: SeqID) { + self.subscribers.insert(subs_id); + } +} +impl Unsubscribe for Registry { + fn unsubscribe(&mut self, subs_id: SeqID) { + self.subscribers.remove(&subs_id); + } +} + +impl Dispatch for Registry +where + MakePayload: FnOnce() -> Result, + Payload: Clone, +{ + type Item = Payload; + type Ret = Result<(), Error>; + + fn dispatch(&mut self, make_payload: MakePayload, mut dispatch: F) -> Self::Ret + where + F: FnMut(&SeqID, Self::Item), + { + if !self.subscribers.is_empty() { + let payload = make_payload()?; + for subs_id in &self.subscribers { + dispatch(subs_id, payload.clone()); + } + } + Ok(()) + } +} diff --git a/substrate/client/utils/src/notification/tests.rs b/substrate/client/utils/src/notification/tests.rs new file mode 100644 index 0000000000..a001fa7e89 --- /dev/null +++ b/substrate/client/utils/src/notification/tests.rs @@ -0,0 +1,52 @@ +// This file is part of Substrate. + +// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::*; +use futures::StreamExt; + +#[derive(Clone)] +pub struct DummyTracingKey; +impl TracingKeyStr for DummyTracingKey { + const TRACING_KEY: &'static str = "test_notification_stream"; +} + +type StringStream = NotificationStream; + +#[test] +fn notification_channel_simple() { + let (sender, stream) = StringStream::channel(); + + let test_payload = String::from("test payload"); + let closure_payload = test_payload.clone(); + + // Create a future to receive a single notification + // from the stream and verify its payload. + let future = stream.subscribe().take(1).for_each(move |payload| { + let test_payload = closure_payload.clone(); + async move { + assert_eq!(payload, test_payload); + } + }); + + // Send notification. + let r: std::result::Result<(), ()> = sender.notify(|| Ok(test_payload)); + r.unwrap(); + + // Run receiver future. + tokio_test::block_on(future); +} diff --git a/substrate/client/utils/src/pubsub.rs b/substrate/client/utils/src/pubsub.rs new file mode 100644 index 0000000000..c8e51e3494 --- /dev/null +++ b/substrate/client/utils/src/pubsub.rs @@ -0,0 +1,263 @@ +// This file is part of Substrate. + +// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Provides means to implement a typical Pub/Sub mechanism. +//! +//! This module provides a type [`Hub`] which can be used both to subscribe, +//! and to send the broadcast messages. +//! +//! The [`Hub`] type is parametrized by two other types: +//! - `Message` — the type of a message that shall be delivered to the subscribers; +//! - `Registry` — implementation of the subscription/dispatch logic. +//! +//! A Registry is implemented by defining the following traits: +//! - [`Subscribe`]; +//! - [`Dispatch`]; +//! - [`Unsubscribe`]. +//! +//! As a result of subscription `Hub::subscribe` method returns an instance of +//! [`Receiver`]. That can be used as a [`Stream`] to receive the messages. +//! Upon drop the [`Receiver`] shall unregister itself from the `Hub`. + +use std::{ + collections::HashMap, + pin::Pin, + sync::{Arc, Weak}, + task::{Context, Poll}, +}; + +use futures::stream::{FusedStream, Stream}; +// use parking_lot::Mutex; +use parking_lot::ReentrantMutex; +use std::cell::RefCell; + +use crate::{ + id_sequence::SeqID, + mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}, +}; + +#[cfg(test)] +mod tests; + +/// Unsubscribe: unregisters a previously created subscription. +pub trait Unsubscribe { + /// Remove all registrations of the subscriber with ID `subs_id`. + fn unsubscribe(&mut self, subs_id: SeqID); +} + +/// Subscribe using a key of type `K` +pub trait Subscribe { + /// Register subscriber with the ID `subs_id` as having interest to the key `K`. + fn subscribe(&mut self, subs_key: K, subs_id: SeqID); +} + +/// Dispatch a message of type `M`. +pub trait Dispatch { + /// The type of the that shall be sent through the channel as a result of such dispatch. + type Item; + /// The type returned by the `dispatch`-method. + type Ret; + + /// Dispatch the message of type `M`. + /// + /// The implementation is given an instance of `M` and is supposed to invoke `dispatch` for + /// each matching subscriber, with an argument of type `Self::Item` matching that subscriber. + /// + /// Note that this does not have to be of the same type with the item that will be sent through + /// to the subscribers. The subscribers will receive a message of type `Self::Item`. + fn dispatch(&mut self, message: M, dispatch: F) -> Self::Ret + where + F: FnMut(&SeqID, Self::Item); +} + +/// A subscription hub. +/// +/// Does the subscription and dispatch. +/// The exact subscription and routing behaviour is to be implemented by the Registry (of type `R`). +/// The Hub under the hood uses the channel defined in `crate::mpsc` module. +#[derive(Debug)] +pub struct Hub { + tracing_key: &'static str, + shared: Arc>>>, +} + +/// The receiving side of the subscription. +/// +/// The messages are delivered as items of a [`Stream`]. +/// Upon drop this receiver unsubscribes itself from the [`Hub`]. +#[derive(Debug)] +pub struct Receiver +where + R: Unsubscribe, +{ + rx: TracingUnboundedReceiver, + + shared: Weak>>>, + subs_id: SeqID, +} + +#[derive(Debug)] +struct Shared { + id_sequence: crate::id_sequence::IDSequence, + registry: R, + sinks: HashMap>, +} + +impl Hub +where + R: Unsubscribe, +{ + /// Provide access to the registry (for test purposes). + pub fn map_registry_for_tests(&self, map: MapF) -> Ret + where + MapF: FnOnce(&R) -> Ret, + { + let shared_locked = self.shared.lock(); + let shared_borrowed = shared_locked.borrow(); + map(&shared_borrowed.registry) + } +} + +impl Drop for Receiver +where + R: Unsubscribe, +{ + fn drop(&mut self) { + if let Some(shared) = self.shared.upgrade() { + shared.lock().borrow_mut().unsubscribe(self.subs_id); + } + } +} + +impl Hub { + /// Create a new instance of Hub (with default value for the Registry). + pub fn new(tracing_key: &'static str) -> Self + where + R: Default, + { + Self::new_with_registry(tracing_key, Default::default()) + } + + /// Create a new instance of Hub over the initialized Registry. + pub fn new_with_registry(tracing_key: &'static str, registry: R) -> Self { + let shared = + Shared { registry, sinks: Default::default(), id_sequence: Default::default() }; + let shared = Arc::new(ReentrantMutex::new(RefCell::new(shared))); + Self { tracing_key, shared } + } + + /// Subscribe to this Hub using the `subs_key: K`. + /// + /// A subscription with a key `K` is possible if the Registry implements `Subscribe`. + pub fn subscribe(&self, subs_key: K) -> Receiver + where + R: Subscribe + Unsubscribe, + { + let shared_locked = self.shared.lock(); + let mut shared_borrowed = shared_locked.borrow_mut(); + + let subs_id = shared_borrowed.id_sequence.next_id(); + + // The order (registry.subscribe then sinks.insert) is important here: + // assuming that `Subscribe::subscribe` can panic, it is better to at least + // have the sink disposed. + shared_borrowed.registry.subscribe(subs_key, subs_id); + + let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key); + assert!(shared_borrowed.sinks.insert(subs_id, tx).is_none(), "Used IDSequence to create another ID. Should be unique until u64 is overflowed. Should be unique."); + + Receiver { shared: Arc::downgrade(&self.shared), subs_id, rx } + } + + /// Send the message produced with `Trigger`. + /// + /// This is possible if the registry implements `Dispatch`. + pub fn send(&self, trigger: Trigger) -> >::Ret + where + R: Dispatch, + { + let shared_locked = self.shared.lock(); + let mut shared_borrowed = shared_locked.borrow_mut(); + let (registry, sinks) = shared_borrowed.get_mut(); + + let dispatch_result = registry.dispatch(trigger, |subs_id, item| { + if let Some(tx) = sinks.get_mut(&subs_id) { + if let Err(send_err) = tx.unbounded_send(item) { + log::warn!("Sink with SubsID = {} failed to perform unbounded_send: {} ({} as Dispatch<{}, Item = {}>::dispatch(...))", subs_id, send_err, std::any::type_name::(), + std::any::type_name::(), + std::any::type_name::()); + } + } else { + log::warn!( + "No Sink for SubsID = {} ({} as Dispatch<{}, Item = {}>::dispatch(...))", + subs_id, + std::any::type_name::(), + std::any::type_name::(), + std::any::type_name::(), + ); + } + }); + + dispatch_result + } +} + +impl Shared { + fn get_mut(&mut self) -> (&mut R, &mut HashMap>) { + (&mut self.registry, &mut self.sinks) + } + + fn unsubscribe(&mut self, subs_id: SeqID) + where + R: Unsubscribe, + { + // The order (sinks.remove then registry.unsubscribe) is important here: + // assuming that `Unsubscribe::unsubscribe` can panic, it is better to at least + // have the sink disposed. + self.sinks.remove(&subs_id); + self.registry.unsubscribe(subs_id); + } +} + +impl Clone for Hub { + fn clone(&self) -> Self { + Self { tracing_key: self.tracing_key, shared: self.shared.clone() } + } +} + +impl Unpin for Receiver where R: Unsubscribe {} + +impl Stream for Receiver +where + R: Unsubscribe, +{ + type Item = M; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().rx).poll_next(cx) + } +} + +impl FusedStream for Receiver +where + R: Unsubscribe, +{ + fn is_terminated(&self) -> bool { + self.rx.is_terminated() + } +} diff --git a/substrate/client/utils/src/pubsub/tests.rs b/substrate/client/utils/src/pubsub/tests.rs new file mode 100644 index 0000000000..12945c0d88 --- /dev/null +++ b/substrate/client/utils/src/pubsub/tests.rs @@ -0,0 +1,123 @@ +// This file is part of Substrate. + +// Copyright (C) 2020-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use futures::StreamExt; +use tokio_test::block_on; + +use super::*; + +mod normal_operation; +mod panicking_registry; + +const TK: &str = "a_tracing_key"; + +type Message = u64; +type TestHub = Hub; +type TestReceiver = Receiver; + +#[derive(Default)] +struct Registry { + subscribers: HashMap, +} + +struct SubsKey { + _receiver: Option, + panic: SubsKeyPanic, +} + +impl SubsKey { + fn new() -> Self { + Self { _receiver: None, panic: SubsKeyPanic::None } + } + fn with_receiver(self, receiver: TestReceiver) -> Self { + Self { _receiver: Some(receiver), ..self } + } + fn with_panic(self, panic: SubsKeyPanic) -> Self { + Self { panic, ..self } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum SubsKeyPanic { + None, + + OnSubscribePanicBefore, + OnSubscribePanicAfter, + + OnUnsubscribePanicBefore, + OnUnsubscribePanicAfter, + + OnDispatchPanicBefore, + OnDispatchPanicAfter, +} + +impl Hub { + fn subs_count(&self) -> usize { + self.map_registry_for_tests(|r| r.subscribers.len()) + } + fn sink_count(&self) -> usize { + self.shared.lock().borrow().sinks.len() + } +} + +impl Subscribe for Registry { + fn subscribe(&mut self, subs_key: SubsKey, subs_id: SeqID) { + let sk_panic = subs_key.panic; + + if sk_panic == SubsKeyPanic::OnSubscribePanicBefore { + panic!("on-subscribe-panic-before") + } + self.subscribers.insert(subs_id, subs_key); + if sk_panic == SubsKeyPanic::OnSubscribePanicAfter { + panic!("on-subscribe-panic-after") + } + } +} +impl Unsubscribe for Registry { + fn unsubscribe(&mut self, subs_id: SeqID) { + let sk_panic = + self.subscribers.get(&subs_id).map(|sk| sk.panic).unwrap_or(SubsKeyPanic::None); + + if sk_panic == SubsKeyPanic::OnUnsubscribePanicBefore { + panic!("on-unsubscribe-panic-before") + } + self.subscribers.remove(&subs_id); + if sk_panic == SubsKeyPanic::OnUnsubscribePanicAfter { + panic!("on-unsubscribe-panic-after") + } + } +} +impl Dispatch for Registry { + type Item = Message; + type Ret = (); + + fn dispatch(&mut self, message: Message, mut dispatch: F) -> Self::Ret + where + F: FnMut(&SeqID, Self::Item), + { + self.subscribers.iter().for_each(|(id, subs_key)| { + if subs_key.panic == SubsKeyPanic::OnDispatchPanicBefore { + panic!("on-dispatch-panic-before") + } + dispatch(id, message); + if subs_key.panic == SubsKeyPanic::OnDispatchPanicAfter { + panic!("on-dispatch-panic-after") + } + }); + } +} diff --git a/substrate/client/utils/src/pubsub/tests/normal_operation.rs b/substrate/client/utils/src/pubsub/tests/normal_operation.rs new file mode 100644 index 0000000000..a13c718d74 --- /dev/null +++ b/substrate/client/utils/src/pubsub/tests/normal_operation.rs @@ -0,0 +1,88 @@ +// This file is part of Substrate. + +// Copyright (C) 2020-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::*; + +#[test] +fn positive_rx_receives_relevant_messages_and_terminates_upon_hub_drop() { + block_on(async { + let hub = TestHub::new(TK); + assert_eq!(hub.subs_count(), 0); + + // No subscribers yet. That message is not supposed to get to anyone. + hub.send(0); + + let mut rx_01 = hub.subscribe(SubsKey::new()); + assert_eq!(hub.subs_count(), 1); + + // That message is sent after subscription. Should be delivered into rx_01. + hub.send(1); + assert_eq!(Some(1), rx_01.next().await); + + // Hub is disposed. The rx_01 should be over after that. + std::mem::drop(hub); + + assert!(!rx_01.is_terminated()); + assert_eq!(None, rx_01.next().await); + assert!(rx_01.is_terminated()); + }); +} + +#[test] +fn positive_subs_count_is_correct_upon_drop_of_rxs() { + block_on(async { + let hub = TestHub::new(TK); + assert_eq!(hub.subs_count(), 0); + + let rx_01 = hub.subscribe(SubsKey::new()); + assert_eq!(hub.subs_count(), 1); + let rx_02 = hub.subscribe(SubsKey::new()); + assert_eq!(hub.subs_count(), 2); + + std::mem::drop(rx_01); + assert_eq!(hub.subs_count(), 1); + std::mem::drop(rx_02); + assert_eq!(hub.subs_count(), 0); + }); +} + +#[test] +fn positive_subs_count_is_correct_upon_drop_of_rxs_on_cloned_hubs() { + block_on(async { + let hub_01 = TestHub::new(TK); + let hub_02 = hub_01.clone(); + assert_eq!(hub_01.subs_count(), 0); + assert_eq!(hub_02.subs_count(), 0); + + let rx_01 = hub_02.subscribe(SubsKey::new()); + assert_eq!(hub_01.subs_count(), 1); + assert_eq!(hub_02.subs_count(), 1); + + let rx_02 = hub_02.subscribe(SubsKey::new()); + assert_eq!(hub_01.subs_count(), 2); + assert_eq!(hub_02.subs_count(), 2); + + std::mem::drop(rx_01); + assert_eq!(hub_01.subs_count(), 1); + assert_eq!(hub_02.subs_count(), 1); + + std::mem::drop(rx_02); + assert_eq!(hub_01.subs_count(), 0); + assert_eq!(hub_02.subs_count(), 0); + }); +} diff --git a/substrate/client/utils/src/pubsub/tests/panicking_registry.rs b/substrate/client/utils/src/pubsub/tests/panicking_registry.rs new file mode 100644 index 0000000000..26ce63bd51 --- /dev/null +++ b/substrate/client/utils/src/pubsub/tests/panicking_registry.rs @@ -0,0 +1,248 @@ +// This file is part of Substrate. + +// Copyright (C) 2020-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::*; + +use std::panic::{catch_unwind, AssertUnwindSafe}; + +fn assert_hub_props(hub: &TestHub, sinks_count: usize, subs_count: usize) { + assert_eq!(hub.sink_count(), sinks_count); + assert_eq!(hub.subs_count(), subs_count); +} + +#[test] +fn t01() { + let hub = TestHub::new(TK); + assert_hub_props(&hub, 0, 0); + + let rx_01 = hub.subscribe(SubsKey::new()); + assert_hub_props(&hub, 1, 1); + + std::mem::drop(rx_01); + assert_hub_props(&hub, 0, 0); +} + +#[test] +fn t02() { + block_on(async { + // Create a Hub + let hub = TestHub::new(TK); + assert_hub_props(&hub, 0, 0); + + // Subscribe rx-01 + let rx_01 = hub.subscribe(SubsKey::new()); + assert_hub_props(&hub, 1, 1); + + // Subscribe rx-02 so that its unsubscription will lead to an attempt to drop rx-01 in the + // middle of unsubscription of rx-02 + let rx_02 = hub.subscribe(SubsKey::new().with_receiver(rx_01)); + assert_hub_props(&hub, 2, 2); + + // Subscribe rx-03 in order to see that it will receive messages after the unclean + // unsubscription + let mut rx_03 = hub.subscribe(SubsKey::new()); + assert_hub_props(&hub, 3, 3); + + // drop rx-02 leads to an attempt to unsubscribe rx-01 + assert!(catch_unwind(AssertUnwindSafe(move || { + std::mem::drop(rx_02); + })) + .is_err()); + + // One of the rxes could not unsubscribe + assert_hub_props(&hub, 2, 2); + + // Subscribe rx-04 in order to see that it will receive messages after the unclean + // unsubscription + let mut rx_04 = hub.subscribe(SubsKey::new()); + assert_hub_props(&hub, 3, 3); + + hub.send(2); + + // The messages are still received + assert_eq!(rx_03.next().await, Some(2)); + assert_eq!(rx_04.next().await, Some(2)); + + // Perform a clean unsubscription + std::mem::drop(rx_04); + + hub.send(3); + + // The messages are still received + assert_eq!(rx_03.next().await, Some(3)); + + std::mem::drop(rx_03); + + hub.send(4); + + // The stuck subscription is still there + assert_hub_props(&hub, 1, 1); + }); +} + +async fn add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(hub: &TestHub) { + let rx_01 = hub.subscribe(SubsKey::new()); + let rx_02 = hub.subscribe(SubsKey::new()); + + hub.send(1); + hub.send(2); + hub.send(3); + + assert_eq!(rx_01.take(3).collect::>().await, vec![1, 2, 3]); + + hub.send(4); + hub.send(5); + hub.send(6); + + assert_eq!(rx_02.take(6).collect::>().await, vec![1, 2, 3, 4, 5, 6]); +} + +#[test] +fn t03() { + block_on(async { + // Create a Hub + let hub = TestHub::new(TK); + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + + assert!(catch_unwind(AssertUnwindSafe( + || hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicBefore)) + )) + .is_err()); + + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + }); +} + +#[test] +fn t04() { + block_on(async { + let hub = TestHub::new(TK); + + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + + assert!(catch_unwind(AssertUnwindSafe( + || hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicAfter)) + )) + .is_err()); + + // the registry has panicked after it has added a subs-id into its internal storage — the + // sinks do not leak, although the subscriptions storage contains some garbage + assert_hub_props(&hub, 0, 1); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 1); + }) +} + +#[test] +fn t05() { + block_on(async { + let hub = TestHub::new(TK); + + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + + let rx_01 = + hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicBefore)); + + assert_hub_props(&hub, 1, 1); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 1, 1); + + assert!(catch_unwind(AssertUnwindSafe(move || std::mem::drop(rx_01))).is_err()); + + // the registry has panicked on-unsubscribe before it removed the subs-id from its internal + // storage — the sinks do not leak, although the subscriptions storage contains some garbage + assert_hub_props(&hub, 0, 1); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 1); + }) +} + +#[test] +fn t06() { + block_on(async { + let hub = TestHub::new(TK); + + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + + let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicAfter)); + + assert_hub_props(&hub, 1, 1); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 1, 1); + + assert!(catch_unwind(AssertUnwindSafe(move || std::mem::drop(rx_01))).is_err()); + + // the registry has panicked on-unsubscribe after it removed the subs-id from its internal + // storage — the sinks do not leak, the subscriptions storage does not contain any garbage + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + }) +} + +#[test] +fn t07() { + block_on(async { + let hub = TestHub::new(TK); + + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + + let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicBefore)); + assert_hub_props(&hub, 1, 1); + assert!(catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err()); + assert_hub_props(&hub, 1, 1); + + std::mem::drop(rx_01); + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + }) +} + +#[test] +fn t08() { + block_on(async { + let hub = TestHub::new(TK); + + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + + let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicAfter)); + assert_hub_props(&hub, 1, 1); + assert!(catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err()); + assert_hub_props(&hub, 1, 1); + + std::mem::drop(rx_01); + assert_hub_props(&hub, 0, 0); + add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; + assert_hub_props(&hub, 0, 0); + }) +}