mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 22:11:06 +00:00
e48f0e3b1d
* WIP API changes * debug impls * Get main crate compiling with first round of changes * Some tidy up * Add WithExtrinsicParams, and have SubstrateConfig + PolkadotConfig, not DefaultConfig * move transaction into extrinsic folder * Add runtime updates back to OnlineClient * rework to be 'client first' to fit better with storage + events * add support for events to Client * tidy dupe trait bound * Wire storage into client, but need to remove static reliance * various tidy up and start stripping codegen to remove bits we dont need now * First pass updating calls and constants codegen * WIP storage client updates * First pass migrated runtime storage over to new format * pass over codegen to generate StorageAddresses and throw other stuff out * don't need a Call trait any more * shuffle things around a bit * Various proc_macro fixes to get 'cargo check' working * organise what's exposed from subxt * Get first example working; balance_transfer_with_params * get balance_transfer example compiling * get concurrent_storage_requests.rs example compiling * get fetch_all_accounts example compiling * get a bunch more of the examples compiling * almost get final example working; type mismatch to look into * wee tweaks * move StorageAddress to separate file * pass Defaultable/Iterable info to StorageAddress in codegen * fix storage validation ne, and partial run through example code * Remove static iteration and strip a generic param from everything * fix doc tests in subxt crate * update test utils and start fixing frame tests * fix frame staking tests * fix the rest of the test compile issues, Borrow on storage values * cargo fmt * remove extra logging during tests * Appease clippy and no more need for into_iter on events * cargo fmt * fix dryRun tests by waiting for blocks * wait for blocks instead of sleeping or other test hacks * cargo fmt * Fix doc links * Traitify StorageAddress * remove out-of-date doc comments * optimise decoding storage a little * cleanup tx stuff, trait for TxPayload, remove Err type param and decode at runtime * clippy fixes * fix doc links * fix doc example * constant address trait for consistency * fix a typo and remove EncodeWithMetadata stuff * Put EventDetails behind a proper interface and allow decoding into top level event, too * fix docs * tweak StorageAddress docs * re-export StorageAddress at root for consistency * fix clippy things * Add support for dynamic values * fix double encoding of storage map key after refactor * clippy fix * Fixes and add a dynamic usage example (needs new scale_value release) * bump scale_value version * cargo fmt * Tweak event bits * cargo fmt * Add a test and bump scale-value to 0.4.0 to support this * remove unnecessary vec from dynamic example * Various typo/grammar fixes Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> * Address PR nits * Undo accidental rename in changelog * Small PR nits/tidyups * fix tests; codegen change against latest substrate * tweak storage address util names * move error decoding to DecodeError and expose * impl some basic traits on the extrinsic param builder Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
250 lines
7.5 KiB
Rust
250 lines
7.5 KiB
Rust
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
|
|
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
|
// see LICENSE for license details.
|
|
|
|
use crate::{
|
|
client::OnlineClientT,
|
|
error::Error,
|
|
events::{
|
|
EventSub,
|
|
EventSubscription,
|
|
Events,
|
|
FinalizedEventSub,
|
|
},
|
|
Config,
|
|
};
|
|
use derivative::Derivative;
|
|
use futures::{
|
|
future::Either,
|
|
stream,
|
|
Stream,
|
|
StreamExt,
|
|
};
|
|
use sp_core::{
|
|
storage::StorageKey,
|
|
twox_128,
|
|
};
|
|
use sp_runtime::traits::Header;
|
|
use std::future::Future;
|
|
|
|
/// A client for working with events.
|
|
#[derive(Derivative)]
|
|
#[derivative(Clone(bound = "Client: Clone"))]
|
|
pub struct EventsClient<T, Client> {
|
|
client: Client,
|
|
_marker: std::marker::PhantomData<T>,
|
|
}
|
|
|
|
impl<T, Client> EventsClient<T, Client> {
|
|
/// Create a new [`EventsClient`].
|
|
pub fn new(client: Client) -> Self {
|
|
Self {
|
|
client,
|
|
_marker: std::marker::PhantomData,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T, Client> EventsClient<T, Client>
|
|
where
|
|
T: Config,
|
|
Client: OnlineClientT<T>,
|
|
{
|
|
/// Obtain events at some block hash.
|
|
pub fn at(
|
|
&self,
|
|
block_hash: Option<T::Hash>,
|
|
) -> impl Future<Output = Result<Events<T>, Error>> + Send + 'static {
|
|
// Clone and pass the client in like this so that we can explicitly
|
|
// return a Future that's Send + 'static, rather than tied to &self.
|
|
let client = self.client.clone();
|
|
async move { at(client, block_hash).await }
|
|
}
|
|
|
|
/// Subscribe to all events from blocks.
|
|
///
|
|
/// **Note:** these blocks haven't necessarily been finalised yet; prefer
|
|
/// [`EventsClient::subscribe_finalized()`] if that is important.
|
|
///
|
|
/// # Example
|
|
///
|
|
/// ```no_run
|
|
/// # #[tokio::main]
|
|
/// # async fn main() {
|
|
/// use futures::StreamExt;
|
|
/// use subxt::{ OnlineClient, PolkadotConfig };
|
|
///
|
|
/// let api = OnlineClient::<PolkadotConfig>::new().await.unwrap();
|
|
///
|
|
/// let mut events = api.events().subscribe().await.unwrap();
|
|
///
|
|
/// while let Some(ev) = events.next().await {
|
|
/// // Obtain all events from this block.
|
|
/// let ev = ev.unwrap();
|
|
/// // Print block hash.
|
|
/// println!("Event at block hash {:?}", ev.block_hash());
|
|
/// // Iterate over all events.
|
|
/// let mut iter = ev.iter();
|
|
/// while let Some(event_details) = iter.next() {
|
|
/// println!("Event details {:?}", event_details);
|
|
/// }
|
|
/// }
|
|
/// # }
|
|
/// ```
|
|
pub fn subscribe(
|
|
&self,
|
|
) -> impl Future<
|
|
Output = Result<EventSubscription<T, Client, EventSub<T::Header>>, Error>,
|
|
> + Send
|
|
+ 'static {
|
|
let client = self.client.clone();
|
|
async move { subscribe(client).await }
|
|
}
|
|
|
|
/// Subscribe to events from finalized blocks. See [`EventsClient::subscribe()`] for details.
|
|
pub fn subscribe_finalized(
|
|
&self,
|
|
) -> impl Future<
|
|
Output = Result<
|
|
EventSubscription<T, Client, FinalizedEventSub<T::Header>>,
|
|
Error,
|
|
>,
|
|
> + Send
|
|
+ 'static
|
|
where
|
|
Client: Send + Sync + 'static,
|
|
{
|
|
let client = self.client.clone();
|
|
async move { subscribe_finalized(client).await }
|
|
}
|
|
}
|
|
|
|
async fn at<T, Client>(
|
|
client: Client,
|
|
block_hash: Option<T::Hash>,
|
|
) -> Result<Events<T>, Error>
|
|
where
|
|
T: Config,
|
|
Client: OnlineClientT<T>,
|
|
{
|
|
// If block hash is not provided, get the hash
|
|
// for the latest block and use that.
|
|
let block_hash = match block_hash {
|
|
Some(hash) => hash,
|
|
None => {
|
|
client
|
|
.rpc()
|
|
.block_hash(None)
|
|
.await?
|
|
.expect("didn't pass a block number; qed")
|
|
}
|
|
};
|
|
|
|
let event_bytes = client
|
|
.rpc()
|
|
.storage(&*system_events_key().0, Some(block_hash))
|
|
.await?
|
|
.map(|e| e.0)
|
|
.unwrap_or_else(Vec::new);
|
|
|
|
Ok(Events::new(client.metadata(), block_hash, event_bytes))
|
|
}
|
|
|
|
async fn subscribe<T, Client>(
|
|
client: Client,
|
|
) -> Result<EventSubscription<T, Client, EventSub<T::Header>>, Error>
|
|
where
|
|
T: Config,
|
|
Client: OnlineClientT<T>,
|
|
{
|
|
let block_subscription = client.rpc().subscribe_blocks().await?;
|
|
Ok(EventSubscription::new(client, block_subscription))
|
|
}
|
|
|
|
/// Subscribe to events from finalized blocks.
|
|
async fn subscribe_finalized<T, Client>(
|
|
client: Client,
|
|
) -> Result<EventSubscription<T, Client, FinalizedEventSub<T::Header>>, Error>
|
|
where
|
|
T: Config,
|
|
Client: OnlineClientT<T>,
|
|
{
|
|
// fetch the last finalised block details immediately, so that we'll get
|
|
// events for each block after this one.
|
|
let last_finalized_block_hash = client.rpc().finalized_head().await?;
|
|
let last_finalized_block_number = client
|
|
.rpc()
|
|
.header(Some(last_finalized_block_hash))
|
|
.await?
|
|
.map(|h| (*h.number()).into());
|
|
|
|
let sub = client.rpc().subscribe_finalized_blocks().await?;
|
|
|
|
// Fill in any gaps between the block above and the finalized blocks reported.
|
|
let block_subscription = subscribe_to_block_headers_filling_in_gaps(
|
|
client.clone(),
|
|
last_finalized_block_number,
|
|
sub,
|
|
);
|
|
|
|
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
|
|
}
|
|
|
|
/// Note: This is exposed for testing but is not considered stable and may change
|
|
/// without notice in a patch release.
|
|
#[doc(hidden)]
|
|
pub fn subscribe_to_block_headers_filling_in_gaps<T, Client, S, E>(
|
|
client: Client,
|
|
mut last_block_num: Option<u64>,
|
|
sub: S,
|
|
) -> impl Stream<Item = Result<T::Header, Error>> + Send
|
|
where
|
|
T: Config,
|
|
Client: OnlineClientT<T> + Send + Sync,
|
|
S: Stream<Item = Result<T::Header, E>> + Send,
|
|
E: Into<Error> + Send + 'static,
|
|
{
|
|
sub.flat_map(move |s| {
|
|
let client = client.clone();
|
|
|
|
// Get the header, or return a stream containing just the error. Our EventSubscription
|
|
// stream will return `None` as soon as it hits an error like this.
|
|
let header = match s {
|
|
Ok(header) => header,
|
|
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
|
|
};
|
|
|
|
// We want all previous details up to, but not including this current block num.
|
|
let end_block_num = (*header.number()).into();
|
|
|
|
// This is one after the last block we returned details for last time.
|
|
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);
|
|
|
|
// Iterate over all of the previous blocks we need headers for, ignoring the current block
|
|
// (which we already have the header info for):
|
|
let previous_headers = stream::iter(start_block_num..end_block_num)
|
|
.then(move |n| {
|
|
let client = client.clone();
|
|
async move {
|
|
let hash = client.rpc().block_hash(Some(n.into())).await?;
|
|
let header = client.rpc().header(hash).await?;
|
|
Ok::<_, Error>(header)
|
|
}
|
|
})
|
|
.filter_map(|h| async { h.transpose() });
|
|
|
|
// On the next iteration, we'll get details starting just after this end block.
|
|
last_block_num = Some(end_block_num);
|
|
|
|
// Return a combination of any previous headers plus the new header.
|
|
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
|
|
})
|
|
}
|
|
|
|
// The storage key needed to access events.
|
|
fn system_events_key() -> StorageKey {
|
|
let mut storage_key = twox_128(b"System").to_vec();
|
|
storage_key.extend(twox_128(b"Events").to_vec());
|
|
StorageKey(storage_key)
|
|
}
|