From 9a587b360df7c7723b66f7bfc3f68b1737b56e8a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 9 Jan 2024 18:27:28 +0200 Subject: [PATCH] subxt: Expose chainHeadFollow on the backend and test order of blocks Signed-off-by: Alexandru Vasile --- subxt/src/backend/legacy/mod.rs | 11 ++ subxt/src/backend/mod.rs | 9 ++ subxt/src/backend/unstable/mod.rs | 15 +++ .../src/full_client/client/unstable_rpcs.rs | 106 +++++++++++++++--- 4 files changed, 128 insertions(+), 13 deletions(-) diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs index cffb3f4475..de195caec1 100644 --- a/subxt/src/backend/legacy/mod.rs +++ b/subxt/src/backend/legacy/mod.rs @@ -19,9 +19,13 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; +use crate::backend::unstable::UnstableBlockRef; + // Expose the RPC methods. pub use rpc_methods::LegacyRpcMethods; +use super::unstable::rpc_methods::FollowEvent; + /// The legacy backend. #[derive(Debug, Clone)] pub struct LegacyBackend { @@ -41,6 +45,13 @@ impl super::sealed::Sealed for LegacyBackend {} #[async_trait] impl Backend for LegacyBackend { + /// ChainHead follow + async fn chain_head_follow( + &self, + ) -> Result>>, Error> { + panic!("Unimplemented") + } + async fn storage_fetch_values( &self, keys: Vec>, diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index 3fb9782c53..d023d2d535 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -19,6 +19,10 @@ use futures::{Stream, StreamExt}; use std::pin::Pin; use std::sync::Arc; +use crate::backend::unstable::UnstableBlockRef; + +use self::unstable::rpc_methods::FollowEvent; + /// Prevent the backend trait being implemented externally. #[doc(hidden)] pub(crate) mod sealed { @@ -99,6 +103,11 @@ pub trait Backend: sealed::Sealed + Send + Sync + 'static { call_parameters: Option<&[u8]>, at: T::Hash, ) -> Result, Error>; + + /// ChainHead follow + async fn chain_head_follow( + &self, + ) -> Result>>, Error>; } /// helpeful utility methods derived from those provided on [`Backend`] diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 964b85e1ed..c9b2252ade 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -16,6 +16,8 @@ mod follow_stream_driver; mod follow_stream_unpin; mod storage_items; +pub use follow_stream_unpin::BlockRef as UnstableBlockRef; + pub mod rpc_methods; use self::rpc_methods::{ @@ -25,6 +27,7 @@ use crate::backend::{ rpc::RpcClient, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf, StreamOfResults, TransactionStatus, }; + use crate::config::BlockHash; use crate::error::{Error, RpcError}; use crate::Config; @@ -332,6 +335,18 @@ impl Backend for UnstableBackend { next_ref.ok_or_else(|| RpcError::SubscriptionDropped.into()) } + async fn chain_head_follow( + &self, + ) -> Result>>, Error> { + let stream = self + .follow_handle + .subscribe() + .events() + .map(|event| Ok(event)); + + Ok(StreamOf(Box::pin(stream))) + } + async fn current_runtime_version(&self) -> Result { // Just start a stream of version infos, and return the first value we get from it. let runtime_version = self.stream_runtime_version().await?.next().await; diff --git a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs index 5633518008..1547f7c60d 100644 --- a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs +++ b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs @@ -298,15 +298,14 @@ async fn chainhead_unstable_follow_order_of_blocks() { }; let mut tracked_blocks = HashMap::new(); - - println!("Initialized finalized={:?}", finalized); tracked_blocks.insert(finalized, true); + let mut events = Vec::with_capacity(100); + let mut num_blocks = 0; while let Some(event) = blocks.next().await { let event = event.unwrap(); - - println!("event = {:?}\n", event); + events.push(event.clone()); match event { FollowEvent::Initialized(_) => panic!("Unexpected"), @@ -316,14 +315,14 @@ async fn chainhead_unstable_follow_order_of_blocks() { if tracked_blocks.contains_key(&hash) { panic!( - "NewBlock block={:?} parent={:?} already tracked tracked={:#?}", - hash, parent, tracked_blocks + "NewBlock block={:?} parent={:?} already tracked tracked={:#?}\n events={:#?}", + hash, parent, tracked_blocks, events, ); } if !tracked_blocks.contains_key(&parent) { panic!( - "NewBlock PARENT NOT TRACKED block={:?} parent={:?} tracked={:#?}", - hash, parent, tracked_blocks + "NewBlock PARENT NOT TRACKED block={:?} parent={:?} tracked={:#?}, events={:#?}", + hash, parent, tracked_blocks, events ); } @@ -334,8 +333,8 @@ async fn chainhead_unstable_follow_order_of_blocks() { if !tracked_blocks.contains_key(&hash) { panic!( - "BestBlockChanged not tracked block={:?} tracked={:#?}", - hash, tracked_blocks + "BestBlockChanged not tracked block={:?} tracked={:#?} events={:#?}", + hash, tracked_blocks, events, ); } } @@ -345,8 +344,8 @@ async fn chainhead_unstable_follow_order_of_blocks() { for hash in hashes { if !tracked_blocks.contains_key(&hash) { panic!( - "Finalized block={:?} not tracked tracked={:#?}", - hash, tracked_blocks + "Finalized block={:?} not tracked tracked={:#?} events={:#?}", + hash, tracked_blocks, events, ); } @@ -354,7 +353,88 @@ async fn chainhead_unstable_follow_order_of_blocks() { } num_blocks += 1; - if num_blocks > 10 { + if num_blocks > 40 { + break; + } + } + _ => continue, + } + } +} + +#[tokio::test] +async fn unstable_backend_follow_order_of_blocks() { + let ctx = test_context().await; + + let api = ctx.unstable_client().await; + let backend = api.backend(); + let mut blocks = backend.chain_head_follow().await.unwrap(); + + let event = blocks.next().await.unwrap().unwrap(); + + let finalized = match event { + FollowEvent::Initialized(init) => init.finalized_block_hash, + _ => panic!("Unexpected event"), + }; + + let mut tracked_blocks = HashMap::new(); + + println!("Initialized finalized={:?}", finalized); + tracked_blocks.insert(finalized.hash(), true); + + let mut events = Vec::with_capacity(100); + let mut num_blocks = 0; + while let Some(event) = blocks.next().await { + let event = event.unwrap(); + events.push(event.clone()); + + match event { + FollowEvent::Initialized(_) => panic!("Unexpected"), + FollowEvent::NewBlock(new) => { + let hash = new.block_hash.hash(); + let parent = new.parent_block_hash.hash(); + + if tracked_blocks.contains_key(&hash) { + panic!( + "NewBlock block={:?} parent={:?} already tracked tracked={:#?} events={:#?}", + hash, parent, tracked_blocks, events, + ); + } + if !tracked_blocks.contains_key(&parent) { + panic!( + "NewBlock PARENT NOT TRACKED block={:?} parent={:?} tracked={:#?} events={:#?}", + hash, parent, tracked_blocks, events, + ); + } + + tracked_blocks.insert(hash, false); + } + FollowEvent::BestBlockChanged(best) => { + let hash = best.best_block_hash.hash(); + + if !tracked_blocks.contains_key(&hash) { + panic!( + "BestBlockChanged not tracked block={:?} tracked={:#?} events={:#?}", + hash, tracked_blocks, events, + ); + } + } + FollowEvent::Finalized(fin) => { + let hashes = fin.finalized_block_hashes; + + for hash in hashes { + if !tracked_blocks.contains_key(&hash.hash()) { + panic!( + "Finalized block={:?} not tracked tracked={:#?} events={:#?}", + hash, tracked_blocks, events, + ); + } + + tracked_blocks.insert(hash.hash(), true); + } + + num_blocks += 1; + if num_blocks > 40 { break; } }