diff --git a/subxt/src/blocks/block_types.rs b/subxt/src/blocks/block_types.rs index a6cbce1e05..99b6e6eb90 100644 --- a/subxt/src/blocks/block_types.rs +++ b/subxt/src/blocks/block_types.rs @@ -310,6 +310,7 @@ where let event = event?; let bytes = Vec::::try_from(event)?; + return Ok(bytes) } Err(ChainHeadError::Other( diff --git a/subxt/src/blocks/blocks_client.rs b/subxt/src/blocks/blocks_client.rs index 9fa82b66cc..b6fbb81c12 100644 --- a/subxt/src/blocks/blocks_client.rs +++ b/subxt/src/blocks/blocks_client.rs @@ -2,13 +2,17 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. -use super::Block; +use super::{ + block_types::ChainHeadBlock, + Block, +}; use crate::{ client::OnlineClientT, error::{ BlockError, Error, }, + rpc::types::FollowEvent, utils::PhantomDataSendSync, Config, }; @@ -152,6 +156,57 @@ where ) }) } + + /// Subscribe to finalized blocks using `chainHead_follow`. + pub fn subscribe_chainhead_finalized( + &self, + runtime_updates: bool, + ) -> impl Future>, Error>> + + Send + + 'static { + let client = self.client.clone(); + + async move { + let sub = client + .rpc() + .subscribe_chainhead_follow(runtime_updates) + .await?; + + let subscription_id = match sub.subscription_id() { + Some(id) => id.clone(), + None => return Err(Error::Other("Subscription without ID".into())), + }; + + // Flatten the finalized events and map them into a `ChainHeadBlock`. + Ok(sub + .flat_map(move |event| { + let subscription_id = subscription_id.clone(); + let client = client.clone(); + + let event = match event { + Ok(event) => event, + Err(e) => return Either::Left(stream::once(async { Err(e) })), + }; + + let blocks = match event { + FollowEvent::Finalized(finalized) => { + finalized.finalized_block_hashes + } + _ => vec![], + }; + let blocks = blocks.into_iter().map(move |hash| { + Ok(ChainHeadBlock::new( + hash, + subscription_id.clone(), + client.clone(), + )) + }); + + Either::Right(stream::iter(blocks)) + }) + .boxed()) + } + } } /// Take a promise that will return a subscription to some block headers,