From b1e55e0f4bf14da9cf8533c86756ca547442daec Mon Sep 17 00:00:00 2001
From: Pavlo Khrystenko
Date: Wed, 29 May 2024 18:24:20 +0200
Subject: [PATCH] use stream of bools to track whether the connection is
stopped/initialized
---
.../backend/unstable/follow_stream_driver.rs | 33 +++++-----
subxt/src/backend/unstable/mod.rs | 62 +++++++++++++++----
2 files changed, 65 insertions(+), 30 deletions(-)
diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs
index aab13d521e..5fc69c986d 100644
--- a/subxt/src/backend/unstable/follow_stream_driver.rs
+++ b/subxt/src/backend/unstable/follow_stream_driver.rs
@@ -76,8 +76,8 @@ impl FollowStreamDriverHandle {
}
/// Returns if Followstream has reconnected
- pub async fn reconnected(&self) {
- self.shared.subscribe(false).reconnected().await;
+ pub fn reconnected(&self) -> impl Stream- {
+ self.shared.subscribe(false).reconnected()
}
}
@@ -142,22 +142,19 @@ impl FollowStreamDriverSubscription {
}
}
- /// Returns if the backend has reconnected
- pub async fn reconnected(self) -> bool {
- let ready_event = self
- .skip_while(|ev| {
- std::future::ready(!matches!(
- ev,
- FollowStreamMsg::Event(FollowEvent::Initialized(_))
- ))
- })
- .next()
- .await;
-
- matches!(
- ready_event,
- Some(FollowStreamMsg::Event(FollowEvent::Initialized(_)))
- )
+ /// Returns if the backend has reconnected/is reconnecting
+ pub fn reconnected(self) -> impl Stream
- {
+ self.filter_map(|ev| {
+ let result = match ev {
+ FollowStreamMsg::Ready(_) => None,
+ FollowStreamMsg::Event(ev) => match ev {
+ FollowEvent::Initialized(_) => Some(true),
+ FollowEvent::Stop => Some(false),
+ _ => None,
+ },
+ };
+ std::future::ready(result)
+ })
}
/// Subscribe to the follow events, ignoring any other messages.
diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs
index f59c11a943..623e6bd395 100644
--- a/subxt/src/backend/unstable/mod.rs
+++ b/subxt/src/backend/unstable/mod.rs
@@ -34,6 +34,7 @@ use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
use futures::future::Either;
use futures::{pin_mut, Future, FutureExt, Stream, StreamExt};
use std::collections::HashMap;
+use std::pin::Pin;
use std::task::Poll;
use storage_items::StorageItems;
@@ -726,21 +727,58 @@ where
F: FnMut() -> T,
T: Future