Extend the new api.blocks() to be the primary way to subscribe and fetch blocks/extrinsics/events (#691)

* First pass adding functions to get blocks and extrinsics

* cargo fmt and cache block events

* prefix block hash with 0x

* pin streams for better ergonomics and add an example of subscribing to blocks

* remove unused var

* standardise on _all, _best and _finalized for different block header subs

* WIP center subscribing around blocks

* Remove the event filtering/subscribing  stuff

* clippy

* we need tokio, silly clippy

* add extrinsic_index() call

* Update subxt/src/blocks/block_types.rs

Co-authored-by: Andrew Jones <ascjones@gmail.com>

Co-authored-by: Andrew Jones <ascjones@gmail.com>
This commit is contained in:
James Wilson
2022-11-01 16:53:35 +01:00
committed by GitHub
parent 52d4762d13
commit 33a9ec91af
25 changed files with 646 additions and 1279 deletions
-1
View File
@@ -39,7 +39,6 @@ impl Config for MyConfig {
type Address = <SubstrateConfig as Config>::Address;
type Header = <SubstrateConfig as Config>::Header;
type Signature = <SubstrateConfig as Config>::Signature;
type Extrinsic = <SubstrateConfig as Config>::Extrinsic;
// ExtrinsicParams makes use of the index type, so we need to adjust it
// too to align with our modified index type, above:
type ExtrinsicParams = SubstrateExtrinsicParams<Self>;
@@ -29,7 +29,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// For non-finalised blocks use `.subscribe_blocks()`
let mut blocks: Subscription<Header<u32, BlakeTwo256>> =
api.rpc().subscribe_finalized_blocks().await?;
api.rpc().subscribe_finalized_block_headers().await?;
while let Some(Ok(block)) = blocks.next().await {
println!(
@@ -31,8 +31,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a client to use:
let api = OnlineClient::<PolkadotConfig>::new().await?;
// Subscribe to any events that occur:
let mut event_sub = api.events().subscribe().await?;
// Subscribe to (in this case, finalized) blocks.
let mut block_sub = api.blocks().subscribe_finalized().await?;
// While this subscription is active, balance transfers are made somewhere:
tokio::task::spawn({
@@ -58,10 +58,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});
// Our subscription will see the events emitted as a result of this:
while let Some(events) = event_sub.next().await {
let events = events?;
let block_hash = events.block_hash();
// Get each finalized block as it arrives.
while let Some(block) = block_sub.next().await {
let block = block?;
// Ask for the events for this block.
let events = block.events().await?;
let block_hash = block.hash();
// We can dynamically decode events:
println!(" Dynamic event details: {block_hash:?}:");
+64
View File
@@ -0,0 +1,64 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! To run this example, a local polkadot node should be running. Example verified against polkadot 0.9.29-41a9d84b152.
//!
//! E.g.
//! ```bash
//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.29/polkadot" --output /usr/local/bin/polkadot --location
//! polkadot --dev --tmp
//! ```
use futures::StreamExt;
use subxt::{
OnlineClient,
PolkadotConfig,
};
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")]
pub mod polkadot {}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
// Create a client to use:
let api = OnlineClient::<PolkadotConfig>::new().await?;
// Subscribe to all finalized blocks:
let mut blocks_sub = api.blocks().subscribe_finalized().await?;
while let Some(block) = blocks_sub.next().await {
let block = block?;
let block_number = block.header().number;
let block_hash = block.hash();
println!("Block #{block_number}:");
println!(" Hash: {block_hash}");
println!(" Extrinsics:");
let body = block.body().await?;
for ext in body.extrinsics() {
let idx = ext.index();
let events = ext.events().await?;
let bytes_hex = format!("0x{}", hex::encode(ext.bytes()));
println!(" Extrinsic #{idx}:");
println!(" Bytes: {bytes_hex}");
println!(" Events:");
for evt in events.iter() {
let evt = evt?;
let pallet_name = evt.pallet_name();
let event_name = evt.variant_name();
println!(" {pallet_name}_{event_name}");
}
}
}
Ok(())
}
-72
View File
@@ -1,72 +0,0 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! To run this example, a local polkadot node should be running. Example verified against polkadot v0.9.28-9ffe6e9e3da.
//!
//! E.g.
//! ```bash
//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.28/polkadot" --output /usr/local/bin/polkadot --location
//! polkadot --dev --tmp
//! ```
use futures::StreamExt;
use sp_keyring::AccountKeyring;
use std::time::Duration;
use subxt::{
tx::PairSigner,
OnlineClient,
PolkadotConfig,
};
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")]
pub mod polkadot {}
/// Subscribe to all events, and then manually look through them and
/// pluck out the events that we care about.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
// Create a client to use:
let api = OnlineClient::<PolkadotConfig>::new().await?;
// Subscribe to just balance transfer events, making use of `filter_events`
// to select a single event type (note the 1-tuple) to filter out and return.
let mut transfer_events = api
.events()
.subscribe()
.await?
.filter_events::<(polkadot::balances::events::Transfer,)>();
// While this subscription is active, balance transfers are made somewhere:
tokio::task::spawn({
let api = api.clone();
async move {
let signer = PairSigner::new(AccountKeyring::Alice.pair());
let mut transfer_amount = 1_000_000_000;
// Make small balance transfers from Alice to Bob in a loop:
loop {
let transfer_tx = polkadot::tx().balances().transfer(
AccountKeyring::Bob.to_account_id().into(),
transfer_amount,
);
api.tx()
.sign_and_submit_default(&transfer_tx, &signer)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(10)).await;
transfer_amount += 100_000_000;
}
}
});
// Our subscription will see all of the transfer events emitted as a result of this:
while let Some(transfer_event) = transfer_events.next().await {
println!("Balance transfer event: {transfer_event:?}");
}
Ok(())
}
@@ -1,87 +0,0 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! To run this example, a local polkadot node should be running. Example verified against polkadot v0.9.28-9ffe6e9e3da.
//!
//! E.g.
//! ```bash
//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.28/polkadot" --output /usr/local/bin/polkadot --location
//! polkadot --dev --tmp
//! ```
use futures::StreamExt;
use sp_keyring::AccountKeyring;
use std::time::Duration;
use subxt::{
tx::PairSigner,
OnlineClient,
PolkadotConfig,
};
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")]
pub mod polkadot {}
/// Subscribe to all events, and then manually look through them and
/// pluck out the events that we care about.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
// Create a client to use:
let api = OnlineClient::<PolkadotConfig>::new().await?;
// Subscribe to several balance related events. If we ask for more than one event,
// we'll be given a correpsonding tuple of `Option`'s, with exactly one
// variant populated each time.
let mut balance_events = api.events().subscribe().await?.filter_events::<(
polkadot::balances::events::Withdraw,
polkadot::balances::events::Transfer,
polkadot::balances::events::Deposit,
)>();
// While this subscription is active, balance transfers are made somewhere:
tokio::task::spawn({
let api = api.clone();
async move {
let signer = PairSigner::new(AccountKeyring::Alice.pair());
let mut transfer_amount = 1_000_000_000;
// Make small balance transfers from Alice to Bob in a loop:
loop {
let transfer_tx = polkadot::tx().balances().transfer(
AccountKeyring::Bob.to_account_id().into(),
transfer_amount,
);
api.tx()
.sign_and_submit_default(&transfer_tx, &signer)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(10)).await;
transfer_amount += 100_000_000;
}
}
});
// Our subscription will see all of the balance events we're filtering on:
while let Some(ev) = balance_events.next().await {
let event_details = ev?;
let block_hash = event_details.block_hash;
let event = event_details.event;
println!("Event at {:?}:", block_hash);
if let (Some(withdraw), _, _) = &event {
println!(" Withdraw event: {withdraw:?}");
}
if let (_, Some(transfer), _) = &event {
println!(" Transfer event: {transfer:?}");
}
if let (_, _, Some(deposit)) = &event {
println!(" Deposit event: {deposit:?}");
}
}
Ok(())
}
+289
View File
@@ -0,0 +1,289 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::{
client::{
OfflineClientT,
OnlineClientT,
},
error::{
BlockError,
Error,
},
events,
rpc::ChainBlockResponse,
Config,
};
use derivative::Derivative;
use futures::lock::Mutex as AsyncMutex;
use sp_runtime::traits::{
Hash,
Header,
};
use std::sync::Arc;
/// A representation of a block.
pub struct Block<T: Config, C> {
header: T::Header,
client: C,
// Since we obtain the same events for every extrinsic, let's
// cache them so that we only ever do that once:
cached_events: CachedEvents<T>,
}
// A cache for our events so we don't fetch them more than once when
// iterating over events for extrinsics.
type CachedEvents<T> = Arc<AsyncMutex<Option<events::Events<T>>>>;
impl<T, C> Block<T, C>
where
T: Config,
C: OfflineClientT<T>,
{
pub(crate) fn new(header: T::Header, client: C) -> Self {
Block {
header,
client,
cached_events: Default::default(),
}
}
/// Return the block hash.
pub fn hash(&self) -> T::Hash {
self.header.hash()
}
/// Return the block number.
pub fn number(&self) -> T::BlockNumber {
*self.header().number()
}
/// Return the entire block header.
pub fn header(&self) -> &T::Header {
&self.header
}
}
impl<T, C> Block<T, C>
where
T: Config,
C: OnlineClientT<T>,
{
/// Return the events associated with the block, fetching them from the node if necessary.
pub async fn events(&self) -> Result<events::Events<T>, Error> {
get_events(&self.client, self.header.hash(), &self.cached_events).await
}
/// Fetch and return the block body.
pub async fn body(&self) -> Result<BlockBody<T, C>, Error> {
let block_hash = self.header.hash();
let block_details = match self.client.rpc().block(Some(block_hash)).await? {
Some(block) => block,
None => return Err(BlockError::block_hash_not_found(block_hash).into()),
};
Ok(BlockBody::new(
self.client.clone(),
block_details,
self.cached_events.clone(),
))
}
}
/// The body of a block.
pub struct BlockBody<T: Config, C> {
details: ChainBlockResponse<T>,
client: C,
cached_events: CachedEvents<T>,
}
impl<T, C> BlockBody<T, C>
where
T: Config,
C: OfflineClientT<T>,
{
pub(crate) fn new(
client: C,
details: ChainBlockResponse<T>,
cached_events: CachedEvents<T>,
) -> Self {
Self {
details,
client,
cached_events,
}
}
/// Returns an iterator over the extrinsics in the block body.
pub fn extrinsics(&self) -> impl Iterator<Item = Extrinsic<'_, T, C>> {
self.details
.block
.extrinsics
.iter()
.enumerate()
.map(|(idx, e)| {
Extrinsic {
index: idx as u32,
bytes: &e.0,
client: self.client.clone(),
block_hash: self.details.block.header.hash(),
cached_events: self.cached_events.clone(),
_marker: std::marker::PhantomData,
}
})
}
}
/// A single extrinsic in a block.
pub struct Extrinsic<'a, T: Config, C> {
index: u32,
bytes: &'a [u8],
client: C,
block_hash: T::Hash,
cached_events: CachedEvents<T>,
_marker: std::marker::PhantomData<T>,
}
impl<'a, T, C> Extrinsic<'a, T, C>
where
T: Config,
C: OfflineClientT<T>,
{
/// The index of the extrinsic in the block.
pub fn index(&self) -> u32 {
self.index
}
/// The bytes of the extrinsic.
pub fn bytes(&self) -> &'a [u8] {
self.bytes
}
}
impl<'a, T, C> Extrinsic<'a, T, C>
where
T: Config,
C: OnlineClientT<T>,
{
/// The events associated with the extrinsic.
pub async fn events(&self) -> Result<ExtrinsicEvents<T>, Error> {
let events =
get_events(&self.client, self.block_hash, &self.cached_events).await?;
let ext_hash = T::Hashing::hash_of(&self.bytes);
Ok(ExtrinsicEvents::new(ext_hash, self.index, events))
}
}
/// The events associated with a given extrinsic.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct ExtrinsicEvents<T: Config> {
// The hash of the extrinsic (handy to expose here because
// this type is returned from TxProgress things in the most
// basic flows, so it's the only place people can access it
// without complicating things for themselves).
ext_hash: T::Hash,
// The index of the extrinsic:
idx: u32,
// All of the events in the block:
events: events::Events<T>,
}
impl<T: Config> ExtrinsicEvents<T> {
pub(crate) fn new(ext_hash: T::Hash, idx: u32, events: events::Events<T>) -> Self {
Self {
ext_hash,
idx,
events,
}
}
/// Return the hash of the block that the extrinsic is in.
pub fn block_hash(&self) -> T::Hash {
self.events.block_hash()
}
/// The index of the extrinsic that these events are produced from.
pub fn extrinsic_index(&self) -> u32 {
self.idx
}
/// Return the hash of the extrinsic.
pub fn extrinsic_hash(&self) -> T::Hash {
self.ext_hash
}
/// Return all of the events in the block that the extrinsic is in.
pub fn all_events_in_block(&self) -> &events::Events<T> {
&self.events
}
/// Iterate over all of the raw events associated with this transaction.
///
/// This works in the same way that [`events::Events::iter()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn iter(&self) -> impl Iterator<Item = Result<events::EventDetails, Error>> + '_ {
self.events.iter().filter(|ev| {
ev.as_ref()
.map(|ev| ev.phase() == events::Phase::ApplyExtrinsic(self.idx))
.unwrap_or(true) // Keep any errors.
})
}
/// Find all of the transaction events matching the event type provided as a generic parameter.
///
/// This works in the same way that [`events::Events::find()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn find<Ev: events::StaticEvent>(
&self,
) -> impl Iterator<Item = Result<Ev, Error>> + '_ {
self.iter().filter_map(|ev| {
ev.and_then(|ev| ev.as_event::<Ev>().map_err(Into::into))
.transpose()
})
}
/// Iterate through the transaction events using metadata to dynamically decode and skip
/// them, and return the first event found which decodes to the provided `Ev` type.
///
/// This works in the same way that [`events::Events::find_first()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn find_first<Ev: events::StaticEvent>(&self) -> Result<Option<Ev>, Error> {
self.find::<Ev>().next().transpose()
}
/// Find an event in those associated with this transaction. Returns true if it was found.
///
/// This works in the same way that [`events::Events::has()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn has<Ev: events::StaticEvent>(&self) -> Result<bool, Error> {
Ok(self.find::<Ev>().next().transpose()?.is_some())
}
}
// Return Events from the cache, or fetch from the node if needed.
async fn get_events<C, T>(
client: &C,
block_hash: T::Hash,
cached_events: &AsyncMutex<Option<events::Events<T>>>,
) -> Result<events::Events<T>, Error>
where
T: Config,
C: OnlineClientT<T>,
{
// Acquire lock on the events cache. We either get back our events or we fetch and set them
// before unlocking, so only one fetch call should ever be made. We do this because the
// same events can be shared across all extrinsics in the block.
let lock = cached_events.lock().await;
let events = match &*lock {
Some(events) => events.clone(),
None => {
events::EventsClient::new(client.clone())
.at(Some(block_hash))
.await?
}
};
Ok(events)
}
+122 -44
View File
@@ -2,9 +2,13 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::Block;
use crate::{
client::OnlineClientT,
error::Error,
error::{
BlockError,
Error,
},
utils::PhantomDataSendSync,
Config,
};
@@ -16,7 +20,13 @@ use futures::{
StreamExt,
};
use sp_runtime::traits::Header;
use std::future::Future;
use std::{
future::Future,
pin::Pin,
};
type BlockStream<T> = Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>;
type BlockStreamRes<T> = Result<BlockStream<T>, Error>;
/// A client for working with blocks.
#[derive(Derivative)]
@@ -41,64 +51,132 @@ where
T: Config,
Client: OnlineClientT<T>,
{
/// Subscribe to new best block headers.
/// Obtain block details given the provided block hash, or the latest block if `None` is
/// provided.
///
/// # Note
/// # Warning
///
/// This does not produce all the blocks from the chain, just the best blocks.
/// The best block is selected by the consensus algorithm.
/// This calls under the hood the `chain_subscribeNewHeads` RPC method, if you need
/// a subscription of all the blocks please use the `chain_subscribeAllHeads` method.
///
/// These blocks haven't necessarily been finalised yet. Prefer
/// [`BlocksClient::subscribe_finalized_headers()`] if that is important.
pub fn subscribe_headers(
/// This call only supports blocks produced since the most recent
/// runtime upgrade. You can attempt to retrieve older blocks,
/// but may run into errors attempting to work with them.
pub fn at(
&self,
) -> impl Future<Output = Result<impl Stream<Item = Result<T::Header, Error>>, Error>>
+ Send
+ 'static {
block_hash: Option<T::Hash>,
) -> impl Future<Output = Result<Block<T, Client>, Error>> + Send + 'static {
let client = self.client.clone();
async move { client.rpc().subscribe_blocks().await }
async move {
// If block hash is not provided, get the hash
// for the latest block and use that.
let block_hash = match block_hash {
Some(hash) => hash,
None => {
client
.rpc()
.block_hash(None)
.await?
.expect("didn't pass a block number; qed")
}
};
let block_header = match client.rpc().header(Some(block_hash)).await? {
Some(header) => header,
None => return Err(BlockError::block_hash_not_found(block_hash).into()),
};
Ok(Block::new(block_header, client))
}
}
/// Subscribe to finalized block headers.
/// Subscribe to all new blocks imported by the node.
///
/// While the Substrate RPC method does not guarantee that all finalized block headers are
/// provided, this function does.
/// ```
pub fn subscribe_finalized_headers(
/// **Note:** You probably want to use [`Self::subscribe_finalized()`] most of
/// the time.
pub fn subscribe_all(
&self,
) -> impl Future<Output = Result<impl Stream<Item = Result<T::Header, Error>>, Error>>
+ Send
+ 'static {
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, Error>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
async move { subscribe_finalized_headers(client).await }
header_sub_fut_to_block_sub(self.clone(), async move {
let sub = client.rpc().subscribe_all_block_headers().await?;
BlockStreamRes::Ok(Box::pin(sub))
})
}
/// Subscribe to all new blocks imported by the node onto the current best fork.
///
/// **Note:** You probably want to use [`Self::subscribe_finalized()`] most of
/// the time.
pub fn subscribe_best(
&self,
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, Error>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
header_sub_fut_to_block_sub(self.clone(), async move {
let sub = client.rpc().subscribe_best_block_headers().await?;
BlockStreamRes::Ok(Box::pin(sub))
})
}
/// Subscribe to finalized blocks.
pub fn subscribe_finalized(
&self,
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, Error>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
header_sub_fut_to_block_sub(self.clone(), async move {
// Fetch the last finalised block details immediately, so that we'll get
// all blocks after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_num = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());
let sub = client.rpc().subscribe_finalized_block_headers().await?;
// Adjust the subscription stream to fill in any missing blocks.
BlockStreamRes::Ok(
subscribe_to_block_headers_filling_in_gaps(
client,
last_finalized_block_num,
sub,
)
.boxed(),
)
})
}
}
async fn subscribe_finalized_headers<T, Client>(
client: Client,
) -> Result<impl Stream<Item = Result<T::Header, Error>>, Error>
/// Take a promise that will return a subscription to some block headers,
/// and return a subscription to some blocks based on this.
async fn header_sub_fut_to_block_sub<T, Client, S>(
blocks_client: BlocksClient<T, Client>,
sub: S,
) -> Result<BlockStream<Block<T, Client>>, Error>
where
T: Config,
Client: OnlineClientT<T>,
S: Future<Output = Result<BlockStream<T::Header>, Error>> + Send + 'static,
Client: OnlineClientT<T> + Send + Sync + 'static,
{
// Fetch the last finalised block details immediately, so that we'll get
// all blocks after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_num = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());
let sub = sub.await?.then(move |header| {
let client = blocks_client.client.clone();
async move {
let header = match header {
Ok(header) => header,
Err(e) => return Err(e),
};
let sub = client.rpc().subscribe_finalized_blocks().await?;
// Adjust the subscription stream to fill in any missing blocks.
Ok(
subscribe_to_block_headers_filling_in_gaps(client, last_finalized_block_num, sub)
.boxed(),
)
Ok(Block::new(header, client))
}
});
BlockStreamRes::Ok(Box::pin(sub))
}
/// Note: This is exposed for testing but is not considered stable and may change
+6
View File
@@ -4,8 +4,14 @@
//! This module exposes the necessary functionality for working with events.
mod block_types;
mod blocks_client;
pub use block_types::{
Block,
Extrinsic,
ExtrinsicEvents,
};
pub use blocks_client::{
subscribe_to_block_headers_filling_in_gaps,
BlocksClient,
-6
View File
@@ -16,7 +16,6 @@ use codec::{
use core::fmt::Debug;
use sp_runtime::traits::{
AtLeast32Bit,
Extrinsic,
Hash,
Header,
MaybeSerializeDeserialize,
@@ -78,9 +77,6 @@ pub trait Config: 'static {
/// Signature type.
type Signature: Verify + Encode + Send + Sync + 'static;
/// Extrinsic type within blocks.
type Extrinsic: Parameter + Extrinsic + Debug + MaybeSerializeDeserialize + Send;
/// This type defines the extrinsic extra and additional parameters.
type ExtrinsicParams: crate::tx::ExtrinsicParams<Self::Index, Self::Hash>;
}
@@ -104,7 +100,6 @@ impl Config for SubstrateConfig {
type Header =
sp_runtime::generic::Header<Self::BlockNumber, sp_runtime::traits::BlakeTwo256>;
type Signature = sp_runtime::MultiSignature;
type Extrinsic = sp_runtime::OpaqueExtrinsic;
type ExtrinsicParams = crate::tx::SubstrateExtrinsicParams<Self>;
}
@@ -145,6 +140,5 @@ impl<T: Config, E: crate::tx::ExtrinsicParams<T::Index, T::Hash>> Config
type Address = T::Address;
type Header = T::Header;
type Signature = T::Signature;
type Extrinsic = T::Extrinsic;
type ExtrinsicParams = E;
}
+21
View File
@@ -63,6 +63,9 @@ pub enum Error {
/// Transaction progress error.
#[error("Transaction error: {0}")]
Transaction(#[from] TransactionError),
/// Block related error.
#[error("Block error: {0}")]
Block(#[from] BlockError),
/// An error encoding a storage address.
#[error("Error encoding storage address: {0}")]
StorageAddress(#[from] StorageAddressError),
@@ -237,6 +240,24 @@ impl DispatchError {
}
}
/// Block error
#[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)]
pub enum BlockError {
/// The block
#[error(
"Could not find a block with hash {0} (perhaps it was on a non-finalized fork?)"
)]
BlockHashNotFound(String),
}
impl BlockError {
/// Produce an error that a block with the given hash cannot be found.
pub fn block_hash_not_found(hash: impl AsRef<[u8]>) -> BlockError {
let hash = format!("0x{}", hex::encode(hash));
BlockError::BlockHashNotFound(hash)
}
}
/// Transaction error.
#[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)]
pub enum TransactionError {
-208
View File
@@ -1,208 +0,0 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! Subscribing to events.
use crate::{
client::OnlineClientT,
error::Error,
events::EventsClient,
Config,
};
use derivative::Derivative;
use futures::{
stream::BoxStream,
Future,
FutureExt,
Stream,
StreamExt,
};
use sp_runtime::traits::Header;
use std::{
marker::Unpin,
task::Poll,
};
pub use super::{
EventDetails,
EventFilter,
Events,
FilterEvents,
};
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type FinalizedEventSub<Header> = BoxStream<'static, Result<Header, Error>>;
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = BoxStream<'static, Result<Item, Error>>;
/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
#[derivative(Debug(bound = "Sub: std::fmt::Debug, Client: std::fmt::Debug"))]
pub struct EventSubscription<T: Config, Client, Sub> {
finished: bool,
client: Client,
block_header_subscription: Sub,
#[derivative(Debug = "ignore")]
at: Option<std::pin::Pin<Box<dyn Future<Output = Result<Events<T>, Error>> + Send>>>,
}
impl<T: Config, Client, Sub, E: Into<Error>> EventSubscription<T, Client, Sub>
where
Sub: Stream<Item = Result<T::Header, E>> + Unpin,
{
/// Create a new [`EventSubscription`] from a client and a subscription
/// which returns block headers.
pub fn new(client: Client, block_header_subscription: Sub) -> Self {
EventSubscription {
finished: false,
client,
block_header_subscription,
at: None,
}
}
/// Return only specific events matching the tuple of 1 or more event
/// types that has been provided as the `Filter` type parameter.
///
/// # Example
///
/// ```no_run
/// use futures::StreamExt;
/// use subxt::{OnlineClient, PolkadotConfig};
///
/// #[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")]
/// pub mod polkadot {}
///
/// # #[tokio::main]
/// # async fn main() {
/// let api = OnlineClient::<PolkadotConfig>::new().await.unwrap();
///
/// let mut events = api
/// .events()
/// .subscribe()
/// .await
/// .unwrap()
/// .filter_events::<(
/// polkadot::balances::events::Transfer,
/// polkadot::balances::events::Deposit
/// )>();
///
/// while let Some(ev) = events.next().await {
/// let event_details = ev.unwrap();
/// match event_details.event {
/// (Some(transfer), None) => println!("Balance transfer event: {transfer:?}"),
/// (None, Some(deposit)) => println!("Balance deposit event: {deposit:?}"),
/// _ => unreachable!()
/// }
/// }
/// # }
/// ```
pub fn filter_events<Filter: EventFilter>(
self,
) -> FilterEvents<'static, Self, T, Filter> {
FilterEvents::new(self)
}
}
impl<T: Config, Client, Sub: Unpin> Unpin for EventSubscription<T, Client, Sub> {}
// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose
// way to roughly implement the following function:
//
// ```
// fn subscribe_events<T: Config, Evs: Decode>(client: &'_ Client<T>, block_sub: Subscription<T::Header>) -> impl Stream<Item=Result<Events<'_, T, Evs>, Error>> + '_ {
// use futures::StreamExt;
// block_sub.then(move |block_header_res| async move {
// use sp_runtime::traits::Header;
// let block_header = block_header_res?;
// let block_hash = block_header.hash();
// at(client, block_hash).await
// })
// }
// ```
//
// The advantage of this manual implementation is that we have a named type that we (and others)
// can derive things on, store away, alias etc.
impl<T, Client, Sub, E> Stream for EventSubscription<T, Client, Sub>
where
T: Config,
Client: OnlineClientT<T>,
Sub: Stream<Item = Result<T::Header, E>> + Unpin,
E: Into<Error>,
{
type Item = Result<Events<T>, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// We are finished; return None.
if self.finished {
return Poll::Ready(None)
}
// If there isn't an `at` function yet that's busy resolving a block hash into
// some event details, then poll the block header subscription to get one.
if self.at.is_none() {
match futures::ready!(self.block_header_subscription.poll_next_unpin(cx)) {
None => {
self.finished = true;
return Poll::Ready(None)
}
Some(Err(e)) => {
self.finished = true;
return Poll::Ready(Some(Err(e.into())))
}
Some(Ok(block_header)) => {
// Note [jsdw]: We may be able to get rid of the per-item allocation
// with https://github.com/oblique/reusable-box-future.
let at = EventsClient::new(self.client.clone())
.at(Some(block_header.hash()));
self.at = Some(Box::pin(at));
// Continue, so that we poll this function future we've just created.
}
}
}
// If we get here, there will be an `at` function stored. Unwrap it and poll it to
// completion to get our events, throwing it away as soon as it is ready.
let at_fn = self
.at
.as_mut()
.expect("'at' function should have been set above'");
let events = futures::ready!(at_fn.poll_unpin(cx));
self.at = None;
Poll::Ready(Some(events))
}
}
#[cfg(test)]
mod test {
use super::*;
// Ensure `EventSubscription` can be sent; only actually a compile-time check.
#[allow(unused)]
fn check_sendability() {
fn assert_send<T: Send>() {}
assert_send::<
EventSubscription<
crate::SubstrateConfig,
(),
EventSub<<crate::SubstrateConfig as Config>::Header>,
>,
>();
assert_send::<
EventSubscription<
crate::SubstrateConfig,
(),
FinalizedEventSub<<crate::SubstrateConfig as Config>::Header>,
>,
>();
}
}
+27 -118
View File
@@ -5,12 +5,7 @@
use crate::{
client::OnlineClientT,
error::Error,
events::{
EventSub,
EventSubscription,
Events,
FinalizedEventSub,
},
events::Events,
Config,
};
use derivative::Derivative;
@@ -44,6 +39,12 @@ where
Client: OnlineClientT<T>,
{
/// Obtain events at some block hash.
///
/// # Warning
///
/// This call only supports blocks produced since the most recent
/// runtime upgrade. You can attempt to retrieve events from older blocks,
/// but may run into errors attempting to work with them.
pub fn at(
&self,
block_hash: Option<T::Hash>,
@@ -51,122 +52,30 @@ where
// Clone and pass the client in like this so that we can explicitly
// return a Future that's Send + 'static, rather than tied to &self.
let client = self.client.clone();
async move { at(client, block_hash).await }
}
async move {
// If block hash is not provided, get the hash
// for the latest block and use that.
let block_hash = match block_hash {
Some(hash) => hash,
None => {
client
.rpc()
.block_hash(None)
.await?
.expect("didn't pass a block number; qed")
}
};
/// Subscribe to all events from blocks.
///
/// **Note:** these blocks haven't necessarily been finalised yet; prefer
/// [`EventsClient::subscribe_finalized()`] if that is important.
///
/// # Example
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// use futures::StreamExt;
/// use subxt::{ OnlineClient, PolkadotConfig };
///
/// let api = OnlineClient::<PolkadotConfig>::new().await.unwrap();
///
/// let mut events = api.events().subscribe().await.unwrap();
///
/// while let Some(ev) = events.next().await {
/// // Obtain all events from this block.
/// let ev = ev.unwrap();
/// // Print block hash.
/// println!("Event at block hash {:?}", ev.block_hash());
/// // Iterate over all events.
/// let mut iter = ev.iter();
/// while let Some(event_details) = iter.next() {
/// println!("Event details {:?}", event_details);
/// }
/// }
/// # }
/// ```
pub fn subscribe(
&self,
) -> impl Future<
Output = Result<EventSubscription<T, Client, EventSub<T::Header>>, Error>,
> + Send
+ 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
async move { subscribe(client).await }
}
/// Subscribe to events from finalized blocks. See [`EventsClient::subscribe()`] for details.
pub fn subscribe_finalized(
&self,
) -> impl Future<
Output = Result<
EventSubscription<T, Client, FinalizedEventSub<T::Header>>,
Error,
>,
> + Send
+ 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
async move { subscribe_finalized(client).await }
}
}
async fn at<T, Client>(
client: Client,
block_hash: Option<T::Hash>,
) -> Result<Events<T>, Error>
where
T: Config,
Client: OnlineClientT<T>,
{
// If block hash is not provided, get the hash
// for the latest block and use that.
let block_hash = match block_hash {
Some(hash) => hash,
None => {
client
let event_bytes = client
.rpc()
.block_hash(None)
.storage(&*system_events_key().0, Some(block_hash))
.await?
.expect("didn't pass a block number; qed")
.map(|e| e.0)
.unwrap_or_else(Vec::new);
Ok(Events::new(client.metadata(), block_hash, event_bytes))
}
};
let event_bytes = client
.rpc()
.storage(&*system_events_key().0, Some(block_hash))
.await?
.map(|e| e.0)
.unwrap_or_else(Vec::new);
Ok(Events::new(client.metadata(), block_hash, event_bytes))
}
async fn subscribe<T, Client>(
client: Client,
) -> Result<EventSubscription<T, Client, EventSub<T::Header>>, Error>
where
T: Config,
Client: OnlineClientT<T>,
{
let block_subscription = client.blocks().subscribe_headers().await?;
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}
/// Subscribe to events from finalized blocks.
async fn subscribe_finalized<T, Client>(
client: Client,
) -> Result<EventSubscription<T, Client, FinalizedEventSub<T::Header>>, Error>
where
T: Config,
Client: OnlineClientT<T>,
{
let block_subscription = client.blocks().subscribe_finalized_headers().await?;
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}
}
// The storage key needed to access events.
+3 -1
View File
@@ -25,7 +25,7 @@ use std::sync::Arc;
/// A collection of events obtained from a block, bundled with the necessary
/// information needed to decode and iterate over them.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
#[derivative(Debug(bound = ""), Clone(bound = ""))]
pub struct Events<T: Config> {
metadata: Metadata,
block_hash: T::Hash,
@@ -83,6 +83,8 @@ impl<T: Config> Events<T> {
/// Iterate over all of the events, using metadata to dynamically
/// decode them as we go, and returning the raw bytes and other associated
/// details. If an error occurs, all subsequent iterations return `None`.
// Dev note: The returned iterator is 'static + Send so that we can box it up and make
// use of it with our `FilterEvents` stuff.
pub fn iter(
&self,
) -> impl Iterator<Item = Result<EventDetails, Error>> + Send + Sync + 'static {
-403
View File
@@ -1,403 +0,0 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! Filtering individual events from subscriptions.
use super::{
Events,
Phase,
StaticEvent,
};
use crate::{
Config,
Error,
};
use futures::{
Stream,
StreamExt,
};
use std::{
marker::Unpin,
task::Poll,
};
/// A stream which filters events based on the `Filter` param provided.
/// If `Filter` is a 1-tuple of a single `Event` type, it will return every
/// instance of that event as it's found. If `filter` is ` tuple of multiple
/// `Event` types, it will return a corresponding tuple of `Option`s, where
/// exactly one of these will be `Some(event)` each iteration.
pub struct FilterEvents<'a, Sub: 'a, T: Config, Filter: EventFilter> {
// A subscription; in order for the Stream impl to apply, this will
// impl `Stream<Item = Result<Events<'a, T, Evs>, Error>> + Unpin + 'a`.
sub: Sub,
// Each time we get Events from our subscription, they are stored here
// and iterated through in future stream iterations until exhausted.
events: Option<
Box<
dyn Iterator<
Item = Result<
FilteredEventDetails<T::Hash, Filter::ReturnType>,
Error,
>,
> + Send
+ 'a,
>,
>,
}
impl<'a, Sub: 'a, T: Config, Filter: EventFilter> Unpin
for FilterEvents<'a, Sub, T, Filter>
{
}
impl<'a, Sub: 'a, T: Config, Filter: EventFilter> FilterEvents<'a, Sub, T, Filter> {
pub(crate) fn new(sub: Sub) -> Self {
Self { sub, events: None }
}
}
impl<'a, Sub, T, Filter> Stream for FilterEvents<'a, Sub, T, Filter>
where
Sub: Stream<Item = Result<Events<T>, Error>> + Unpin + 'a,
T: Config,
Filter: EventFilter,
{
type Item = Result<FilteredEventDetails<T::Hash, Filter::ReturnType>, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
// Drain the current events we're iterating over first:
if let Some(events_iter) = self.events.as_mut() {
match events_iter.next() {
Some(res) => return Poll::Ready(Some(res)),
None => {
self.events = None;
}
}
}
// Wait for new events to come in:
match futures::ready!(self.sub.poll_next_unpin(cx)) {
None => return Poll::Ready(None),
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
Some(Ok(events)) => {
self.events = Some(Filter::filter(events));
}
};
}
}
}
/// This is returned from the [`FilterEvents`] impl of [`Stream`]. It contains
/// some type representing an event we've filtered on, along with couple of additional
/// pieces of information about that event.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct FilteredEventDetails<BlockHash, Evs> {
/// During which [`Phase`] was the event produced?
pub phase: Phase,
/// Hash of the block that this event came from.
pub block_hash: BlockHash,
/// A type containing an event that we've filtered on.
/// Depending on the filter type, this may be a tuple
/// or a single event.
pub event: Evs,
}
/// This trait is implemented for tuples of Event types; any such tuple (up to size 8) can be
/// used to filter an event subscription to return only the specified events.
pub trait EventFilter: private::Sealed {
/// The type we'll be handed back from filtering.
type ReturnType;
/// Filter the events based on the type implementing this trait.
fn filter<T: Config>(
events: Events<T>,
) -> Box<
dyn Iterator<
Item = Result<FilteredEventDetails<T::Hash, Self::ReturnType>, Error>,
> + Send,
>;
}
// Prevent userspace implementations of the above trait; the interface is not considered stable
// and is not a particularly nice API to work with (particularly because it involves boxing, which
// would be nice to get rid of eventually).
pub(crate) mod private {
pub trait Sealed {}
}
// A special case impl for searching for a tuple of exactly one event (in this case, we don't
// need to return an `(Option<Event>,)`; we can just return `Event`.
impl<Ev: StaticEvent> private::Sealed for (Ev,) {}
impl<Ev: StaticEvent> EventFilter for (Ev,) {
type ReturnType = Ev;
fn filter<T: Config>(
events: Events<T>,
) -> Box<dyn Iterator<Item = Result<FilteredEventDetails<T::Hash, Ev>, Error>> + Send>
{
let block_hash = events.block_hash();
let mut iter = events.iter();
Box::new(std::iter::from_fn(move || {
for ev in iter.by_ref() {
// Forward any error immediately:
let raw_event = match ev {
Ok(ev) => ev,
Err(e) => return Some(Err(e)),
};
// Try decoding each type until we hit a match or an error:
let ev = raw_event.as_event::<Ev>();
if let Ok(Some(event)) = ev {
// We found a match; return our tuple.
return Some(Ok(FilteredEventDetails {
phase: raw_event.phase(),
block_hash,
event,
}))
}
if let Err(e) = ev {
// We hit an error. Return it.
return Some(Err(e.into()))
}
}
None
}))
}
}
// A generalised impl for tuples of sizes greater than 1:
macro_rules! impl_event_filter {
($($ty:ident $idx:tt),+) => {
impl <$($ty: StaticEvent),+> private::Sealed for ( $($ty,)+ ) {}
impl <$($ty: StaticEvent),+> EventFilter for ( $($ty,)+ ) {
type ReturnType = ( $(Option<$ty>,)+ );
fn filter<T: Config>(
events: Events<T>
) -> Box<dyn Iterator<Item=Result<FilteredEventDetails<T::Hash,Self::ReturnType>, Error>> + Send> {
let block_hash = events.block_hash();
let mut iter = events.iter();
Box::new(std::iter::from_fn(move || {
let mut out: ( $(Option<$ty>,)+ ) = Default::default();
for ev in iter.by_ref() {
// Forward any error immediately:
let raw_event = match ev {
Ok(ev) => ev,
Err(e) => return Some(Err(e))
};
// Try decoding each type until we hit a match or an error:
$({
let ev = raw_event.as_event::<$ty>();
if let Ok(Some(ev)) = ev {
// We found a match; return our tuple.
out.$idx = Some(ev);
return Some(Ok(FilteredEventDetails {
phase: raw_event.phase(),
block_hash,
event: out
}))
}
if let Err(e) = ev {
// We hit an error. Return it.
return Some(Err(e.into()))
}
})+
}
None
}))
}
}
}
}
impl_event_filter!(A 0, B 1);
impl_event_filter!(A 0, B 1, C 2);
impl_event_filter!(A 0, B 1, C 2, D 3);
impl_event_filter!(A 0, B 1, C 2, D 3, E 4);
impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5);
impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6);
impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7);
#[cfg(test)]
mod test {
use super::{
super::events_type::test_utils::{
event_record,
events,
metadata,
},
*,
};
use crate::{
Config,
Metadata,
SubstrateConfig,
};
use codec::{
Decode,
Encode,
};
use futures::{
stream,
Stream,
StreamExt,
};
use scale_info::TypeInfo;
// Some pretend events in a pallet
#[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)]
enum PalletEvents {
A(EventA),
B(EventB),
C(EventC),
}
// An event in our pallet that we can filter on.
#[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)]
struct EventA(u8);
impl StaticEvent for EventA {
const PALLET: &'static str = "Test";
const EVENT: &'static str = "A";
}
// An event in our pallet that we can filter on.
#[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)]
struct EventB(bool);
impl StaticEvent for EventB {
const PALLET: &'static str = "Test";
const EVENT: &'static str = "B";
}
// An event in our pallet that we can filter on.
#[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)]
struct EventC(u8, bool);
impl StaticEvent for EventC {
const PALLET: &'static str = "Test";
const EVENT: &'static str = "C";
}
// A stream of fake events for us to try filtering on.
fn events_stream(
metadata: Metadata,
) -> impl Stream<Item = Result<Events<SubstrateConfig>, Error>> {
stream::iter(vec![
events::<PalletEvents>(
metadata.clone(),
vec![
event_record(Phase::Initialization, PalletEvents::A(EventA(1))),
event_record(Phase::ApplyExtrinsic(0), PalletEvents::B(EventB(true))),
event_record(Phase::Finalization, PalletEvents::A(EventA(2))),
],
),
events::<PalletEvents>(
metadata.clone(),
vec![event_record(
Phase::ApplyExtrinsic(1),
PalletEvents::B(EventB(false)),
)],
),
events::<PalletEvents>(
metadata,
vec![
event_record(Phase::ApplyExtrinsic(2), PalletEvents::B(EventB(true))),
event_record(Phase::ApplyExtrinsic(3), PalletEvents::A(EventA(3))),
],
),
])
.map(Ok::<_, Error>)
}
#[tokio::test]
async fn filter_one_event_from_stream() {
let metadata = metadata::<PalletEvents>();
// Filter out fake event stream to select events matching `EventA` only.
let actual: Vec<_> =
FilterEvents::<_, SubstrateConfig, (EventA,)>::new(events_stream(metadata))
.map(|e| e.unwrap())
.collect()
.await;
let expected = vec![
FilteredEventDetails {
phase: Phase::Initialization,
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: EventA(1),
},
FilteredEventDetails {
phase: Phase::Finalization,
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: EventA(2),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(3),
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: EventA(3),
},
];
assert_eq!(actual, expected);
}
#[tokio::test]
async fn filter_some_events_from_stream() {
let metadata = metadata::<PalletEvents>();
// Filter out fake event stream to select events matching `EventA` or `EventB`.
let actual: Vec<_> = FilterEvents::<_, SubstrateConfig, (EventA, EventB)>::new(
events_stream(metadata),
)
.map(|e| e.unwrap())
.collect()
.await;
let expected = vec![
FilteredEventDetails {
phase: Phase::Initialization,
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: (Some(EventA(1)), None),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(0),
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: (None, Some(EventB(true))),
},
FilteredEventDetails {
phase: Phase::Finalization,
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: (Some(EventA(2)), None),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(1),
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: (None, Some(EventB(false))),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(2),
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: (None, Some(EventB(true))),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(3),
block_hash: <SubstrateConfig as Config>::Hash::default(),
event: (Some(EventA(3)), None),
},
];
assert_eq!(actual, expected);
}
#[tokio::test]
async fn filter_no_events_from_stream() {
let metadata = metadata::<PalletEvents>();
// Filter out fake event stream to select events matching `EventC` (none exist).
let actual: Vec<_> =
FilterEvents::<_, SubstrateConfig, (EventC,)>::new(events_stream(metadata))
.map(|e| e.unwrap())
.collect()
.await;
assert_eq!(actual, vec![]);
}
}
-12
View File
@@ -6,26 +6,14 @@
//! The two main entry points into events are [`crate::OnlineClient::events()`]
//! and calls like [crate::tx::TxProgress::wait_for_finalized_success()].
mod event_subscription;
mod events_client;
mod events_type;
mod filter_events;
pub use event_subscription::{
EventSub,
EventSubscription,
FinalizedEventSub,
};
pub use events_client::EventsClient;
pub use events_type::{
EventDetails,
Events,
};
pub use filter_events::{
EventFilter,
FilterEvents,
FilteredEventDetails,
};
use codec::{
Decode,
+5
View File
@@ -133,6 +133,11 @@
)]
#![allow(clippy::type_complexity)]
// Suppress an unused dependency warning because tokio is
// only used in example code snippets at the time of writing.
#[cfg(test)]
use tokio as _;
pub use subxt_macro::subxt;
pub mod blocks;
+69 -15
View File
@@ -69,13 +69,7 @@ use sp_core::{
Bytes,
U256,
};
use sp_runtime::{
generic::{
Block,
SignedBlock,
},
ApplyExtrinsicResult,
};
use sp_runtime::ApplyExtrinsicResult;
use std::{
collections::HashMap,
sync::Arc,
@@ -98,9 +92,40 @@ pub enum NumberOrHex {
Hex(U256),
}
/// Alias for the type of a block returned by `chain_getBlock`
pub type ChainBlock<T> =
SignedBlock<Block<<T as Config>::Header, <T as Config>::Extrinsic>>;
/// The response from `chain_getBlock`
#[derive(Debug, Deserialize)]
#[serde(bound = "T: Config")]
pub struct ChainBlockResponse<T: Config> {
/// The block itself.
pub block: ChainBlock<T>,
/// Block justification.
pub justifications: Option<sp_runtime::Justifications>,
}
/// Block details in the [`ChainBlockResponse`].
#[derive(Debug, Deserialize)]
pub struct ChainBlock<T: Config> {
/// The block header.
pub header: T::Header,
/// The accompanying extrinsics.
pub extrinsics: Vec<ChainBlockExtrinsic>,
}
/// Bytes representing an extrinsic in a [`ChainBlock`].
#[derive(Debug)]
pub struct ChainBlockExtrinsic(pub Vec<u8>);
impl<'a> ::serde::Deserialize<'a> for ChainBlockExtrinsic {
fn deserialize<D>(de: D) -> Result<Self, D::Error>
where
D: ::serde::Deserializer<'a>,
{
let r = sp_core::bytes::deserialize(de)?;
let bytes = Decode::decode(&mut &r[..])
.map_err(|e| ::serde::de::Error::custom(format!("Decode error: {}", e)))?;
Ok(ChainBlockExtrinsic(bytes))
}
}
/// Wrapper for NumberOrHex to allow custom From impls
#[derive(Serialize)]
@@ -498,7 +523,7 @@ impl<T: Config> Rpc<T> {
pub async fn block(
&self,
hash: Option<T::Hash>,
) -> Result<Option<ChainBlock<T>>, Error> {
) -> Result<Option<ChainBlockResponse<T>>, Error> {
let params = rpc_params![hash];
let block = self.client.request("chain_getBlock", params).await?;
Ok(block)
@@ -543,11 +568,16 @@ impl<T: Config> Rpc<T> {
Ok(version)
}
/// Subscribe to blocks.
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
/// Subscribe to all new best block headers.
pub async fn subscribe_best_block_headers(
&self,
) -> Result<Subscription<T::Header>, Error> {
let subscription = self
.client
.subscribe(
// Despite the name, this returns a stream of all new blocks
// imported by the node that happen to be added to the current best chain
// (ie all best blocks).
"chain_subscribeNewHeads",
rpc_params![],
"chain_unsubscribeNewHeads",
@@ -557,8 +587,32 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}
/// Subscribe to finalized blocks.
pub async fn subscribe_finalized_blocks(
/// Subscribe to all new block headers.
pub async fn subscribe_all_block_headers(
&self,
) -> Result<Subscription<T::Header>, Error> {
let subscription = self
.client
.subscribe(
// Despite the name, this returns a stream of all new blocks
// imported by the node that happen to be added to the current best chain
// (ie all best blocks).
"chain_subscribeAllHeads",
rpc_params![],
"chain_unsubscribeAllHeads",
)
.await?;
Ok(subscription)
}
/// Subscribe to finalized block headers.
///
/// Note: this may not produce _every_ block in the finalized chain;
/// sometimes multiple blocks are finalized at once, and in this case only the
/// latest one is returned. the higher level APIs that use this "fill in" the
/// gaps for us.
pub async fn subscribe_finalized_block_headers(
&self,
) -> Result<Subscription<T::Header>, Error> {
let subscription = self
-1
View File
@@ -52,7 +52,6 @@ pub use self::{
TxPayload,
},
tx_progress::{
TxEvents,
TxInBlock,
TxProgress,
TxStatus,
+13 -83
View File
@@ -14,14 +14,7 @@ use crate::{
RpcError,
TransactionError,
},
events::{
self,
EventDetails,
Events,
EventsClient,
Phase,
StaticEvent,
},
events::EventsClient,
rpc::{
Subscription,
SubstrateTxStatus,
@@ -147,7 +140,9 @@ where
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized_success(self) -> Result<TxEvents<T>, Error> {
pub async fn wait_for_finalized_success(
self,
) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> {
let evs = self.wait_for_finalized().await?.wait_for_success().await?;
Ok(evs)
}
@@ -343,7 +338,9 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
///
/// **Note:** This has to download block details from the node and decode events
/// from them.
pub async fn wait_for_success(&self) -> Result<TxEvents<T>, Error> {
pub async fn wait_for_success(
&self,
) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> {
let events = self.fetch_events().await?;
// Try to find any errors; return the first one we encounter.
@@ -365,7 +362,7 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
///
/// **Note:** This has to download block details from the node and decode events
/// from them.
pub async fn fetch_events(&self) -> Result<TxEvents<T>, Error> {
pub async fn fetch_events(&self) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> {
let block = self
.client
.rpc()
@@ -376,7 +373,7 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
let extrinsic_idx = block.block.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
let hash = T::Hashing::hash_of(&ext.0);
hash == self.ext_hash
})
// If we successfully obtain the block hash we think contains our
@@ -387,77 +384,10 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
.at(Some(self.block_hash))
.await?;
Ok(TxEvents {
ext_hash: self.ext_hash,
ext_idx: extrinsic_idx as u32,
Ok(crate::blocks::ExtrinsicEvents::new(
self.ext_hash,
extrinsic_idx as u32,
events,
})
}
}
/// This represents the events related to our transaction.
/// We can iterate over the events, or look for a specific one.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct TxEvents<T: Config> {
ext_hash: T::Hash,
ext_idx: u32,
events: Events<T>,
}
impl<T: Config> TxEvents<T> {
/// Return the hash of the block that the transaction has made it into.
pub fn block_hash(&self) -> T::Hash {
self.events.block_hash()
}
/// Return the hash of the extrinsic.
pub fn extrinsic_hash(&self) -> T::Hash {
self.ext_hash
}
/// Return all of the events in the block that the transaction made it into.
pub fn all_events_in_block(&self) -> &events::Events<T> {
&self.events
}
/// Iterate over all of the raw events associated with this transaction.
///
/// This works in the same way that [`events::Events::iter()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn iter(&self) -> impl Iterator<Item = Result<EventDetails, Error>> + '_ {
self.events.iter().filter(|ev| {
ev.as_ref()
.map(|ev| ev.phase() == Phase::ApplyExtrinsic(self.ext_idx))
.unwrap_or(true) // Keep any errors.
})
}
/// Find all of the transaction events matching the event type provided as a generic parameter.
///
/// This works in the same way that [`events::Events::find()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn find<Ev: StaticEvent>(&self) -> impl Iterator<Item = Result<Ev, Error>> + '_ {
self.iter().filter_map(|ev| {
ev.and_then(|ev| ev.as_event::<Ev>().map_err(Into::into))
.transpose()
})
}
/// Iterate through the transaction events using metadata to dynamically decode and skip
/// them, and return the first event found which decodes to the provided `Ev` type.
///
/// This works in the same way that [`events::Events::find_first()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn find_first<Ev: StaticEvent>(&self) -> Result<Option<Ev>, Error> {
self.find::<Ev>().next().transpose()
}
/// Find an event in those associated with this transaction. Returns true if it was found.
///
/// This works in the same way that [`events::Events::has()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn has<Ev: StaticEvent>(&self) -> Result<bool, Error> {
Ok(self.find::<Ev>().next().transpose()?.is_some())
))
}
}
+3 -3
View File
@@ -11,7 +11,7 @@ async fn non_finalized_headers_subscription() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();
let mut sub = api.blocks().subscribe_headers().await?;
let mut sub = api.blocks().subscribe_best().await?;
// Wait for the next set of headers, and check that the
// associated block hash is the one we just finalized.
@@ -30,7 +30,7 @@ async fn finalized_headers_subscription() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();
let mut sub = api.blocks().subscribe_finalized_headers().await?;
let mut sub = api.blocks().subscribe_finalized().await?;
// Wait for the next set of headers, and check that the
// associated block hash is the one we just finalized.
@@ -52,7 +52,7 @@ async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::Error> {
// that there will be some gaps, even if there aren't any from the subscription.
let some_finalized_blocks = api
.rpc()
.subscribe_finalized_blocks()
.subscribe_finalized_block_headers()
.await?
.enumerate()
.take(6)
+12 -3
View File
@@ -74,11 +74,20 @@ async fn fetch_read_proof() {
}
#[tokio::test]
async fn chain_subscribe_blocks() {
async fn chain_subscribe_all_blocks() {
let ctx = test_context().await;
let api = ctx.client();
let mut blocks = api.rpc().subscribe_blocks().await.unwrap();
let mut blocks = api.rpc().subscribe_all_block_headers().await.unwrap();
blocks.next().await.unwrap().unwrap();
}
#[tokio::test]
async fn chain_subscribe_best_blocks() {
let ctx = test_context().await;
let api = ctx.client();
let mut blocks = api.rpc().subscribe_best_block_headers().await.unwrap();
blocks.next().await.unwrap().unwrap();
}
@@ -87,7 +96,7 @@ async fn chain_subscribe_finalized_blocks() {
let ctx = test_context().await;
let api = ctx.client();
let mut blocks = api.rpc().subscribe_finalized_blocks().await.unwrap();
let mut blocks = api.rpc().subscribe_finalized_block_headers().await.unwrap();
blocks.next().await.unwrap().unwrap();
}
-212
View File
@@ -1,212 +0,0 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::{
node_runtime::{
self,
balances,
system,
},
pair_signer,
test_context,
utils::wait_for_blocks,
};
use futures::StreamExt;
use sp_keyring::AccountKeyring;
// Check that we can subscribe to non-finalized block events.
#[tokio::test]
async fn non_finalized_block_subscription() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();
let mut event_sub = api.events().subscribe().await?;
// Wait for the next set of events, and check that the
// associated block hash is not finalized yet.
let events = event_sub.next().await.unwrap()?;
let event_block_hash = events.block_hash();
let current_block_hash = api.rpc().block_hash(None).await?.unwrap();
assert_eq!(event_block_hash, current_block_hash);
Ok(())
}
// Check that we can subscribe to finalized block events.
#[tokio::test]
async fn finalized_block_subscription() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();
let mut event_sub = api.events().subscribe_finalized().await?;
// Wait for the next set of events, and check that the
// associated block hash is the one we just finalized.
// (this can be a bit slow as we have to wait for finalization)
let events = event_sub.next().await.unwrap()?;
let event_block_hash = events.block_hash();
let finalized_hash = api.rpc().finalized_head().await?;
assert_eq!(event_block_hash, finalized_hash);
Ok(())
}
// Check that our subscription actually keeps producing events for
// a few blocks.
#[tokio::test]
async fn subscription_produces_events_each_block() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();
wait_for_blocks(&api).await;
let mut event_sub = api.events().subscribe().await?;
for i in 0..3 {
let events = event_sub
.next()
.await
.expect("events expected each block")?;
let success_event = events
.find_first::<system::events::ExtrinsicSuccess>()
.expect("decode error");
if success_event.is_none() {
let n = events.len();
panic!("Expected an extrinsic success event on iteration {i} (saw {n} other events)")
}
}
Ok(())
}
// Iterate all of the events in a few blocks to ensure we can decode them properly.
#[tokio::test]
async fn decoding_all_events_in_a_block_works() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();
wait_for_blocks(&api).await;
let mut event_sub = api.events().subscribe().await?;
tokio::spawn(async move {
let alice = pair_signer(AccountKeyring::Alice.pair());
let bob = AccountKeyring::Bob.to_account_id();
let transfer_tx = node_runtime::tx()
.balances()
.transfer(bob.clone().into(), 10_000);
// Make a load of transfers to get lots of events going.
for _i in 0..10 {
api.tx()
.sign_and_submit_then_watch_default(&transfer_tx, &alice)
.await
.expect("can submit_transaction");
}
});
for _ in 0..4 {
let events = event_sub
.next()
.await
.expect("events expected each block")?;
for event in events.iter() {
// make sure that we can get every event properly.
let event = event.expect("valid event decoded");
// make sure that we can decode the field values from every event.
event.field_values().expect("can decode fields");
}
}
Ok(())
}
// Check that our subscription receives events, and we can filter them based on
// it's Stream impl, and ultimately see the event we expect.
#[tokio::test]
async fn balance_transfer_subscription() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();
// Subscribe to balance transfer events, ignoring all else.
let event_sub = api
.events()
.subscribe()
.await?
.filter_events::<(balances::events::Transfer,)>();
// Calling `.next()` on the above borrows it, and the `filter_map`
// means it's no longer `Unpin`, so we pin it on the stack:
futures::pin_mut!(event_sub);
// Make a transfer:
let alice = pair_signer(AccountKeyring::Alice.pair());
let bob = AccountKeyring::Bob.to_account_id();
let transfer_tx = node_runtime::tx()
.balances()
.transfer(bob.clone().into(), 10_000);
api.tx()
.sign_and_submit_then_watch_default(&transfer_tx, &alice)
.await?;
// Wait for the next balance transfer event in our subscription stream
// and check that it lines up:
let event = event_sub.next().await.unwrap().unwrap().event;
assert_eq!(
event,
balances::events::Transfer {
from: alice.account_id().clone(),
to: bob.clone(),
amount: 10_000
}
);
Ok(())
}
// This is just a compile-time check that we can subscribe to events in
// a context that requires the event subscription/filtering to be Send-able.
// We test a typical use of EventSubscription and FilterEvents. We don't need
// to run this code; just check that it compiles.
#[allow(unused)]
async fn check_events_are_sendable() {
// check that EventSubscription can be used across await points.
tokio::task::spawn(async {
let ctx = test_context().await;
let mut event_sub = ctx.client().events().subscribe().await?;
while let Some(ev) = event_sub.next().await {
// if `event_sub` doesn't implement Send, we can't hold
// it across an await point inside of a tokio::spawn, which
// requires Send. This will lead to a compile error.
}
Ok::<_, subxt::Error>(())
});
// Check that FilterEvents can be used across await points.
tokio::task::spawn(async {
let ctx = test_context().await;
let mut event_sub = ctx
.client()
.events()
.subscribe()
.await?
.filter_events::<(balances::events::Transfer,)>();
while let Some(ev) = event_sub.next().await {
// if `event_sub` doesn't implement Send, we can't hold
// it across an await point inside of a tokio::spawn, which
// requires Send; This will lead to a compile error.
}
Ok::<_, subxt::Error>(())
});
}
-2
View File
@@ -14,8 +14,6 @@ mod blocks;
#[cfg(test)]
mod client;
#[cfg(test)]
mod events;
#[cfg(test)]
mod frame;
#[cfg(test)]
mod metadata;
@@ -11,7 +11,7 @@ use subxt::{
/// (the genesis block and another one) seems to be enough to allow tests
/// like `dry_run_passes` to work properly.
pub async fn wait_for_blocks<C: Config>(api: &impl OnlineClientT<C>) {
let mut sub = api.rpc().subscribe_blocks().await.unwrap();
let mut sub = api.rpc().subscribe_all_block_headers().await.unwrap();
sub.next().await;
sub.next().await;
}