Ensures poll order wrt subscription ID's

closes #1567
This commit is contained in:
Pavlo Khrystenko
2024-05-29 14:42:11 +02:00
parent 44517aabfc
commit 93c0029c75
4 changed files with 89 additions and 49 deletions
Generated
+15 -15
View File
@@ -327,7 +327,7 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "artifacts"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"substrate-runner",
]
@@ -1812,7 +1812,7 @@ dependencies = [
[[package]]
name = "generate-custom-metadata"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"frame-metadata 16.0.0",
"parity-scale-codec",
@@ -2309,7 +2309,7 @@ dependencies = [
[[package]]
name = "integration-tests"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"assert_matches",
"cfg_aliases",
@@ -4745,7 +4745,7 @@ dependencies = [
[[package]]
name = "substrate-runner"
version = "0.36.1"
version = "0.37.0"
[[package]]
name = "subtle"
@@ -4755,7 +4755,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "subxt"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"assert_matches",
"async-trait",
@@ -4798,7 +4798,7 @@ dependencies = [
[[package]]
name = "subxt-cli"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"clap 4.5.4",
"color-eyre",
@@ -4827,7 +4827,7 @@ dependencies = [
[[package]]
name = "subxt-codegen"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"frame-metadata 16.0.0",
"getrandom",
@@ -4847,7 +4847,7 @@ dependencies = [
[[package]]
name = "subxt-core"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"assert_matches",
"base58",
@@ -4879,7 +4879,7 @@ dependencies = [
[[package]]
name = "subxt-lightclient"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"futures",
"futures-timer",
@@ -4904,7 +4904,7 @@ dependencies = [
[[package]]
name = "subxt-macro"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"darling 0.20.8",
"parity-scale-codec",
@@ -4917,7 +4917,7 @@ dependencies = [
[[package]]
name = "subxt-metadata"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"assert_matches",
"bitvec",
@@ -4931,7 +4931,7 @@ dependencies = [
[[package]]
name = "subxt-signer"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"bip32",
"bip39",
@@ -4958,7 +4958,7 @@ dependencies = [
[[package]]
name = "subxt-test-macro"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"quote",
"syn 2.0.60",
@@ -5021,7 +5021,7 @@ dependencies = [
[[package]]
name = "test-runtime"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"hex",
"impl-serde",
@@ -5446,7 +5446,7 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "ui-tests"
version = "0.36.1"
version = "0.37.0"
dependencies = [
"frame-metadata 16.0.0",
"generate-custom-metadata",
@@ -72,7 +72,12 @@ pub struct FollowStreamDriverHandle<Hash: BlockHash> {
impl<Hash: BlockHash> FollowStreamDriverHandle<Hash> {
/// Subscribe to follow events.
pub fn subscribe(&self) -> FollowStreamDriverSubscription<Hash> {
self.shared.subscribe()
self.shared.subscribe(true)
}
/// Returns if Followstream has reconnected
pub async fn reconnected(&self) {
self.shared.subscribe(false).reconnected().await;
}
}
@@ -137,6 +142,24 @@ impl<Hash: BlockHash> FollowStreamDriverSubscription<Hash> {
}
}
/// Returns if the backend has reconnected
pub async fn reconnected(self) -> bool {
let ready_event = self
.skip_while(|ev| {
std::future::ready(!matches!(
ev,
FollowStreamMsg::Event(FollowEvent::Initialized(_))
))
})
.next()
.await;
matches!(
ready_event,
Some(FollowStreamMsg::Event(FollowEvent::Initialized(_)))
)
}
/// Subscribe to the follow events, ignoring any other messages.
pub fn events(self) -> impl Stream<Item = FollowEvent<BlockRef<Hash>>> + Send + Sync {
self.filter_map(|ev| std::future::ready(ev.into_event()))
@@ -145,7 +168,7 @@ impl<Hash: BlockHash> FollowStreamDriverSubscription<Hash> {
impl<Hash: BlockHash> Clone for FollowStreamDriverSubscription<Hash> {
fn clone(&self) -> Self {
self.shared.subscribe()
self.shared.subscribe(true)
}
}
@@ -330,7 +353,10 @@ impl<Hash: BlockHash> Shared<Hash> {
}
/// Create a new subscription.
pub fn subscribe(&self) -> FollowStreamDriverSubscription<Hash> {
pub fn subscribe(
&self,
insert_subscription_data: bool,
) -> FollowStreamDriverSubscription<Hash> {
let mut shared = self.0.lock().unwrap();
let id = shared.next_id;
@@ -349,16 +375,18 @@ impl<Hash: BlockHash> Shared<Hash> {
// it means the subscription is currently stopped, and we should expect new Ready/Init
// messages anyway once it restarts.
let mut local_items = VecDeque::new();
if let Some(sub_id) = &shared.current_subscription_id {
local_items.push_back(FollowStreamMsg::Ready(sub_id.clone()));
}
if let Some(init_msg) = &shared.current_init_message {
local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized(
init_msg.clone(),
)));
}
for ev in &shared.block_events_for_new_subscriptions {
local_items.push_back(FollowStreamMsg::Event(ev.clone()));
if insert_subscription_data {
if let Some(sub_id) = &shared.current_subscription_id {
local_items.push_back(FollowStreamMsg::Ready(sub_id.clone()));
}
if let Some(init_msg) = &shared.current_init_message {
local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized(
init_msg.clone(),
)));
}
for ev in &shared.block_events_for_new_subscriptions {
local_items.push_back(FollowStreamMsg::Event(ev.clone()));
}
}
drop(shared);
+33 -5
View File
@@ -32,7 +32,7 @@ use crate::Config;
use async_trait::async_trait;
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
use futures::future::Either;
use futures::{Stream, StreamExt};
use futures::{pin_mut, Future, FutureExt, Stream, StreamExt};
use std::collections::HashMap;
use std::task::Poll;
use storage_items::StorageItems;
@@ -302,7 +302,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
retry(|| async {
retry_with_reset_on_reconnect(&self.follow_handle, || async {
let sub_id = get_subscription_id(&self.follow_handle).await?;
self.methods.chainhead_v1_header(&sub_id, at).await
})
@@ -310,7 +310,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}
async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
retry(|| async {
retry_with_reset_on_reconnect(&self.follow_handle, || async {
let sub_id = get_subscription_id(&self.follow_handle).await?;
// Subscribe to the body response and get our operationId back.
@@ -669,9 +669,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
call_parameters: Option<&[u8]>,
at: T::Hash,
) -> Result<Vec<u8>, Error> {
retry(|| async {
retry_with_reset_on_reconnect(&self.follow_handle, || async {
let sub_id = get_subscription_id(&self.follow_handle).await?;
// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let call_parameters = call_parameters.unwrap_or(&[]);
@@ -716,3 +715,32 @@ async fn get_subscription_id<Hash: BlockHash>(
Ok(sub_id)
}
/// A helper to restart calls on subscription reconnect.
async fn retry_with_reset_on_reconnect<Hash, T, F, R>(
follow_handle: &FollowStreamDriverHandle<Hash>,
mut fun: F,
) -> Result<R, Error>
where
Hash: BlockHash,
F: FnMut() -> T,
T: Future<Output = Result<R, Error>>,
{
loop {
let reconnected = follow_handle.reconnected().fuse();
let action = retry(|| fun()).fuse();
pin_mut!(reconnected, action);
let result = futures::future::select(reconnected, action).await;
match result {
Either::Left((_, _)) => (),
Either::Right((result, reset)) => {
let is_reconnected = reset.now_or_never().is_some();
if !is_reconnected {
break result;
}
}
}
}
}
-16
View File
@@ -106,9 +106,6 @@ where
F: FnMut() -> T,
T: Future<Output = Result<R, Error>>,
{
const REJECTED_MAX_RETRIES: usize = 10;
let mut rejected_retries = 0;
loop {
match retry_future().await {
Ok(v) => return Ok(v),
@@ -117,19 +114,6 @@ where
continue;
}
// TODO: https://github.com/paritytech/subxt/issues/1567
// This is a hack because if a reconnection occurs
// the order of pending calls is not guaranteed.
//
// Such that it's possible the a pending future completes
// before `chainHead_follow` is established with fresh
// subscription id.
//
if e.is_rejected() && rejected_retries < REJECTED_MAX_RETRIES {
rejected_retries += 1;
continue;
}
return Err(e);
}
}