diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index aab13d521e..5fc69c986d 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -76,8 +76,8 @@ impl FollowStreamDriverHandle { } /// Returns if Followstream has reconnected - pub async fn reconnected(&self) { - self.shared.subscribe(false).reconnected().await; + pub fn reconnected(&self) -> impl Stream { + self.shared.subscribe(false).reconnected() } } @@ -142,22 +142,19 @@ impl FollowStreamDriverSubscription { } } - /// Returns if the backend has reconnected - pub async fn reconnected(self) -> bool { - let ready_event = self - .skip_while(|ev| { - std::future::ready(!matches!( - ev, - FollowStreamMsg::Event(FollowEvent::Initialized(_)) - )) - }) - .next() - .await; - - matches!( - ready_event, - Some(FollowStreamMsg::Event(FollowEvent::Initialized(_))) - ) + /// Returns if the backend has reconnected/is reconnecting + pub fn reconnected(self) -> impl Stream { + self.filter_map(|ev| { + let result = match ev { + FollowStreamMsg::Ready(_) => None, + FollowStreamMsg::Event(ev) => match ev { + FollowEvent::Initialized(_) => Some(true), + FollowEvent::Stop => Some(false), + _ => None, + }, + }; + std::future::ready(result) + }) } /// Subscribe to the follow events, ignoring any other messages. diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index f59c11a943..623e6bd395 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -34,6 +34,7 @@ use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle}; use futures::future::Either; use futures::{pin_mut, Future, FutureExt, Stream, StreamExt}; use std::collections::HashMap; +use std::pin::Pin; use std::task::Poll; use storage_items::StorageItems; @@ -726,21 +727,58 @@ where F: FnMut() -> T, T: Future>, { - loop { - let reconnected = follow_handle.reconnected().fuse(); - let action = retry(&mut fun).fuse(); + let reconnected = follow_handle.reconnected().fuse(); + pin_mut!(reconnected); - pin_mut!(reconnected, action); - - let result = futures::future::select(reconnected, action).await; - match result { - Either::Left((_, _)) => (), - Either::Right((result, reset)) => { - let is_reconnected = reset.now_or_never().is_some(); - if !is_reconnected { - break result; + async fn check_for_reconnect( + mut reconnected: Pin<&mut impl Stream>, + ) -> Result<(), Error> { + loop { + match reconnected.next().await { + Some(true) => { + break; + } + Some(false) => (), + None => { + return Err(RpcError::SubscriptionDropped.into()); } } } + Ok(()) } + + loop { + let action = retry(&mut fun).fuse(); + + pin_mut!(action); + + let result = futures::future::select(reconnected.next(), action).await; + match result { + // We reconnected and received FollowEvent::Initialized() + Either::Left((Some(has_reconnected), _)) if has_reconnected => {} + Either::Left((Some(_), _)) => { + // Wait until we see Initialized Event + check_for_reconnect(reconnected.as_mut()).await? + } + Either::Right((result, reset)) => { + let is_reconnected = reset.now_or_never(); + if is_reconnected.is_none() { + return result; + } + let is_reconnected = is_reconnected.flatten(); + if let Some(has_reconnected) = is_reconnected { + // Wait until we see Initialized Event + if !has_reconnected { + check_for_reconnect(reconnected.as_mut()).await? + } + } else { + break; + } + } + Either::Left((None, _)) => { + break; + } + } + } + Err(RpcError::SubscriptionDropped.into()) }