use stream of bools to track whether the connection is stopped/initialized

This commit is contained in:
Pavlo Khrystenko
2024-05-29 18:24:20 +02:00
parent ccf9146e84
commit b1e55e0f4b
2 changed files with 65 additions and 30 deletions
@@ -76,8 +76,8 @@ impl<Hash: BlockHash> FollowStreamDriverHandle<Hash> {
}
/// Returns if Followstream has reconnected
pub async fn reconnected(&self) {
self.shared.subscribe(false).reconnected().await;
pub fn reconnected(&self) -> impl Stream<Item = bool> {
self.shared.subscribe(false).reconnected()
}
}
@@ -142,22 +142,19 @@ impl<Hash: BlockHash> FollowStreamDriverSubscription<Hash> {
}
}
/// Returns if the backend has reconnected
pub async fn reconnected(self) -> bool {
let ready_event = self
.skip_while(|ev| {
std::future::ready(!matches!(
ev,
FollowStreamMsg::Event(FollowEvent::Initialized(_))
))
})
.next()
.await;
matches!(
ready_event,
Some(FollowStreamMsg::Event(FollowEvent::Initialized(_)))
)
/// Returns if the backend has reconnected/is reconnecting
pub fn reconnected(self) -> impl Stream<Item = bool> {
self.filter_map(|ev| {
let result = match ev {
FollowStreamMsg::Ready(_) => None,
FollowStreamMsg::Event(ev) => match ev {
FollowEvent::Initialized(_) => Some(true),
FollowEvent::Stop => Some(false),
_ => None,
},
};
std::future::ready(result)
})
}
/// Subscribe to the follow events, ignoring any other messages.
+50 -12
View File
@@ -34,6 +34,7 @@ use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
use futures::future::Either;
use futures::{pin_mut, Future, FutureExt, Stream, StreamExt};
use std::collections::HashMap;
use std::pin::Pin;
use std::task::Poll;
use storage_items::StorageItems;
@@ -726,21 +727,58 @@ where
F: FnMut() -> T,
T: Future<Output = Result<R, Error>>,
{
loop {
let reconnected = follow_handle.reconnected().fuse();
let action = retry(&mut fun).fuse();
let reconnected = follow_handle.reconnected().fuse();
pin_mut!(reconnected);
pin_mut!(reconnected, action);
let result = futures::future::select(reconnected, action).await;
match result {
Either::Left((_, _)) => (),
Either::Right((result, reset)) => {
let is_reconnected = reset.now_or_never().is_some();
if !is_reconnected {
break result;
async fn check_for_reconnect(
mut reconnected: Pin<&mut impl Stream<Item = bool>>,
) -> Result<(), Error> {
loop {
match reconnected.next().await {
Some(true) => {
break;
}
Some(false) => (),
None => {
return Err(RpcError::SubscriptionDropped.into());
}
}
}
Ok(())
}
loop {
let action = retry(&mut fun).fuse();
pin_mut!(action);
let result = futures::future::select(reconnected.next(), action).await;
match result {
// We reconnected and received FollowEvent::Initialized()
Either::Left((Some(has_reconnected), _)) if has_reconnected => {}
Either::Left((Some(_), _)) => {
// Wait until we see Initialized Event
check_for_reconnect(reconnected.as_mut()).await?
}
Either::Right((result, reset)) => {
let is_reconnected = reset.now_or_never();
if is_reconnected.is_none() {
return result;
}
let is_reconnected = is_reconnected.flatten();
if let Some(has_reconnected) = is_reconnected {
// Wait until we see Initialized Event
if !has_reconnected {
check_for_reconnect(reconnected.as_mut()).await?
}
} else {
break;
}
}
Either::Left((None, _)) => {
break;
}
}
}
Err(RpcError::SubscriptionDropped.into())
}