Return events from blocks skipped over during Finalization, too (#473)

* make subscription stream generic in EventSubscription

* rename to EventSub/FinalizedEventSub

* wip fix some lifetimes so that event sub can depend on client in stream

* Cargo fmt + comment tweaks

* Add another comment

* factor out prev block header fetching into a separate function to tidy

* add a comment

* remove ListOrValue as it's unused

* Into<u128> on BlockNumber to simplify things

* clippy

* Fix an example and clippy

* simplify iterator now we are Into<u128>

* Into<u64> instead because it needs serializing, and test core logic

* Tweak missing block test to fill in >=2 holes

* tweak a comment
This commit is contained in:
James Wilson
2022-03-10 10:24:24 +00:00
committed by GitHub
parent a091d2b756
commit 4144a769d5
9 changed files with 206 additions and 59 deletions
+2 -2
View File
@@ -343,11 +343,11 @@ impl RuntimeGenerator {
::subxt::events::at::<T, Event>(self.client, block_hash).await
}
pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> {
pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub<T::Header>, T, Event>, ::subxt::BasicError> {
::subxt::events::subscribe::<T, Event>(self.client).await
}
pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> {
pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError> {
::subxt::events::subscribe_finalized::<T, Event>(self.client).await
}
}
+1 -1
View File
@@ -41,7 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?
.to_runtime_api::<polkadot::RuntimeApi<DefaultConfig, DefaultExtra<DefaultConfig>>>();
let block_number = 1;
let block_number = 1u32;
let block_hash = api
.client
-1
View File
@@ -22,7 +22,6 @@ futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.8.0", features = ["async-client", "client-ws-transport"] }
log = "0.4.14"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.124", features = ["derive"] }
serde_json = "1.0.64"
thiserror = "1.0.24"
+2 -1
View File
@@ -46,7 +46,8 @@ pub trait Config: 'static {
+ Default
+ Copy
+ core::hash::Hash
+ core::str::FromStr;
+ core::str::FromStr
+ Into<u64>;
/// The output of the `Hashing` function.
type Hash: Parameter
+128 -25
View File
@@ -24,12 +24,18 @@ use crate::{
use codec::Decode;
use derivative::Derivative;
use futures::{
future::Either,
stream::{
self,
BoxStream,
},
Future,
FutureExt,
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use sp_runtime::traits::Header;
use std::{
marker::Unpin,
task::Poll,
@@ -51,12 +57,12 @@ pub use super::{
/// [`Events::subscribe_finalized()`] if that is important.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe()` over calling this directly.
/// and is exposed only to be called via the codegen. It may
/// break between minor releases.
#[doc(hidden)]
pub async fn subscribe<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>(
client: &'a Client<T>,
) -> Result<EventSubscription<'a, EventSub<T::Header>, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
}
@@ -64,23 +70,98 @@ pub async fn subscribe<T: Config, Evs: Decode + 'static>(
/// Subscribe to events from finalized blocks.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe_finalized()` over calling this directly.
/// and is exposed only to be called via the codegen. It may
/// break between minor releases.
#[doc(hidden)]
pub async fn subscribe_finalized<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_finalized_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>(
client: &'a Client<T>,
) -> Result<EventSubscription<'a, FinalizedEventSub<'a, T::Header>, T, Evs>, BasicError> {
// fetch the last finalised block details immediately, so that we'll get
// events for each block after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_number = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());
// Fill in any gaps between the block above and the finalized blocks reported.
let block_subscription = subscribe_to_block_headers_filling_in_gaps(
client,
last_finalized_block_number,
client.rpc().subscribe_finalized_blocks().await?,
);
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}
/// Take a subscription that returns block headers, and if any block numbers are missed out
/// betweem the block number provided and what's returned from the subscription, we fill in
/// the gaps and get hold of all intermediate block headers.
///
/// **Note:** This is exposed so that we can run integration tests on it, but otherwise
/// should not be used directly and may break between minor releases.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<'a, S, E, T: Config>(
client: &'a Client<T>,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, BasicError>> + Send + 'a
where
S: Stream<Item = Result<T::Header, E>> + Send + 'a,
E: Into<BasicError> + Send + 'static,
{
sub.flat_map(move |s| {
// Get the header, or return a stream containing just the error. Our EventSubscription
// stream will return `None` as soon as it hits an error like this.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};
// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();
// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);
// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
async move {
let hash = client.rpc().block_hash(Some(n.into())).await?;
let header = client.rpc().header(hash).await?;
Ok::<_, BasicError>(header)
}
})
.filter_map(|h| async { h.transpose() });
// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);
// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}
/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result<Header, BasicError>>;
/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;
/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct EventSubscription<'a, T: Config, Evs: 'static> {
#[derivative(Debug(bound = "Sub: std::fmt::Debug"))]
pub struct EventSubscription<'a, Sub, T: Config, Evs: 'static> {
finished: bool,
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
block_header_subscription: Sub,
#[derivative(Debug = "ignore")]
at: Option<
std::pin::Pin<
@@ -90,11 +171,12 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> {
_event_type: std::marker::PhantomData<Evs>,
}
impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
fn new(
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
) -> Self {
impl<'a, Sub, T: Config, Evs: Decode, E: Into<BasicError>>
EventSubscription<'a, Sub, T, Evs>
where
Sub: Stream<Item = Result<T::Header, E>> + Unpin + 'a,
{
fn new(client: &'a Client<T>, block_header_subscription: Sub) -> Self {
EventSubscription {
finished: false,
client,
@@ -111,7 +193,10 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
}
}
impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin
for EventSubscription<'a, Sub, T, Evs>
{
}
// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose
// way to roughly implement the following function:
@@ -130,7 +215,13 @@ impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
//
// The advantage of this manual implementation is that we have a named type that we (and others)
// can derive things on, store away, alias etc.
impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
impl<'a, Sub, T, Evs, E> Stream for EventSubscription<'a, Sub, T, Evs>
where
T: Config,
Evs: Decode,
Sub: Stream<Item = Result<T::Header, E>> + Unpin + 'a,
E: Into<BasicError>,
{
type Item = Result<Events<'a, T, Evs>, BasicError>;
fn poll_next(
@@ -155,7 +246,6 @@ impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
return Poll::Ready(Some(Err(e.into())))
}
Some(Ok(block_header)) => {
use sp_runtime::traits::Header;
// Note [jsdw]: We may be able to get rid of the per-item allocation
// with https://github.com/oblique/reusable-box-future.
self.at = Some(Box::pin(at(self.client, block_header.hash())));
@@ -181,9 +271,22 @@ mod test {
use super::*;
// Ensure `EventSubscription` can be sent; only actually a compile-time check.
#[test]
#[allow(unused)]
fn check_sendability() {
fn assert_send<T: Send>() {}
assert_send::<EventSubscription<crate::DefaultConfig, ()>>();
assert_send::<
EventSubscription<
EventSub<<crate::DefaultConfig as Config>::Header>,
crate::DefaultConfig,
(),
>,
>();
assert_send::<
EventSubscription<
FinalizedEventSub<<crate::DefaultConfig as Config>::Header>,
crate::DefaultConfig,
(),
>,
>();
}
}
+3
View File
@@ -25,7 +25,10 @@ pub use decoding::EventsDecodingError;
pub use event_subscription::{
subscribe,
subscribe_finalized,
subscribe_to_block_headers_filling_in_gaps,
EventSub,
EventSubscription,
FinalizedEventSub,
};
pub use events_type::{
at,
+16 -27
View File
@@ -96,16 +96,6 @@ pub enum NumberOrHex {
Hex(U256),
}
/// RPC list or value wrapper.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(untagged)]
pub enum ListOrValue<T> {
/// A list of values of given type.
List(Vec<T>),
/// A single value of given type.
Value(T),
}
/// Alias for the type of a block returned by `chain_getBlock`
pub type ChainBlock<T> =
SignedBlock<Block<<T as Config>::Header, <T as Config>::Extrinsic>>;
@@ -120,11 +110,19 @@ impl From<NumberOrHex> for BlockNumber {
}
}
impl From<u32> for BlockNumber {
fn from(x: u32) -> Self {
NumberOrHex::Number(x.into()).into()
// All unsigned ints can be converted into a BlockNumber:
macro_rules! into_block_number {
($($t: ty)+) => {
$(
impl From<$t> for BlockNumber {
fn from(x: $t) -> Self {
NumberOrHex::Number(x.into()).into()
}
}
)+
}
}
into_block_number!(u8 u16 u32 u64);
/// Arbitrary properties defined in the chain spec as a JSON object.
pub type SystemProperties = serde_json::Map<String, serde_json::Value>;
@@ -285,16 +283,11 @@ impl<T: Config> Rpc<T> {
/// Fetch the genesis hash
pub async fn genesis_hash(&self) -> Result<T::Hash, BasicError> {
let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0)));
let block_zero = 0u32;
let params = rpc_params![block_zero];
let list_or_value: ListOrValue<Option<T::Hash>> =
let genesis_hash: Option<T::Hash> =
self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(genesis_hash) => {
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
}
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
}
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
}
/// Fetch the metadata
@@ -346,13 +339,9 @@ impl<T: Config> Rpc<T> {
&self,
block_number: Option<BlockNumber>,
) -> Result<Option<T::Hash>, BasicError> {
let block_number = block_number.map(ListOrValue::Value);
let params = rpc_params![block_number];
let list_or_value = self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(hash) => Ok(hash),
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
}
let block_hash = self.client.request("chain_getBlockHash", params).await?;
Ok(block_hash)
}
/// Get a block hash of the latest finalized block
+2 -2
View File
@@ -27849,13 +27849,13 @@ pub mod api {
}
pub async fn subscribe(
&self,
) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError>
) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub<T::Header>, T, Event>, ::subxt::BasicError>
{
::subxt::events::subscribe::<T, Event>(self.client).await
}
pub async fn subscribe_finalized(
&self,
) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError>
) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError>
{
::subxt::events::subscribe_finalized::<T, Event>(self.client).await
}
+52
View File
@@ -136,6 +136,58 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> {
Ok(())
}
#[async_std::test]
async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::BasicError> {
// This function is not publically available to use, but contains
// the key logic for filling in missing blocks, so we want to test it.
// This is used in `subscribe_finalized` to ensure no block headers are
// missed.
use subxt::events::subscribe_to_block_headers_filling_in_gaps;
let ctx = test_context().await;
// Manually subscribe to the next 6 finalized block headers, but deliberately
// filter out some in the middle so we get back b _ _ b _ b. This guarantees
// that there will be some gaps, even if there aren't any from the subscription.
let some_finalized_blocks = ctx
.api
.client
.rpc()
.subscribe_finalized_blocks()
.await?
.enumerate()
.take(6)
.filter(|(n, _)| {
let n = *n;
async move { n == 0 || n == 3 || n == 5 }
})
.map(|(_, h)| h);
// This should spot any gaps in the middle and fill them back in.
let all_finalized_blocks = subscribe_to_block_headers_filling_in_gaps(
&ctx.api.client,
None,
some_finalized_blocks,
);
futures::pin_mut!(all_finalized_blocks);
// Iterate the block headers, making sure we get them all in order.
let mut last_block_number = None;
while let Some(header) = all_finalized_blocks.next().await {
let header = header?;
use sp_runtime::traits::Header;
let block_number: u128 = (*header.number()).into();
if let Some(last) = last_block_number {
assert_eq!(last + 1, block_number);
}
last_block_number = Some(block_number);
}
Ok(())
}
// This is just a compile-time check that we can subscribe to events in
// a context that requires the event subscription/filtering to be Send-able.
// We test a typical use of EventSubscription and FilterEvents. We don't need