// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Parity Bridges Common. // Parity Bridges Common is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Bridges Common is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . use crate::error::Result as ClientResult; use async_std::{ channel::{bounded, Receiver, Sender}, stream::StreamExt, }; use futures::{FutureExt, Stream}; use pezsp_runtime::DeserializeOwned; use std::{ fmt::Debug, pin::Pin, result::Result as StdResult, task::{Context, Poll}, }; /// Once channel reaches this capacity, the subscription breaks. const CHANNEL_CAPACITY: usize = 128; /// Structure describing a stream. #[derive(Clone)] pub struct StreamDescription { stream_name: String, chain_name: String, } impl StreamDescription { /// Create a new instance of `StreamDescription`. pub fn new(stream_name: String, chain_name: String) -> Self { Self { stream_name, chain_name } } /// Get a stream description. fn get(&self) -> String { format!("{} stream of {}", self.stream_name, self.chain_name) } } /// Chainable stream that transforms items of type `Result` to items of type `T`. /// /// If it encounters an item of type `Err`, it returns `Poll::Ready(None)` /// and terminates the underlying stream. struct Unwrap>, T, E> { desc: StreamDescription, stream: Option, } impl>, T, E> Unwrap { /// Create a new instance of `Unwrap`. pub fn new(desc: StreamDescription, stream: S) -> Self { Self { desc, stream: Some(stream) } } } impl> + Unpin, T: DeserializeOwned, E: Debug> Stream for Unwrap { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Poll::Ready(match self.stream.as_mut() { Some(subscription) => match futures::ready!(Pin::new(subscription).poll_next(cx)) { Some(Ok(item)) => Some(item), Some(Err(e)) => { self.stream.take(); tracing::debug!( target: "bridge", error=?e, desc=%self.desc.get(), "Returned with error. It may need to be restarted" ); None }, None => { self.stream.take(); tracing::debug!( target: "bridge", desc=%self.desc.get(), "Returned `None`. It may need to be restarted" ); None }, }, None => None, }) } } /// Subscription factory that produces subscriptions, sharing the same background thread. #[derive(Clone)] pub struct SubscriptionBroadcaster { desc: StreamDescription, subscribers_sender: Sender>, } impl SubscriptionBroadcaster { /// Create new subscription factory. pub fn new(subscription: Subscription) -> StdResult> { // It doesn't make sense to further broadcast a broadcasted subscription. if subscription.is_broadcasted { return Err(subscription); } let desc = subscription.desc().clone(); let (subscribers_sender, subscribers_receiver) = bounded(CHANNEL_CAPACITY); async_std::task::spawn(background_worker(subscription, subscribers_receiver)); Ok(Self { desc, subscribers_sender }) } /// Produce new subscription. pub async fn subscribe(&self) -> ClientResult> { let (items_sender, items_receiver) = bounded(CHANNEL_CAPACITY); self.subscribers_sender.try_send(items_sender)?; Ok(Subscription::new_broadcasted(self.desc.clone(), items_receiver)) } } /// Subscription to some chain events. pub struct Subscription { desc: StreamDescription, subscription: Box + Unpin + Send>, is_broadcasted: bool, } impl Subscription { /// Create new forwarded subscription. pub fn new_forwarded( desc: StreamDescription, subscription: impl Stream> + Unpin + Send + 'static, ) -> Self { Self { desc: desc.clone(), subscription: Box::new(Unwrap::new(desc, subscription)), is_broadcasted: false, } } /// Create new broadcasted subscription. pub fn new_broadcasted( desc: StreamDescription, subscription: impl Stream + Unpin + Send + 'static, ) -> Self { Self { desc, subscription: Box::new(subscription), is_broadcasted: true } } /// Get the description of the underlying stream pub fn desc(&self) -> &StreamDescription { &self.desc } } impl Stream for Subscription { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Poll::Ready(futures::ready!(Pin::new(&mut self.subscription).poll_next(cx))) } } /// Background worker that is executed in tokio context as `jsonrpsee` requires. /// /// This task may exit under some circumstances. It'll send the correspondent /// message (`Err` or `None`) to all known listeners. Also, when it stops, all /// subsequent reads and new subscribers will get the connection error (`ChannelError`). async fn background_worker( mut subscription: Subscription, mut subscribers_receiver: Receiver>, ) { fn log_task_exit(desc: &StreamDescription, reason: &str) { tracing::debug!( target: "bridge", desc=%desc.get(), %reason, "Background task of subscription broadcaster has stopped" ); } // wait for first subscriber until actually starting subscription let subscriber = match subscribers_receiver.next().await { Some(subscriber) => subscriber, None => { // it means that the last subscriber/factory has been dropped, so we need to // exit too return log_task_exit(subscription.desc(), "client has stopped"); }, }; // actually subscribe let mut subscribers = vec![subscriber]; // start listening for new items and receivers loop { futures::select! { subscriber = subscribers_receiver.next().fuse() => { match subscriber { Some(subscriber) => subscribers.push(subscriber), None => { // it means that the last subscriber/factory has been dropped, so we need to // exit too return log_task_exit(subscription.desc(), "client has stopped") }, } }, maybe_item = subscription.subscription.next().fuse() => { match maybe_item { Some(item) => { // notify subscribers subscribers.retain(|subscriber| { let send_result = subscriber.try_send(item.clone()); send_result.is_ok() }); } None => { // The underlying client has dropped, so we can't do anything here // and need to stop the task. return log_task_exit(subscription.desc(), "stream has finished"); } } }, } } }