fix race with chainHead single subscription ability

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2022-12-08 10:30:24 +00:00
parent ad6a1f2e68
commit 6c0448158d
2 changed files with 10 additions and 5 deletions
+9 -2
View File
@@ -13,6 +13,7 @@
use codec::Encode;
use futures::StreamExt;
use sp_keyring::AccountKeyring;
use std::time::Duration;
use subxt::{
OnlineClient,
PolkadotConfig,
@@ -31,8 +32,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let genesis = api.rpc().chainhead_genesis_hash().await?;
println!("Genesis: {:?}", genesis);
let mut follow_sub = api.blocks().subscribe_chainhead_finalized(false).await?;
// TODO: spec v2 no longer uses the `pipe_from_stream`, therefore we need some
// time between two sequencial `chainHead_follow` subscription to propagate
// the `chainHead_unfollow` from the first subscription.
tokio::time::sleep(Duration::from_secs(5)).await;
let mut follow_sub = api.blocks().subscribe_chainhead_finalized(true).await?;
println!("Following subscription...");
// Handle all subscriptions from the `chainHead_follow`.
while let Some(block) = follow_sub.next().await {
let block = block?;
@@ -56,7 +63,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let call = block
.call("AccountNonceApi_account_nonce".into(), Some(&call_params))
.await?;
// println!("[hash={:?}] call={:?}", block.hash(), call);
println!("[hash={:?}] call={:?}", block.hash(), call);
}
// Subscribe to the `chainHead_follow` method.
+1 -3
View File
@@ -891,8 +891,7 @@ impl<T: Config> Rpc<T> {
.await?;
println!("Dropping subscription");
// Manually cancel the susbcription
// {"id": 1, "method": "eth_unsubscribe", "params": ["0xcd0c3e8af590364c09d0fa6a1210faf5"]}
drop(sub);
let result: bool = self
.client
@@ -902,7 +901,6 @@ impl<T: Config> Rpc<T> {
)
.await?;
println!("Subscription dropped with result={:?}", result);
drop(sub);
(bytes, event)
};