blocks: Expose the ChainHeadBlock via block subscription

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2023-01-09 18:41:08 +00:00
parent abc2bd88b5
commit 46e4b8f0fa
2 changed files with 57 additions and 1 deletions
+1
View File
@@ -310,6 +310,7 @@ where
let event = event?;
let bytes = Vec::<u8>::try_from(event)?;
return Ok(bytes)
}
Err(ChainHeadError::Other(
+56 -1
View File
@@ -2,13 +2,17 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::Block;
use super::{
block_types::ChainHeadBlock,
Block,
};
use crate::{
client::OnlineClientT,
error::{
BlockError,
Error,
},
rpc::types::FollowEvent,
utils::PhantomDataSendSync,
Config,
};
@@ -152,6 +156,57 @@ where
)
})
}
/// Subscribe to finalized blocks using `chainHead_follow`.
pub fn subscribe_chainhead_finalized(
&self,
runtime_updates: bool,
) -> impl Future<Output = Result<BlockStream<ChainHeadBlock<T, Client>>, Error>>
+ Send
+ 'static {
let client = self.client.clone();
async move {
let sub = client
.rpc()
.subscribe_chainhead_follow(runtime_updates)
.await?;
let subscription_id = match sub.subscription_id() {
Some(id) => id.clone(),
None => return Err(Error::Other("Subscription without ID".into())),
};
// Flatten the finalized events and map them into a `ChainHeadBlock`.
Ok(sub
.flat_map(move |event| {
let subscription_id = subscription_id.clone();
let client = client.clone();
let event = match event {
Ok(event) => event,
Err(e) => return Either::Left(stream::once(async { Err(e) })),
};
let blocks = match event {
FollowEvent::Finalized(finalized) => {
finalized.finalized_block_hashes
}
_ => vec![],
};
let blocks = blocks.into_iter().map(move |hash| {
Ok(ChainHeadBlock::new(
hash,
subscription_id.clone(),
client.clone(),
))
});
Either::Right(stream::iter(blocks))
})
.boxed())
}
}
}
/// Take a promise that will return a subscription to some block headers,