Implement BlocksClient for working with blocks (#671)

* rpc: Fill in any missing finalized blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Move fill blocks test to RPC location

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* events: Remove the fill in strategy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Introduce blocks client

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client: Enable the block API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Simplify `subscribe_finalized_headers` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Add tests for `subscribe_finalized_headers`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Implement `subscribe_headers`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Add tests for `subscribe_headers`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Move `missing_block_headers_will_be_filled_in` to blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* events: Use the new subscribe to blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Change API to return future similar to events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* events: Use blocks API for subscribing to blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update subxt/src/blocks/blocks_client.rs

Co-authored-by: James Wilson <james@jsdw.me>

* blocks: Simplify docs for `subscribe_finalized_headers`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Use `PhantomDataSendSync` to avoid other bounds on `T: Config`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Add docs for best blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* blocks: Avoid one clone for the `client.rpc()`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update testing/integration-tests/src/blocks/mod.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* blocks: Improve `subscribe_headers` doc

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: James Wilson <james@jsdw.me>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
Alexandru Vasile
2022-10-10 12:31:54 +03:00
committed by GitHub
parent 10def3c2c4
commit 95e6aa9dda
11 changed files with 278 additions and 137 deletions
+1 -2
View File
@@ -8,7 +8,6 @@ use crate::{
client::OnlineClientT,
error::Error,
events::EventsClient,
rpc::Subscription,
Config,
};
use derivative::Derivative;
@@ -40,7 +39,7 @@ pub type FinalizedEventSub<Header> = BoxStream<'static, Result<Header, Error>>;
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;
pub type EventSub<Item> = BoxStream<'static, Result<Item, Error>>;
/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
+7 -79
View File
@@ -14,17 +14,10 @@ use crate::{
Config,
};
use derivative::Derivative;
use futures::{
future::Either,
stream,
Stream,
StreamExt,
};
use sp_core::{
storage::StorageKey,
twox_128,
};
use sp_runtime::traits::Header;
use std::future::Future;
/// A client for working with events.
@@ -96,7 +89,10 @@ where
) -> impl Future<
Output = Result<EventSubscription<T, Client, EventSub<T::Header>>, Error>,
> + Send
+ 'static {
+ 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
async move { subscribe(client).await }
}
@@ -157,8 +153,8 @@ where
T: Config,
Client: OnlineClientT<T>,
{
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
let block_subscription = client.blocks().subscribe_headers().await?;
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}
/// Subscribe to events from finalized blocks.
@@ -169,78 +165,10 @@ where
T: Config,
Client: OnlineClientT<T>,
{
// fetch the last finalised block details immediately, so that we'll get
// events for each block after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_number = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());
let sub = client.rpc().subscribe_finalized_blocks().await?;
// Fill in any gaps between the block above and the finalized blocks reported.
let block_subscription = subscribe_to_block_headers_filling_in_gaps(
client.clone(),
last_finalized_block_number,
sub,
);
let block_subscription = client.blocks().subscribe_finalized_headers().await?;
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}
/// Note: This is exposed for testing but is not considered stable and may change
/// without notice in a patch release.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<T, Client, S, E>(
client: Client,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, Error>> + Send
where
T: Config,
Client: OnlineClientT<T> + Send + Sync,
S: Stream<Item = Result<T::Header, E>> + Send,
E: Into<Error> + Send + 'static,
{
sub.flat_map(move |s| {
let client = client.clone();
// Get the header, or return a stream containing just the error. Our EventSubscription
// stream will return `None` as soon as it hits an error like this.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};
// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();
// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);
// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
let client = client.clone();
async move {
let hash = client.rpc().block_hash(Some(n.into())).await?;
let header = client.rpc().header(hash).await?;
Ok::<_, Error>(header)
}
})
.filter_map(|h| async { h.transpose() });
// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);
// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}
// The storage key needed to access events.
fn system_events_key() -> StorageKey {
let mut storage_key = twox_128(b"System").to_vec();
+1 -4
View File
@@ -16,10 +16,7 @@ pub use event_subscription::{
EventSubscription,
FinalizedEventSub,
};
pub use events_client::{
subscribe_to_block_headers_filling_in_gaps,
EventsClient,
};
pub use events_client::EventsClient;
pub use events_type::{
EventDetails,
Events,