diff --git a/subxt/src/blocks/block_types.rs b/subxt/src/blocks/block_types.rs index 3522991521..00d04ef856 100644 --- a/subxt/src/blocks/block_types.rs +++ b/subxt/src/blocks/block_types.rs @@ -23,6 +23,29 @@ use sp_runtime::traits::{ }; use std::sync::Arc; +/// A representation of a block obtained from the `chainHead_follow` subscription. +pub struct FollowBlock { + /// The hash of the block. + hash: T::Hash, + /// The ID of the subscription that produced this block. + subscription_id: String, + /// The client to communicate with the chain. + client: C, +} + +impl FollowBlock +where + T: Config, + C: OfflineClientT, +{ + pub(crate) fn new(hash: T::Hash, subscription_id: String, client: C) -> Self { + Self { + hash, + subscription_id, + client, + } + } +} /// A representation of a block. pub struct Block { header: T::Header, diff --git a/subxt/src/blocks/blocks_client.rs b/subxt/src/blocks/blocks_client.rs index 9fa82b66cc..91848762e3 100644 --- a/subxt/src/blocks/blocks_client.rs +++ b/subxt/src/blocks/blocks_client.rs @@ -2,13 +2,17 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. -use super::Block; +use super::{ + block_types::FollowBlock, + Block, +}; use crate::{ client::OnlineClientT, error::{ BlockError, Error, }, + rpc::subscription_events::FollowEvent, utils::PhantomDataSendSync, Config, }; @@ -152,6 +156,57 @@ where ) }) } + + /// Subscribe to finalized blocks using `chainHead_follow`. + pub fn subscribe_chainhead_finalized( + &self, + runtime_updates: bool, + ) -> impl Future>, Error>> + + Send + + 'static { + let client = self.client.clone(); + + async move { + let sub = client + .rpc() + .subscribe_chainhead_follow(runtime_updates) + .await?; + + let subscription_id = match sub.subscription_id() { + Some(id) => id.clone(), + None => return Err(Error::Other("Subscription without ID".into())), + }; + + // Flatten the finalized events and map them into a `FollowBlock`. + Ok(sub + .flat_map(move |event| { + let subscription_id = subscription_id.clone(); + let client = client.clone(); + + let event = match event { + Ok(event) => event, + Err(e) => return Either::Left(stream::once(async { Err(e) })), + }; + + let blocks = match event { + FollowEvent::Finalized(finalized) => { + finalized.finalized_block_hashes + } + _ => vec![], + }; + let blocks = blocks.into_iter().map(move |hash| { + Ok(FollowBlock::new( + hash, + subscription_id.clone(), + client.clone(), + )) + }); + + Either::Right(stream::iter(blocks)) + }) + .boxed()) + } + } } /// Take a promise that will return a subscription to some block headers,