feat: initialize Kurdistan SDK - independent fork of Polkadot SDK

This commit is contained in:
2025-12-13 15:44:15 +03:00
commit e4778b4576
6838 changed files with 1847450 additions and 0 deletions
+29
View File
@@ -0,0 +1,29 @@
[package]
name = "sc-utils"
version = "14.0.0"
authors.workspace = true
edition.workspace = true
license = "Apache-2.0"
homepage.workspace = true
repository.workspace = true
description = "I/O for Substrate runtimes"
readme = "README.md"
[lints]
workspace = true
[dependencies]
async-channel = { workspace = true }
futures = { workspace = true }
futures-timer = { workspace = true }
log = { workspace = true, default-features = true }
parking_lot = { workspace = true, default-features = true }
prometheus = { workspace = true }
sp-arithmetic = { workspace = true }
[dev-dependencies]
tokio-test = { workspace = true }
[features]
default = ["metered"]
metered = []
+15
View File
@@ -0,0 +1,15 @@
# Utilities Primitives for Substrate
This crate provides `mpsc::tracing_unbounded` function that returns wrapper types to
`async_channel::Sender<T>` and `async_channel::Receiver<T>`, which register every
`send`/`received`/`dropped` action happened on the channel.
Also this wrapper creates and registers a prometheus vector with name `unbounded_channel_len`
and labels:
| Label | Description |
| ------------ | --------------------------------------------- |
| entity | Name of channel passed to `tracing_unbounded` |
| action | One of `send`/`received`/`dropped` |
License: Apache-2.0
+53
View File
@@ -0,0 +1,53 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Produce opaque sequential IDs.
/// A Sequence of IDs.
#[derive(Debug, Default)]
// The `Clone` trait is intentionally not defined on this type.
pub struct IDSequence {
next_id: u64,
}
/// A Sequential ID.
///
/// Its integer value is intentionally not public: it is supposed to be instantiated from within
/// this module only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct SeqID(u64);
impl std::fmt::Display for SeqID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl IDSequence {
/// Create a new ID-sequence.
pub fn new() -> Self {
Default::default()
}
/// Obtain another ID from this sequence.
pub fn next_id(&mut self) -> SeqID {
let id = SeqID(self.next_id);
self.next_id += 1;
id
}
}
+37
View File
@@ -0,0 +1,37 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities Primitives for Substrate
//!
//! This crate provides `mpsc::tracing_unbounded` function that returns wrapper types to
//! `async_channel::Sender<T>` and `async_channel::Receiver<T>`, which register every
//! `send`/`received`/`dropped` action happened on the channel.
//!
//! Also this wrapper creates and registers a prometheus vector with name `unbounded_channel_len`
//! and labels:
//!
//! | Label | Description |
//! | ------------ | --------------------------------------------- |
//! | entity | Name of channel passed to `tracing_unbounded` |
//! | action | One of `send`/`received`/`dropped` |
pub mod id_sequence;
pub mod metrics;
pub mod mpsc;
pub mod notification;
pub mod pubsub;
pub mod status_sinks;
+76
View File
@@ -0,0 +1,76 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Metering primitives and globals
use prometheus::{
core::{AtomicU64, GenericCounter, GenericGauge},
Error as PrometheusError, Registry,
};
use std::sync::LazyLock;
use prometheus::{
core::{GenericCounterVec, GenericGaugeVec},
Opts,
};
pub static TOKIO_THREADS_TOTAL: LazyLock<GenericCounter<AtomicU64>> = LazyLock::new(|| {
GenericCounter::new("substrate_tokio_threads_total", "Total number of threads created")
.expect("Creating of statics doesn't fail. qed")
});
pub static TOKIO_THREADS_ALIVE: LazyLock<GenericGauge<AtomicU64>> = LazyLock::new(|| {
GenericGauge::new("substrate_tokio_threads_alive", "Number of threads alive right now")
.expect("Creating of statics doesn't fail. qed")
});
pub static UNBOUNDED_CHANNELS_COUNTER: LazyLock<GenericCounterVec<AtomicU64>> =
LazyLock::new(|| {
GenericCounterVec::new(
Opts::new(
"substrate_unbounded_channel_len",
"Items sent/received/dropped on each mpsc::unbounded instance",
),
&["entity", "action"], // name of channel, send|received|dropped
)
.expect("Creating of statics doesn't fail. qed")
});
pub static UNBOUNDED_CHANNELS_SIZE: LazyLock<GenericGaugeVec<AtomicU64>> = LazyLock::new(|| {
GenericGaugeVec::new(
Opts::new(
"substrate_unbounded_channel_size",
"Size (number of messages to be processed) of each mpsc::unbounded instance",
),
&["entity"], // name of channel
)
.expect("Creating of statics doesn't fail. qed")
});
pub static SENT_LABEL: &'static str = "send";
pub static RECEIVED_LABEL: &'static str = "received";
pub static DROPPED_LABEL: &'static str = "dropped";
/// Register the statics to report to registry
pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> {
registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?;
registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?;
registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?;
registry.register(Box::new(UNBOUNDED_CHANNELS_SIZE.clone()))?;
Ok(())
}
+229
View File
@@ -0,0 +1,229 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Code to meter unbounded channels.
pub use async_channel::{TryRecvError, TrySendError};
use crate::metrics::{
DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE,
};
use async_channel::{Receiver, Sender};
use futures::{
stream::{FusedStream, Stream},
task::{Context, Poll},
};
use log::error;
use sp_arithmetic::traits::SaturatedConversion;
use std::{
backtrace::Backtrace,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
/// Wrapper Type around [`async_channel::Sender`] that increases the global
/// measure when a message is added.
#[derive(Debug)]
pub struct TracingUnboundedSender<T> {
inner: Sender<T>,
name: &'static str,
queue_size_warning: usize,
warning_fired: Arc<AtomicBool>,
creation_backtrace: Arc<Backtrace>,
}
// Strangely, deriving `Clone` requires that `T` is also `Clone`.
impl<T> Clone for TracingUnboundedSender<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
name: self.name,
queue_size_warning: self.queue_size_warning,
warning_fired: self.warning_fired.clone(),
creation_backtrace: self.creation_backtrace.clone(),
}
}
}
/// Wrapper Type around [`async_channel::Receiver`] that decreases the global
/// measure when a message is polled.
#[derive(Debug)]
pub struct TracingUnboundedReceiver<T> {
inner: Receiver<T>,
name: &'static str,
}
/// Wrapper around [`async_channel::unbounded`] that tracks the in- and outflow via
/// `UNBOUNDED_CHANNELS_COUNTER` and warns if the message queue grows
/// above the warning threshold.
pub fn tracing_unbounded<T>(
name: &'static str,
queue_size_warning: usize,
) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
let (s, r) = async_channel::unbounded();
let sender = TracingUnboundedSender {
inner: s,
name,
queue_size_warning,
warning_fired: Arc::new(AtomicBool::new(false)),
creation_backtrace: Arc::new(Backtrace::force_capture()),
};
let receiver = TracingUnboundedReceiver { inner: r, name: name.into() };
(sender, receiver)
}
impl<T> TracingUnboundedSender<T> {
/// Proxy function to [`async_channel::Sender`].
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
/// Proxy function to [`async_channel::Sender`].
pub fn close(&self) -> bool {
self.inner.close()
}
/// Proxy function to `async_channel::Sender::try_send`.
pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.inner.try_send(msg).inspect(|_| {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc();
UNBOUNDED_CHANNELS_SIZE
.with_label_values(&[self.name])
.set(self.inner.len().saturated_into());
if self.inner.len() >= self.queue_size_warning &&
self.warning_fired
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
error!(
"The number of unprocessed messages in channel `{}` exceeded {}.\n\
The channel was created at:\n{}\n
Last message was sent from:\n{}",
self.name,
self.queue_size_warning,
self.creation_backtrace,
Backtrace::force_capture(),
);
}
})
}
/// The number of elements in the channel (proxy function to [`async_channel::Sender`]).
pub fn len(&self) -> usize {
self.inner.len()
}
}
impl<T> TracingUnboundedReceiver<T> {
/// Proxy function to [`async_channel::Receiver`].
pub fn close(&mut self) -> bool {
self.inner.close()
}
/// Proxy function to [`async_channel::Receiver`]
/// that discounts the messages taken out.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.inner.try_recv().inspect(|_| {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc();
UNBOUNDED_CHANNELS_SIZE
.with_label_values(&[self.name])
.set(self.inner.len().saturated_into());
})
}
/// The number of elements in the channel (proxy function to [`async_channel::Receiver`]).
pub fn len(&self) -> usize {
self.inner.len()
}
/// The name of this receiver
pub fn name(&self) -> &'static str {
self.name
}
}
impl<T> Drop for TracingUnboundedReceiver<T> {
fn drop(&mut self) {
// Close the channel to prevent any further messages to be sent into the channel
self.close();
// The number of messages about to be dropped
let count = self.inner.len();
// Discount the messages
if count > 0 {
UNBOUNDED_CHANNELS_COUNTER
.with_label_values(&[self.name, DROPPED_LABEL])
.inc_by(count.saturated_into());
}
// Reset the size metric to 0
UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0);
// Drain all the pending messages in the channel since they can never be accessed,
// this can be removed once https://github.com/smol-rs/async-channel/issues/23 is
// resolved
while let Ok(_) = self.inner.try_recv() {}
}
}
impl<T> Unpin for TracingUnboundedReceiver<T> {}
impl<T> Stream for TracingUnboundedReceiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let s = self.get_mut();
match Pin::new(&mut s.inner).poll_next(cx) {
Poll::Ready(msg) => {
if msg.is_some() {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc();
UNBOUNDED_CHANNELS_SIZE
.with_label_values(&[s.name])
.set(s.inner.len().saturated_into());
}
Poll::Ready(msg)
},
Poll::Pending => Poll::Pending,
}
}
}
impl<T> FusedStream for TracingUnboundedReceiver<T> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}
#[cfg(test)]
mod tests {
use super::tracing_unbounded;
use async_channel::{self, RecvError, TryRecvError};
#[test]
fn test_tracing_unbounded_receiver_drop() {
let (tracing_unbounded_sender, tracing_unbounded_receiver) =
tracing_unbounded("test-receiver-drop", 10);
let (tx, rx) = async_channel::unbounded::<usize>();
tracing_unbounded_sender.unbounded_send(tx).unwrap();
drop(tracing_unbounded_receiver);
assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
assert_eq!(rx.recv_blocking(), Err(RecvError));
}
}
+121
View File
@@ -0,0 +1,121 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Provides mpsc notification channel that can be instantiated
//! _after_ it's been shared to the consumer and producers entities.
//!
//! Useful when building RPC extensions where, at service definition time, we
//! don't know whether the specific interface where the RPC extension will be
//! exposed is safe or not and we want to lazily build the RPC extension
//! whenever we bind the service to an interface.
//!
//! See [`sc-service::builder::RpcExtensionBuilder`] for more details.
use futures::stream::{FusedStream, Stream};
use std::{
pin::Pin,
task::{Context, Poll},
};
use crate::pubsub::{Hub, Receiver};
mod registry;
use registry::Registry;
#[cfg(test)]
mod tests;
/// Trait used to define the "tracing key" string used to tag
/// and identify the mpsc channels.
pub trait TracingKeyStr {
/// Const `str` representing the "tracing key" used to tag and identify
/// the mpsc channels owned by the object implementing this trait.
const TRACING_KEY: &'static str;
}
/// The receiving half of the notifications channel.
///
/// The [`NotificationStream`] entity stores the [`Hub`] so it can be
/// used to add more subscriptions.
#[derive(Clone)]
pub struct NotificationStream<Payload, TK: TracingKeyStr> {
hub: Hub<Payload, Registry>,
_pd: std::marker::PhantomData<TK>,
}
/// The receiving half of the notifications channel(s).
#[derive(Debug)]
pub struct NotificationReceiver<Payload> {
receiver: Receiver<Payload, Registry>,
}
/// The sending half of the notifications channel(s).
pub struct NotificationSender<Payload> {
hub: Hub<Payload, Registry>,
}
impl<Payload, TK: TracingKeyStr> NotificationStream<Payload, TK> {
/// Creates a new pair of receiver and sender of `Payload` notifications.
pub fn channel() -> (NotificationSender<Payload>, Self) {
let hub = Hub::new(TK::TRACING_KEY);
let sender = NotificationSender { hub: hub.clone() };
let receiver = NotificationStream { hub, _pd: Default::default() };
(sender, receiver)
}
/// Subscribe to a channel through which the generic payload can be received.
pub fn subscribe(&self, queue_size_warning: usize) -> NotificationReceiver<Payload> {
let receiver = self.hub.subscribe((), queue_size_warning);
NotificationReceiver { receiver }
}
}
impl<Payload> NotificationSender<Payload> {
/// Send out a notification to all subscribers that a new payload is available for a
/// block.
pub fn notify<Error>(
&self,
payload: impl FnOnce() -> Result<Payload, Error>,
) -> Result<(), Error>
where
Payload: Clone,
{
self.hub.send(payload)
}
}
impl<Payload> Clone for NotificationSender<Payload> {
fn clone(&self) -> Self {
Self { hub: self.hub.clone() }
}
}
impl<Payload> Unpin for NotificationReceiver<Payload> {}
impl<Payload> Stream for NotificationReceiver<Payload> {
type Item = Payload;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Payload>> {
Pin::new(&mut self.get_mut().receiver).poll_next(cx)
}
}
impl<Payload> FusedStream for NotificationReceiver<Payload> {
fn is_terminated(&self) -> bool {
self.receiver.is_terminated()
}
}
@@ -0,0 +1,62 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use crate::{
id_sequence::SeqID,
pubsub::{Dispatch, Subscribe, Unsubscribe},
};
/// The shared structure to keep track on subscribers.
#[derive(Debug, Default)]
pub(super) struct Registry {
pub(super) subscribers: HashSet<SeqID>,
}
impl Subscribe<()> for Registry {
fn subscribe(&mut self, _subs_key: (), subs_id: SeqID) {
self.subscribers.insert(subs_id);
}
}
impl Unsubscribe for Registry {
fn unsubscribe(&mut self, subs_id: SeqID) {
self.subscribers.remove(&subs_id);
}
}
impl<MakePayload, Payload, Error> Dispatch<MakePayload> for Registry
where
MakePayload: FnOnce() -> Result<Payload, Error>,
Payload: Clone,
{
type Item = Payload;
type Ret = Result<(), Error>;
fn dispatch<F>(&mut self, make_payload: MakePayload, mut dispatch: F) -> Self::Ret
where
F: FnMut(&SeqID, Self::Item),
{
if !self.subscribers.is_empty() {
let payload = make_payload()?;
for subs_id in &self.subscribers {
dispatch(subs_id, payload.clone());
}
}
Ok(())
}
}
@@ -0,0 +1,51 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
use futures::StreamExt;
#[derive(Clone)]
pub struct DummyTracingKey;
impl TracingKeyStr for DummyTracingKey {
const TRACING_KEY: &'static str = "test_notification_stream";
}
type StringStream = NotificationStream<String, DummyTracingKey>;
#[test]
fn notification_channel_simple() {
let (sender, stream) = StringStream::channel();
let test_payload = String::from("test payload");
let closure_payload = test_payload.clone();
// Create a future to receive a single notification
// from the stream and verify its payload.
let future = stream.subscribe(100_000).take(1).for_each(move |payload| {
let test_payload = closure_payload.clone();
async move {
assert_eq!(payload, test_payload);
}
});
// Send notification.
let r: std::result::Result<(), ()> = sender.notify(|| Ok(test_payload));
r.unwrap();
// Run receiver future.
tokio_test::block_on(future);
}
+260
View File
@@ -0,0 +1,260 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Provides means to implement a typical Pub/Sub mechanism.
//!
//! This module provides a type [`Hub`] which can be used both to subscribe,
//! and to send the broadcast messages.
//!
//! The [`Hub`] type is parametrized by two other types:
//! - `Message` — the type of a message that shall be delivered to the subscribers;
//! - `Registry` — implementation of the subscription/dispatch logic.
//!
//! A Registry is implemented by defining the following traits:
//! - [`Subscribe<K>`];
//! - [`Dispatch<M>`];
//! - [`Unsubscribe`].
//!
//! As a result of subscription `Hub::subscribe` method returns an instance of
//! [`Receiver<Message,Registry>`]. That can be used as a [`Stream`] to receive the messages.
//! Upon drop the [`Receiver<Message, Registry>`] shall unregister itself from the `Hub`.
use std::{
collections::HashMap,
pin::Pin,
sync::{Arc, Weak},
task::{Context, Poll},
};
use futures::stream::{FusedStream, Stream};
// use parking_lot::Mutex;
use parking_lot::ReentrantMutex;
use std::cell::RefCell;
use crate::{
id_sequence::SeqID,
mpsc::{TracingUnboundedReceiver, TracingUnboundedSender},
};
#[cfg(test)]
mod tests;
/// Unsubscribe: unregisters a previously created subscription.
pub trait Unsubscribe {
/// Remove all registrations of the subscriber with ID `subs_id`.
fn unsubscribe(&mut self, subs_id: SeqID);
}
/// Subscribe using a key of type `K`
pub trait Subscribe<K> {
/// Register subscriber with the ID `subs_id` as having interest to the key `K`.
fn subscribe(&mut self, subs_key: K, subs_id: SeqID);
}
/// Dispatch a message of type `M`.
pub trait Dispatch<M> {
/// The type of the that shall be sent through the channel as a result of such dispatch.
type Item;
/// The type returned by the `dispatch`-method.
type Ret;
/// Dispatch the message of type `M`.
///
/// The implementation is given an instance of `M` and is supposed to invoke `dispatch` for
/// each matching subscriber, with an argument of type `Self::Item` matching that subscriber.
///
/// Note that this does not have to be of the same type with the item that will be sent through
/// to the subscribers. The subscribers will receive a message of type `Self::Item`.
fn dispatch<F>(&mut self, message: M, dispatch: F) -> Self::Ret
where
F: FnMut(&SeqID, Self::Item);
}
/// A subscription hub.
///
/// Does the subscription and dispatch.
/// The exact subscription and routing behaviour is to be implemented by the Registry (of type `R`).
/// The Hub under the hood uses the channel defined in `crate::mpsc` module.
#[derive(Debug)]
pub struct Hub<M, R> {
tracing_key: &'static str,
shared: Arc<ReentrantMutex<RefCell<Shared<M, R>>>>,
}
/// The receiving side of the subscription.
///
/// The messages are delivered as items of a [`Stream`].
/// Upon drop this receiver unsubscribes itself from the [`Hub<M, R>`].
#[derive(Debug)]
pub struct Receiver<M, R>
where
R: Unsubscribe,
{
rx: TracingUnboundedReceiver<M>,
shared: Weak<ReentrantMutex<RefCell<Shared<M, R>>>>,
subs_id: SeqID,
}
#[derive(Debug)]
struct Shared<M, R> {
id_sequence: crate::id_sequence::IDSequence,
registry: R,
sinks: HashMap<SeqID, TracingUnboundedSender<M>>,
}
impl<M, R> Hub<M, R>
where
R: Unsubscribe,
{
/// Provide access to the registry (for test purposes).
pub fn map_registry_for_tests<MapF, Ret>(&self, map: MapF) -> Ret
where
MapF: FnOnce(&R) -> Ret,
{
let shared_locked = self.shared.lock();
let shared_borrowed = shared_locked.borrow();
map(&shared_borrowed.registry)
}
}
impl<M, R> Drop for Receiver<M, R>
where
R: Unsubscribe,
{
fn drop(&mut self) {
if let Some(shared) = self.shared.upgrade() {
shared.lock().borrow_mut().unsubscribe(self.subs_id);
}
}
}
impl<M, R> Hub<M, R> {
/// Create a new instance of Hub (with default value for the Registry).
pub fn new(tracing_key: &'static str) -> Self
where
R: Default,
{
Self::new_with_registry(tracing_key, Default::default())
}
/// Create a new instance of Hub over the initialized Registry.
pub fn new_with_registry(tracing_key: &'static str, registry: R) -> Self {
let shared =
Shared { registry, sinks: Default::default(), id_sequence: Default::default() };
let shared = Arc::new(ReentrantMutex::new(RefCell::new(shared)));
Self { tracing_key, shared }
}
/// Subscribe to this Hub using the `subs_key: K`.
///
/// A subscription with a key `K` is possible if the Registry implements `Subscribe<K>`.
pub fn subscribe<K>(&self, subs_key: K, queue_size_warning: usize) -> Receiver<M, R>
where
R: Subscribe<K> + Unsubscribe,
{
let shared_locked = self.shared.lock();
let mut shared_borrowed = shared_locked.borrow_mut();
let subs_id = shared_borrowed.id_sequence.next_id();
// The order (registry.subscribe then sinks.insert) is important here:
// assuming that `Subscribe<K>::subscribe` can panic, it is better to at least
// have the sink disposed.
shared_borrowed.registry.subscribe(subs_key, subs_id);
let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key, queue_size_warning);
assert!(shared_borrowed.sinks.insert(subs_id, tx).is_none(), "Used IDSequence to create another ID. Should be unique until u64 is overflowed. Should be unique.");
Receiver { shared: Arc::downgrade(&self.shared), subs_id, rx }
}
/// Send the message produced with `Trigger`.
///
/// This is possible if the registry implements `Dispatch<Trigger, Item = M>`.
pub fn send<Trigger>(&self, trigger: Trigger) -> <R as Dispatch<Trigger>>::Ret
where
R: Dispatch<Trigger, Item = M>,
{
let shared_locked = self.shared.lock();
let mut shared_borrowed = shared_locked.borrow_mut();
let (registry, sinks) = shared_borrowed.get_mut();
registry.dispatch(trigger, |subs_id, item| {
if let Some(tx) = sinks.get_mut(subs_id) {
if let Err(send_err) = tx.unbounded_send(item) {
log::warn!("Sink with SubsID = {} failed to perform unbounded_send: {} ({} as Dispatch<{}, Item = {}>::dispatch(...))", subs_id, send_err, std::any::type_name::<R>(),
std::any::type_name::<Trigger>(),
std::any::type_name::<M>());
}
} else {
log::warn!(
"No Sink for SubsID = {} ({} as Dispatch<{}, Item = {}>::dispatch(...))",
subs_id,
std::any::type_name::<R>(),
std::any::type_name::<Trigger>(),
std::any::type_name::<M>(),
);
}
})
}
}
impl<M, R> Shared<M, R> {
fn get_mut(&mut self) -> (&mut R, &mut HashMap<SeqID, TracingUnboundedSender<M>>) {
(&mut self.registry, &mut self.sinks)
}
fn unsubscribe(&mut self, subs_id: SeqID)
where
R: Unsubscribe,
{
// The order (sinks.remove then registry.unsubscribe) is important here:
// assuming that `Unsubscribe::unsubscribe` can panic, it is better to at least
// have the sink disposed.
self.sinks.remove(&subs_id);
self.registry.unsubscribe(subs_id);
}
}
impl<M, R> Clone for Hub<M, R> {
fn clone(&self) -> Self {
Self { tracing_key: self.tracing_key, shared: self.shared.clone() }
}
}
impl<M, R> Unpin for Receiver<M, R> where R: Unsubscribe {}
impl<M, R> Stream for Receiver<M, R>
where
R: Unsubscribe,
{
type Item = M;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().rx).poll_next(cx)
}
}
impl<Ch, R> FusedStream for Receiver<Ch, R>
where
R: Unsubscribe,
{
fn is_terminated(&self) -> bool {
self.rx.is_terminated()
}
}
+122
View File
@@ -0,0 +1,122 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::StreamExt;
use tokio_test::block_on;
use super::*;
mod normal_operation;
mod panicking_registry;
const TK: &str = "a_tracing_key";
type Message = u64;
type TestHub = Hub<Message, Registry>;
type TestReceiver = Receiver<Message, Registry>;
#[derive(Default)]
struct Registry {
subscribers: HashMap<SeqID, SubsKey>,
}
struct SubsKey {
_receiver: Option<TestReceiver>,
panic: SubsKeyPanic,
}
impl SubsKey {
fn new() -> Self {
Self { _receiver: None, panic: SubsKeyPanic::None }
}
fn with_receiver(self, receiver: TestReceiver) -> Self {
Self { _receiver: Some(receiver), ..self }
}
fn with_panic(self, panic: SubsKeyPanic) -> Self {
Self { panic, ..self }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SubsKeyPanic {
None,
OnSubscribePanicBefore,
OnSubscribePanicAfter,
OnUnsubscribePanicBefore,
OnUnsubscribePanicAfter,
OnDispatchPanicBefore,
OnDispatchPanicAfter,
}
impl<M> Hub<M, Registry> {
fn subs_count(&self) -> usize {
self.map_registry_for_tests(|r| r.subscribers.len())
}
fn sink_count(&self) -> usize {
self.shared.lock().borrow().sinks.len()
}
}
impl Subscribe<SubsKey> for Registry {
fn subscribe(&mut self, subs_key: SubsKey, subs_id: SeqID) {
let sk_panic = subs_key.panic;
if sk_panic == SubsKeyPanic::OnSubscribePanicBefore {
panic!("on-subscribe-panic-before")
}
self.subscribers.insert(subs_id, subs_key);
if sk_panic == SubsKeyPanic::OnSubscribePanicAfter {
panic!("on-subscribe-panic-after")
}
}
}
impl Unsubscribe for Registry {
fn unsubscribe(&mut self, subs_id: SeqID) {
let sk_panic =
self.subscribers.get(&subs_id).map(|sk| sk.panic).unwrap_or(SubsKeyPanic::None);
if sk_panic == SubsKeyPanic::OnUnsubscribePanicBefore {
panic!("on-unsubscribe-panic-before")
}
self.subscribers.remove(&subs_id);
if sk_panic == SubsKeyPanic::OnUnsubscribePanicAfter {
panic!("on-unsubscribe-panic-after")
}
}
}
impl Dispatch<Message> for Registry {
type Item = Message;
type Ret = ();
fn dispatch<F>(&mut self, message: Message, mut dispatch: F) -> Self::Ret
where
F: FnMut(&SeqID, Self::Item),
{
self.subscribers.iter().for_each(|(id, subs_key)| {
if subs_key.panic == SubsKeyPanic::OnDispatchPanicBefore {
panic!("on-dispatch-panic-before")
}
dispatch(id, message);
if subs_key.panic == SubsKeyPanic::OnDispatchPanicAfter {
panic!("on-dispatch-panic-after")
}
});
}
}
@@ -0,0 +1,86 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
#[test]
fn positive_rx_receives_relevant_messages_and_terminates_upon_hub_drop() {
block_on(async {
let hub = TestHub::new(TK);
assert_eq!(hub.subs_count(), 0);
// No subscribers yet. That message is not supposed to get to anyone.
hub.send(0);
let mut rx_01 = hub.subscribe(SubsKey::new(), 100_000);
assert_eq!(hub.subs_count(), 1);
// That message is sent after subscription. Should be delivered into rx_01.
hub.send(1);
assert_eq!(Some(1), rx_01.next().await);
// Hub is disposed. The rx_01 should be over after that.
std::mem::drop(hub);
assert!(rx_01.is_terminated());
assert_eq!(None, rx_01.next().await);
});
}
#[test]
fn positive_subs_count_is_correct_upon_drop_of_rxs() {
block_on(async {
let hub = TestHub::new(TK);
assert_eq!(hub.subs_count(), 0);
let rx_01 = hub.subscribe(SubsKey::new(), 100_000);
assert_eq!(hub.subs_count(), 1);
let rx_02 = hub.subscribe(SubsKey::new(), 100_000);
assert_eq!(hub.subs_count(), 2);
std::mem::drop(rx_01);
assert_eq!(hub.subs_count(), 1);
std::mem::drop(rx_02);
assert_eq!(hub.subs_count(), 0);
});
}
#[test]
fn positive_subs_count_is_correct_upon_drop_of_rxs_on_cloned_hubs() {
block_on(async {
let hub_01 = TestHub::new(TK);
let hub_02 = hub_01.clone();
assert_eq!(hub_01.subs_count(), 0);
assert_eq!(hub_02.subs_count(), 0);
let rx_01 = hub_02.subscribe(SubsKey::new(), 100_000);
assert_eq!(hub_01.subs_count(), 1);
assert_eq!(hub_02.subs_count(), 1);
let rx_02 = hub_02.subscribe(SubsKey::new(), 100_000);
assert_eq!(hub_01.subs_count(), 2);
assert_eq!(hub_02.subs_count(), 2);
std::mem::drop(rx_01);
assert_eq!(hub_01.subs_count(), 1);
assert_eq!(hub_02.subs_count(), 1);
std::mem::drop(rx_02);
assert_eq!(hub_01.subs_count(), 0);
assert_eq!(hub_02.subs_count(), 0);
});
}
@@ -0,0 +1,248 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
use std::panic::{catch_unwind, AssertUnwindSafe};
fn assert_hub_props(hub: &TestHub, sinks_count: usize, subs_count: usize) {
assert_eq!(hub.sink_count(), sinks_count);
assert_eq!(hub.subs_count(), subs_count);
}
#[test]
fn t01() {
let hub = TestHub::new(TK);
assert_hub_props(&hub, 0, 0);
let rx_01 = hub.subscribe(SubsKey::new(), 100_000);
assert_hub_props(&hub, 1, 1);
std::mem::drop(rx_01);
assert_hub_props(&hub, 0, 0);
}
#[test]
fn t02() {
block_on(async {
// Create a Hub
let hub = TestHub::new(TK);
assert_hub_props(&hub, 0, 0);
// Subscribe rx-01
let rx_01 = hub.subscribe(SubsKey::new(), 100_000);
assert_hub_props(&hub, 1, 1);
// Subscribe rx-02 so that its unsubscription will lead to an attempt to drop rx-01 in the
// middle of unsubscription of rx-02
let rx_02 = hub.subscribe(SubsKey::new().with_receiver(rx_01), 100_000);
assert_hub_props(&hub, 2, 2);
// Subscribe rx-03 in order to see that it will receive messages after the unclean
// unsubscription
let mut rx_03 = hub.subscribe(SubsKey::new(), 100_000);
assert_hub_props(&hub, 3, 3);
// drop rx-02 leads to an attempt to unsubscribe rx-01
assert!(catch_unwind(AssertUnwindSafe(move || {
std::mem::drop(rx_02);
}))
.is_err());
// One of the rxes could not unsubscribe
assert_hub_props(&hub, 2, 2);
// Subscribe rx-04 in order to see that it will receive messages after the unclean
// unsubscription
let mut rx_04 = hub.subscribe(SubsKey::new(), 100_000);
assert_hub_props(&hub, 3, 3);
hub.send(2);
// The messages are still received
assert_eq!(rx_03.next().await, Some(2));
assert_eq!(rx_04.next().await, Some(2));
// Perform a clean unsubscription
std::mem::drop(rx_04);
hub.send(3);
// The messages are still received
assert_eq!(rx_03.next().await, Some(3));
std::mem::drop(rx_03);
hub.send(4);
// The stuck subscription is still there
assert_hub_props(&hub, 1, 1);
});
}
async fn add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(hub: &TestHub) {
let rx_01 = hub.subscribe(SubsKey::new(), 100_000);
let rx_02 = hub.subscribe(SubsKey::new(), 100_000);
hub.send(1);
hub.send(2);
hub.send(3);
assert_eq!(rx_01.take(3).collect::<Vec<_>>().await, vec![1, 2, 3]);
hub.send(4);
hub.send(5);
hub.send(6);
assert_eq!(rx_02.take(6).collect::<Vec<_>>().await, vec![1, 2, 3, 4, 5, 6]);
}
#[test]
fn t03() {
block_on(async {
// Create a Hub
let hub = TestHub::new(TK);
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
assert!(catch_unwind(AssertUnwindSafe(|| hub
.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicBefore), 100_000)))
.is_err());
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
});
}
#[test]
fn t04() {
block_on(async {
let hub = TestHub::new(TK);
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
assert!(catch_unwind(AssertUnwindSafe(|| hub
.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicAfter), 100_000)))
.is_err());
// the registry has panicked after it has added a subs-id into its internal storage — the
// sinks do not leak, although the subscriptions storage contains some garbage
assert_hub_props(&hub, 0, 1);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 1);
})
}
#[test]
fn t05() {
block_on(async {
let hub = TestHub::new(TK);
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
let rx_01 = hub
.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicBefore), 100_000);
assert_hub_props(&hub, 1, 1);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 1, 1);
assert!(catch_unwind(AssertUnwindSafe(move || std::mem::drop(rx_01))).is_err());
// the registry has panicked on-unsubscribe before it removed the subs-id from its internal
// storage — the sinks do not leak, although the subscriptions storage contains some garbage
assert_hub_props(&hub, 0, 1);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 1);
})
}
#[test]
fn t06() {
block_on(async {
let hub = TestHub::new(TK);
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
let rx_01 = hub
.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicAfter), 100_000);
assert_hub_props(&hub, 1, 1);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 1, 1);
assert!(catch_unwind(AssertUnwindSafe(move || std::mem::drop(rx_01))).is_err());
// the registry has panicked on-unsubscribe after it removed the subs-id from its internal
// storage — the sinks do not leak, the subscriptions storage does not contain any garbage
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
})
}
#[test]
fn t07() {
block_on(async {
let hub = TestHub::new(TK);
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
let rx_01 =
hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicBefore), 100_000);
assert_hub_props(&hub, 1, 1);
assert!(catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err());
assert_hub_props(&hub, 1, 1);
std::mem::drop(rx_01);
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
})
}
#[test]
fn t08() {
block_on(async {
let hub = TestHub::new(TK);
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
let rx_01 =
hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicAfter), 100_000);
assert_hub_props(&hub, 1, 1);
assert!(catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err());
assert_hub_props(&hub, 1, 1);
std::mem::drop(rx_01);
assert_hub_props(&hub, 0, 0);
add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
assert_hub_props(&hub, 0, 0);
})
}
+217
View File
@@ -0,0 +1,217 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use futures::{lock::Mutex, prelude::*};
use futures_timer::Delay;
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
/// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the
/// period elapses, we push an element on the sender.
///
/// Senders are removed only when they are closed.
pub struct StatusSinks<T> {
/// Should only be locked by `next`.
inner: Mutex<Inner<T>>,
/// Sending side of `Inner::entries_rx`.
entries_tx: TracingUnboundedSender<YieldAfter<T>>,
}
struct Inner<T> {
/// The actual entries of the list.
entries: stream::FuturesUnordered<YieldAfter<T>>,
/// Receives new entries and puts them in `entries`.
entries_rx: TracingUnboundedReceiver<YieldAfter<T>>,
}
struct YieldAfter<T> {
delay: Delay,
interval: Duration,
sender: Option<TracingUnboundedSender<T>>,
}
impl<T> Default for StatusSinks<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> StatusSinks<T> {
/// Builds a new empty collection.
pub fn new() -> StatusSinks<T> {
let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries", 100_000);
StatusSinks {
inner: Mutex::new(Inner { entries: stream::FuturesUnordered::new(), entries_rx }),
entries_tx,
}
}
/// Adds a sender to the collection.
///
/// The `interval` is the time period between two pushes on the sender.
pub fn push(&self, interval: Duration, sender: TracingUnboundedSender<T>) {
let _ = self.entries_tx.unbounded_send(YieldAfter {
delay: Delay::new(interval),
interval,
sender: Some(sender),
});
}
/// Waits until one of the sinks is ready, then returns an object that can be used to send
/// an element on said sink.
///
/// If the object isn't used to send an element, the slot is skipped.
pub async fn next(&self) -> ReadySinkEvent<'_, T> {
// This is only ever locked by `next`, which means that one `next` at a time can run.
let mut inner = self.inner.lock().await;
let inner = &mut *inner;
loop {
// Future that produces the next ready entry in `entries`, or doesn't produce anything
// if the list is empty.
let next_ready_entry = {
let entries = &mut inner.entries;
async move {
if let Some(v) = entries.next().await {
v
} else {
loop {
futures::pending!()
}
}
}
};
futures::select! {
new_entry = inner.entries_rx.next() => {
if let Some(new_entry) = new_entry {
inner.entries.push(new_entry);
}
},
(sender, interval) = next_ready_entry.fuse() => {
return ReadySinkEvent {
sinks: self,
sender: Some(sender),
interval,
}
}
}
}
}
}
/// One of the sinks is ready.
#[must_use]
pub struct ReadySinkEvent<'a, T> {
sinks: &'a StatusSinks<T>,
sender: Option<TracingUnboundedSender<T>>,
interval: Duration,
}
impl<'a, T> ReadySinkEvent<'a, T> {
/// Sends an element on the sender.
pub fn send(mut self, element: T) {
if let Some(sender) = self.sender.take() {
if sender.unbounded_send(element).is_ok() {
let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
// Note that since there's a small delay between the moment a task is
// woken up and the moment it is polled, the period is actually not
// `interval` but `interval + <delay>`. We ignore this problem in
// practice.
delay: Delay::new(self.interval),
interval: self.interval,
sender: Some(sender),
});
}
}
}
}
impl<'a, T> Drop for ReadySinkEvent<'a, T> {
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
if sender.is_closed() {
return;
}
let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
delay: Delay::new(self.interval),
interval: self.interval,
sender: Some(sender),
});
}
}
}
impl<T> futures::Future for YieldAfter<T> {
type Output = (TracingUnboundedSender<T>, Duration);
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
match Pin::new(&mut this.delay).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(()) => {
let sender = this
.sender
.take()
.expect("sender is always Some unless the future is finished; qed");
Poll::Ready((sender, this.interval))
},
}
}
}
#[cfg(test)]
mod tests {
use super::StatusSinks;
use crate::mpsc::tracing_unbounded;
use futures::prelude::*;
use std::time::Duration;
#[test]
fn works() {
// We're not testing that the `StatusSink` properly enforces an order in the intervals, as
// this easily causes test failures on busy CPUs.
let status_sinks = StatusSinks::new();
let (tx, rx) = tracing_unbounded("test", 100_000);
status_sinks.push(Duration::from_millis(100), tx);
let mut val_order = 5;
futures::executor::block_on(futures::future::select(
Box::pin(async move {
loop {
let ev = status_sinks.next().await;
val_order += 1;
ev.send(val_order);
}
}),
Box::pin(async {
let items: Vec<i32> = rx.take(3).collect().await;
assert_eq!(items, [6, 7, 8]);
}),
));
}
}