blocks: Add blocks API for subscribing to the chainHead_follow method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2022-11-25 18:37:53 +00:00
parent a7fcbdb825
commit 684897f096
2 changed files with 79 additions and 1 deletions
+23
View File
@@ -23,6 +23,29 @@ use sp_runtime::traits::{
};
use std::sync::Arc;
/// A representation of a block obtained from the `chainHead_follow` subscription.
pub struct FollowBlock<T: Config, C> {
/// The hash of the block.
hash: T::Hash,
/// The ID of the subscription that produced this block.
subscription_id: String,
/// The client to communicate with the chain.
client: C,
}
impl<T, C> FollowBlock<T, C>
where
T: Config,
C: OfflineClientT<T>,
{
pub(crate) fn new(hash: T::Hash, subscription_id: String, client: C) -> Self {
Self {
hash,
subscription_id,
client,
}
}
}
/// A representation of a block.
pub struct Block<T: Config, C> {
header: T::Header,
+56 -1
View File
@@ -2,13 +2,17 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::Block;
use super::{
block_types::FollowBlock,
Block,
};
use crate::{
client::OnlineClientT,
error::{
BlockError,
Error,
},
rpc::subscription_events::FollowEvent,
utils::PhantomDataSendSync,
Config,
};
@@ -152,6 +156,57 @@ where
)
})
}
/// Subscribe to finalized blocks using `chainHead_follow`.
pub fn subscribe_chainhead_finalized(
&self,
runtime_updates: bool,
) -> impl Future<Output = Result<BlockStream<FollowBlock<T, Client>>, Error>>
+ Send
+ 'static {
let client = self.client.clone();
async move {
let sub = client
.rpc()
.subscribe_chainhead_follow(runtime_updates)
.await?;
let subscription_id = match sub.subscription_id() {
Some(id) => id.clone(),
None => return Err(Error::Other("Subscription without ID".into())),
};
// Flatten the finalized events and map them into a `FollowBlock`.
Ok(sub
.flat_map(move |event| {
let subscription_id = subscription_id.clone();
let client = client.clone();
let event = match event {
Ok(event) => event,
Err(e) => return Either::Left(stream::once(async { Err(e) })),
};
let blocks = match event {
FollowEvent::Finalized(finalized) => {
finalized.finalized_block_hashes
}
_ => vec![],
};
let blocks = blocks.into_iter().map(move |hash| {
Ok(FollowBlock::new(
hash,
subscription_id.clone(),
client.clone(),
))
});
Either::Right(stream::iter(blocks))
})
.boxed())
}
}
}
/// Take a promise that will return a subscription to some block headers,