New Event Subscription API (#442)

* Add reworked event types

* first pass implementing event subscriptions

* make clear that some methods are private

* comment tidy

* use Events in transaction stuff

* align transaction and event APIs

* remove __private_ prefixes; they are ugly

* fix examples and remove old events and subscription code

* better comments on hidden event functions

* re-add find_first_event; it's used a bunch in tests and examples

* cargo check --all-targets now passes

* Fix up existing event tests

* cargo fmt

* change todo to note

* clippy and doc niggles

* revert to find_first_event

* Add specific subscription related tests

* cargo fmt

* Update tests and add/fix examples

* cargo fmt

* add a little to subscribe_all_events example

* cargo fmt

* move an example comment

* easy access to root mod for more clarity

* add a couple of tests to ensure that events properly decoded until naff bytes

* Simplify EventSubscription Stream impl a little

* Address some PR feedback
This commit is contained in:
James Wilson
2022-02-14 11:18:16 +00:00
committed by GitHub
parent 7615b2586d
commit b1b717332e
27 changed files with 9475 additions and 3164 deletions
+5 -14
View File
@@ -20,7 +20,6 @@ pub use sp_runtime::traits::SignedExtension;
use crate::{
error::BasicError,
events::EventsDecoder,
extrinsic::{
self,
SignedExtra,
@@ -98,13 +97,10 @@ impl ClientBuilder {
.await;
let metadata = metadata?;
let events_decoder = EventsDecoder::new(metadata.clone());
Ok(Client {
rpc,
genesis_hash: genesis_hash?,
metadata: Arc::new(metadata),
events_decoder,
properties: properties.unwrap_or_else(|_| Default::default()),
runtime_version: runtime_version?,
iter_page_size: self.page_size.unwrap_or(10),
@@ -119,7 +115,6 @@ pub struct Client<T: Config> {
rpc: Rpc<T>,
genesis_hash: T::Hash,
metadata: Arc<Metadata>,
events_decoder: EventsDecoder<T>,
properties: SystemProperties,
runtime_version: RuntimeVersion,
iter_page_size: u32,
@@ -179,27 +174,23 @@ impl<T: Config> Client<T> {
pub fn to_runtime_api<R: From<Self>>(self) -> R {
self.into()
}
/// Returns the events decoder.
pub fn events_decoder(&self) -> &EventsDecoder<T> {
&self.events_decoder
}
}
/// A constructed call ready to be signed and submitted.
pub struct SubmittableExtrinsic<'client, T: Config, X, A, C, E: Decode> {
pub struct SubmittableExtrinsic<'client, T: Config, X, A, C, E: Decode, Evs: Decode> {
client: &'client Client<T>,
call: C,
marker: std::marker::PhantomData<(X, A, E)>,
marker: std::marker::PhantomData<(X, A, E, Evs)>,
}
impl<'client, T, X, A, C, E> SubmittableExtrinsic<'client, T, X, A, C, E>
impl<'client, T, X, A, C, E, Evs> SubmittableExtrinsic<'client, T, X, A, C, E, Evs>
where
T: Config,
X: SignedExtra<T>,
A: AccountData,
C: Call + Send + Sync,
E: Decode,
Evs: Decode,
{
/// Create a new [`SubmittableExtrinsic`].
pub fn new(client: &'client Client<T>, call: C) -> Self {
@@ -217,7 +208,7 @@ where
pub async fn sign_and_submit_then_watch(
self,
signer: &(dyn Signer<T, X> + Send + Sync),
) -> Result<TransactionProgress<'client, T, E>, BasicError>
) -> Result<TransactionProgress<'client, T, E, Evs>, BasicError>
where
<<X as SignedExtra<T>>::Extra as SignedExtension>::AdditionalSigned:
Send + Sync + 'static,
+884 -207
View File
File diff suppressed because it is too large Load Diff
+4 -9
View File
@@ -59,12 +59,11 @@ use derivative::Derivative;
mod client;
mod config;
mod error;
mod events;
pub mod events;
pub mod extrinsic;
mod metadata;
pub mod rpc;
pub mod storage;
mod subscription;
mod transaction;
pub use crate::{
@@ -86,8 +85,9 @@ pub use crate::{
TransactionError,
},
events::{
EventsDecoder,
RawEvent,
EventDetails,
Events,
RawEventDetails,
},
extrinsic::{
DefaultExtra,
@@ -114,11 +114,6 @@ pub use crate::{
StorageEntryKey,
StorageMapKey,
},
subscription::{
EventStorageSubscription,
EventSubscription,
FinalizedEventStorageSubscription,
},
transaction::{
TransactionEvents,
TransactionInBlock,
-34
View File
@@ -29,11 +29,6 @@ use std::{
use crate::{
error::BasicError,
storage::StorageKeyPrefix,
subscription::{
EventStorageSubscription,
FinalizedEventStorageSubscription,
SystemEvents,
},
Config,
Metadata,
};
@@ -406,35 +401,6 @@ impl<T: Config> Rpc<T> {
Ok(version)
}
/// Subscribe to System Events that are imported into blocks.
///
/// *WARNING* these may not be included in the finalized chain, use
/// `subscribe_finalized_events` to ensure events are finalized.
pub async fn subscribe_events(
&self,
) -> Result<EventStorageSubscription<T>, BasicError> {
let keys = Some(vec![StorageKey::from(SystemEvents::new())]);
let params = rpc_params![keys];
let subscription = self
.client
.subscribe("state_subscribeStorage", params, "state_unsubscribeStorage")
.await?;
Ok(EventStorageSubscription::Imported(subscription))
}
/// Subscribe to finalized events.
pub async fn subscribe_finalized_events(
&self,
) -> Result<EventStorageSubscription<T>, BasicError> {
Ok(EventStorageSubscription::Finalized(
FinalizedEventStorageSubscription::new(
self.clone(),
self.subscribe_finalized_blocks().await?,
),
))
}
/// Subscribe to blocks.
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, BasicError> {
let subscription = self
-481
View File
@@ -1,481 +0,0 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is part of subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
use crate::{
error::BasicError,
events::{
EventsDecoder,
RawEvent,
},
rpc::Rpc,
Config,
Event,
Phase,
};
use jsonrpsee::core::{
client::Subscription,
DeserializeOwned,
};
use sp_core::{
storage::{
StorageChangeSet,
StorageKey,
},
twox_128,
};
use sp_runtime::traits::Header;
use std::collections::VecDeque;
/// Raw bytes for an Event, including the block hash where it occurred and its
/// corresponding event index.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Clone))]
pub struct EventContext<Hash> {
pub block_hash: Hash,
pub event_idx: usize,
pub event: RawEvent,
}
/// Event subscription simplifies filtering a storage change set stream for
/// events of interest.
pub struct EventSubscription<'a, T: Config> {
block_reader: BlockReader<'a, T>,
block: Option<T::Hash>,
extrinsic: Option<usize>,
event: Option<(&'static str, &'static str)>,
events: VecDeque<EventContext<T::Hash>>,
finished: bool,
}
enum BlockReader<'a, T: Config> {
Decoder {
subscription: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
},
/// Mock event listener for unit tests
#[cfg(test)]
Mock(
Box<
dyn Iterator<
Item = (T::Hash, Result<Vec<(Phase, usize, RawEvent)>, BasicError>),
>,
>,
),
}
impl<'a, T: Config> BlockReader<'a, T> {
async fn next(
&mut self,
) -> Option<(T::Hash, Result<Vec<(Phase, usize, RawEvent)>, BasicError>)> {
match self {
BlockReader::Decoder {
subscription,
decoder,
} => {
let change_set = subscription.next().await?;
let events: Result<Vec<_>, _> = change_set
.changes
.into_iter()
.filter_map(|(_key, change)| {
Some(decoder.decode_events(&mut change?.0.as_slice()))
})
.collect();
let flattened_events = events.map(|x| {
x.into_iter()
.flatten()
.enumerate()
.map(|(event_idx, (phase, raw))| (phase, event_idx, raw))
.collect()
});
Some((change_set.block, flattened_events))
}
#[cfg(test)]
BlockReader::Mock(it) => it.next(),
}
}
}
impl<'a, T: Config> EventSubscription<'a, T> {
/// Creates a new event subscription.
pub fn new(
subscription: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
) -> Self {
Self {
block_reader: BlockReader::Decoder {
subscription,
decoder,
},
block: None,
extrinsic: None,
event: None,
events: Default::default(),
finished: false,
}
}
/// Only returns events contained in the block with the given hash.
pub fn filter_block(&mut self, block: T::Hash) {
self.block = Some(block);
}
/// Only returns events from block emitted by extrinsic with index.
pub fn filter_extrinsic(&mut self, block: T::Hash, ext_index: usize) {
self.block = Some(block);
self.extrinsic = Some(ext_index);
}
/// Filters events by type.
pub fn filter_event<Ev: Event>(&mut self) {
self.event = Some((Ev::PALLET, Ev::EVENT));
}
/// Gets the next event.
pub async fn next(&mut self) -> Option<Result<RawEvent, BasicError>> {
self.next_context()
.await
.map(|res| res.map(|ctx| ctx.event))
}
/// Gets the next event with the associated block hash and its corresponding
/// event index.
pub async fn next_context(
&mut self,
) -> Option<Result<EventContext<T::Hash>, BasicError>> {
loop {
if let Some(raw_event) = self.events.pop_front() {
return Some(Ok(raw_event))
}
if self.finished {
return None
}
// always return None if subscription has closed
let (received_hash, events) = self.block_reader.next().await?;
if let Some(hash) = self.block.as_ref() {
if &received_hash == hash {
self.finished = true;
} else {
continue
}
}
match events {
Err(err) => return Some(Err(err)),
Ok(raw_events) => {
for (phase, event_idx, raw) in raw_events {
if let Some(ext_index) = self.extrinsic {
if !matches!(phase, Phase::ApplyExtrinsic(i) if i as usize == ext_index)
{
continue
}
}
if let Some((module, variant)) = self.event {
if raw.pallet != module || raw.variant != variant {
continue
}
}
self.events.push_back(EventContext {
block_hash: received_hash,
event_idx,
event: raw,
});
}
}
}
}
}
}
pub(crate) struct SystemEvents(StorageKey);
impl SystemEvents {
pub(crate) fn new() -> Self {
let mut storage_key = twox_128(b"System").to_vec();
storage_key.extend(twox_128(b"Events").to_vec());
log::debug!("Events storage key {:?}", hex::encode(&storage_key));
Self(StorageKey(storage_key))
}
}
impl From<SystemEvents> for StorageKey {
fn from(key: SystemEvents) -> Self {
key.0
}
}
/// Event subscription to only fetch finalized storage changes.
pub struct FinalizedEventStorageSubscription<T: Config> {
rpc: Rpc<T>,
subscription: Subscription<T::Header>,
storage_changes: VecDeque<StorageChangeSet<T::Hash>>,
storage_key: StorageKey,
}
impl<T: Config> FinalizedEventStorageSubscription<T> {
/// Creates a new finalized event storage subscription.
pub fn new(rpc: Rpc<T>, subscription: Subscription<T::Header>) -> Self {
Self {
rpc,
subscription,
storage_changes: Default::default(),
storage_key: SystemEvents::new().into(),
}
}
/// Gets the next change_set.
pub async fn next(&mut self) -> Option<StorageChangeSet<T::Hash>> {
loop {
if let Some(storage_change) = self.storage_changes.pop_front() {
return Some(storage_change)
}
let header: T::Header =
read_subscription_response("HeaderSubscription", &mut self.subscription)
.await?;
self.storage_changes.extend(
self.rpc
.query_storage_at(&[self.storage_key.clone()], Some(header.hash()))
.await
.ok()?,
);
}
}
}
/// Wrapper over imported and finalized event subscriptions.
pub enum EventStorageSubscription<T: Config> {
/// Events that are InBlock
Imported(Subscription<StorageChangeSet<T::Hash>>),
/// Events that are Finalized
Finalized(FinalizedEventStorageSubscription<T>),
}
impl<T: Config> EventStorageSubscription<T> {
/// Gets the next change_set from the subscription.
pub async fn next(&mut self) -> Option<StorageChangeSet<T::Hash>> {
match self {
Self::Imported(event_sub) => {
read_subscription_response("StorageChangeSetSubscription", event_sub)
.await
}
Self::Finalized(event_sub) => event_sub.next().await,
}
}
}
async fn read_subscription_response<T>(
sub_name: &str,
sub: &mut Subscription<T>,
) -> Option<T>
where
T: DeserializeOwned,
{
match sub.next().await {
Some(Ok(next)) => Some(next),
Some(Err(e)) => {
log::error!("Subscription {} failed: {:?} dropping", sub_name, e);
None
}
None => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::DefaultConfig;
use sp_core::H256;
fn named_event(event_name: &str) -> RawEvent {
RawEvent {
data: sp_core::Bytes::from(Vec::new()),
pallet: event_name.to_string(),
variant: event_name.to_string(),
pallet_index: 0,
variant_index: 0,
}
}
#[async_std::test]
/// test that filters work correctly, and are independent of each other
async fn test_filters() {
let mut events: Vec<(H256, Phase, usize, RawEvent)> = vec![];
// create all events
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
for phase in [
Phase::Initialization,
Phase::ApplyExtrinsic(0),
Phase::ApplyExtrinsic(1),
Phase::Finalization,
] {
[named_event("a"), named_event("b")]
.iter()
.enumerate()
.for_each(|(idx, event)| {
events.push((
block_hash,
phase.clone(),
// The event index
idx,
event.clone(),
))
});
}
}
// set variant index so we can uniquely identify the event
events.iter_mut().enumerate().for_each(|(idx, event)| {
event.3.variant_index = idx as u8;
});
let half_len = events.len() / 2;
for block_filter in [None, Some(H256::from([1; 32]))] {
for extrinsic_filter in [None, Some(1)] {
for event_filter in [None, Some(("b", "b"))] {
let mut subscription: EventSubscription<DefaultConfig> =
EventSubscription {
block_reader: BlockReader::Mock(Box::new(
vec![
(
events[0].0,
Ok(events
.iter()
.take(half_len)
.map(|(_, phase, idx, event)| {
(phase.clone(), *idx, event.clone())
})
.collect()),
),
(
events[half_len].0,
Ok(events
.iter()
.skip(half_len)
.map(|(_, phase, idx, event)| {
(phase.clone(), *idx, event.clone())
})
.collect()),
),
]
.into_iter(),
)),
block: block_filter,
extrinsic: extrinsic_filter,
event: event_filter,
events: Default::default(),
finished: false,
};
let mut expected_events: Vec<(H256, Phase, usize, RawEvent)> =
events.clone();
if let Some(hash) = block_filter {
expected_events.retain(|(h, _, _, _)| h == &hash);
}
if let Some(idx) = extrinsic_filter {
expected_events.retain(|(_, phase, _, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
}
if let Some(name) = event_filter {
expected_events.retain(|(_, _, _, event)| event.pallet == name.0);
}
for expected_event in expected_events {
assert_eq!(
subscription.next().await.unwrap().unwrap(),
expected_event.3
);
}
assert!(subscription.next().await.is_none());
}
}
}
}
#[async_std::test]
async fn test_context() {
let mut events = vec![];
// create all events
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
for phase in [
Phase::Initialization,
Phase::ApplyExtrinsic(0),
Phase::ApplyExtrinsic(1),
Phase::Finalization,
] {
[named_event("a"), named_event("b")]
.iter()
.enumerate()
.for_each(|(idx, event)| {
events.push((
phase.clone(),
EventContext {
block_hash,
event_idx: idx,
event: event.clone(),
},
));
});
}
}
// set variant index so we can uniquely identify the event
events.iter_mut().enumerate().for_each(|(idx, (_, ctx))| {
ctx.event.variant_index = idx as u8;
});
let half_len = events.len() / 2;
let mut subscription: EventSubscription<DefaultConfig> = EventSubscription {
block_reader: BlockReader::Mock(Box::new(
vec![
(
events[0].1.block_hash,
Ok(events
.iter()
.take(half_len)
.map(|(phase, ctx)| {
(phase.clone(), ctx.event_idx, ctx.event.clone())
})
.collect()),
),
(
events[half_len].1.block_hash,
Ok(events
.iter()
.skip(half_len)
.map(|(phase, ctx)| {
(phase.clone(), ctx.event_idx, ctx.event.clone())
})
.collect()),
),
]
.into_iter(),
)),
block: None,
extrinsic: None,
event: None,
events: Default::default(),
finished: false,
};
let expected_events = events.clone();
for exp in expected_events {
assert_eq!(subscription.next_context().await.unwrap().unwrap(), exp.1);
}
assert!(subscription.next().await.is_none());
}
}
+99 -82
View File
@@ -18,7 +18,6 @@ use std::task::Poll;
use crate::PhantomDataSendSync;
use codec::Decode;
use sp_core::storage::StorageKey;
use sp_runtime::traits::Hash;
pub use sp_runtime::traits::SignedExtension;
pub use sp_version::RuntimeVersion;
@@ -31,8 +30,13 @@ use crate::{
RuntimeError,
TransactionError,
},
events::{
self,
EventDetails,
Events,
RawEventDetails,
},
rpc::SubstrateTransactionStatus,
subscription::SystemEvents,
Config,
Phase,
};
@@ -50,19 +54,22 @@ use jsonrpsee::core::{
/// returned from [`crate::SubmittableExtrinsic::sign_and_submit_then_watch()`].
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct TransactionProgress<'client, T: Config, E: Decode> {
pub struct TransactionProgress<'client, T: Config, E: Decode, Evs: Decode> {
sub: Option<RpcSubscription<SubstrateTransactionStatus<T::Hash, T::Hash>>>,
ext_hash: T::Hash,
client: &'client Client<T>,
_error: PhantomDataSendSync<E>,
_error: PhantomDataSendSync<(E, Evs)>,
}
// The above type is not `Unpin` by default unless the generic param `T` is,
// so we manually make it clear that Unpin is actually fine regardless of `T`
// (we don't care if this moves around in memory while it's "pinned").
impl<'client, T: Config, E: Decode> Unpin for TransactionProgress<'client, T, E> {}
impl<'client, T: Config, E: Decode, Evs: Decode> Unpin
for TransactionProgress<'client, T, E, Evs>
{
}
impl<'client, T: Config, E: Decode> TransactionProgress<'client, T, E> {
impl<'client, T: Config, E: Decode, Evs: Decode> TransactionProgress<'client, T, E, Evs> {
/// Instantiate a new [`TransactionProgress`] from a custom subscription.
pub fn new(
sub: RpcSubscription<SubstrateTransactionStatus<T::Hash, T::Hash>>,
@@ -82,7 +89,7 @@ impl<'client, T: Config, E: Decode> TransactionProgress<'client, T, E> {
/// avoid importing that trait if you don't otherwise need it.
pub async fn next_item(
&mut self,
) -> Option<Result<TransactionStatus<'client, T, E>, BasicError>> {
) -> Option<Result<TransactionStatus<'client, T, E, Evs>, BasicError>> {
self.next().await
}
@@ -99,7 +106,7 @@ impl<'client, T: Config, E: Decode> TransactionProgress<'client, T, E> {
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_in_block(
mut self,
) -> Result<TransactionInBlock<'client, T, E>, BasicError> {
) -> Result<TransactionInBlock<'client, T, E, Evs>, BasicError> {
while let Some(status) = self.next_item().await {
match status? {
// Finalized or otherwise in a block! Return.
@@ -129,7 +136,7 @@ impl<'client, T: Config, E: Decode> TransactionProgress<'client, T, E> {
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized(
mut self,
) -> Result<TransactionInBlock<'client, T, E>, BasicError> {
) -> Result<TransactionInBlock<'client, T, E, Evs>, BasicError> {
while let Some(status) = self.next_item().await {
match status? {
// Finalized! Return.
@@ -158,14 +165,16 @@ impl<'client, T: Config, E: Decode> TransactionProgress<'client, T, E> {
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized_success(
self,
) -> Result<TransactionEvents<T>, Error<E>> {
) -> Result<TransactionEvents<'client, T, Evs>, Error<E>> {
let evs = self.wait_for_finalized().await?.wait_for_success().await?;
Ok(evs)
}
}
impl<'client, T: Config, E: Decode> Stream for TransactionProgress<'client, T, E> {
type Item = Result<TransactionStatus<'client, T, E>, BasicError>;
impl<'client, T: Config, E: Decode, Evs: Decode> Stream
for TransactionProgress<'client, T, E, Evs>
{
type Item = Result<TransactionStatus<'client, T, E, Evs>, BasicError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
@@ -274,7 +283,7 @@ impl<'client, T: Config, E: Decode> Stream for TransactionProgress<'client, T, E
/// or that finality gadget is lagging behind.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub enum TransactionStatus<'client, T: Config, E: Decode> {
pub enum TransactionStatus<'client, T: Config, E: Decode, Evs: Decode> {
/// The transaction is part of the "future" queue.
Future,
/// The transaction is part of the "ready" queue.
@@ -282,7 +291,7 @@ pub enum TransactionStatus<'client, T: Config, E: Decode> {
/// The transaction has been broadcast to the given peers.
Broadcast(Vec<String>),
/// The transaction has been included in a block with given hash.
InBlock(TransactionInBlock<'client, T, E>),
InBlock(TransactionInBlock<'client, T, E, Evs>),
/// The block this transaction was included in has been retracted,
/// probably because it did not make it onto the blocks which were
/// finalized.
@@ -291,7 +300,7 @@ pub enum TransactionStatus<'client, T: Config, E: Decode> {
/// blocks, and so the subscription has ended.
FinalityTimeout(T::Hash),
/// The transaction has been finalized by a finality-gadget, e.g GRANDPA.
Finalized(TransactionInBlock<'client, T, E>),
Finalized(TransactionInBlock<'client, T, E, Evs>),
/// The transaction has been replaced in the pool by another transaction
/// that provides the same tags. (e.g. same (sender, nonce)).
Usurped(T::Hash),
@@ -301,10 +310,10 @@ pub enum TransactionStatus<'client, T: Config, E: Decode> {
Invalid,
}
impl<'client, T: Config, E: Decode> TransactionStatus<'client, T, E> {
impl<'client, T: Config, E: Decode, Evs: Decode> TransactionStatus<'client, T, E, Evs> {
/// A convenience method to return the `Finalized` details. Returns
/// [`None`] if the enum variant is not [`TransactionStatus::Finalized`].
pub fn as_finalized(&self) -> Option<&TransactionInBlock<'client, T, E>> {
pub fn as_finalized(&self) -> Option<&TransactionInBlock<'client, T, E, Evs>> {
match self {
Self::Finalized(val) => Some(val),
_ => None,
@@ -313,7 +322,7 @@ impl<'client, T: Config, E: Decode> TransactionStatus<'client, T, E> {
/// A convenience method to return the `InBlock` details. Returns
/// [`None`] if the enum variant is not [`TransactionStatus::InBlock`].
pub fn as_in_block(&self) -> Option<&TransactionInBlock<'client, T, E>> {
pub fn as_in_block(&self) -> Option<&TransactionInBlock<'client, T, E, Evs>> {
match self {
Self::InBlock(val) => Some(val),
_ => None,
@@ -324,14 +333,14 @@ impl<'client, T: Config, E: Decode> TransactionStatus<'client, T, E> {
/// This struct represents a transaction that has made it into a block.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct TransactionInBlock<'client, T: Config, E: Decode> {
pub struct TransactionInBlock<'client, T: Config, E: Decode, Evs: Decode> {
block_hash: T::Hash,
ext_hash: T::Hash,
client: &'client Client<T>,
_error: PhantomDataSendSync<E>,
_error: PhantomDataSendSync<(E, Evs)>,
}
impl<'client, T: Config, E: Decode> TransactionInBlock<'client, T, E> {
impl<'client, T: Config, E: Decode, Evs: Decode> TransactionInBlock<'client, T, E, Evs> {
pub(crate) fn new(
block_hash: T::Hash,
ext_hash: T::Hash,
@@ -368,11 +377,14 @@ impl<'client, T: Config, E: Decode> TransactionInBlock<'client, T, E> {
///
/// **Note:** This has to download block details from the node and decode events
/// from them.
pub async fn wait_for_success(&self) -> Result<TransactionEvents<T>, Error<E>> {
pub async fn wait_for_success(
&self,
) -> Result<TransactionEvents<'client, T, Evs>, Error<E>> {
let events = self.fetch_events().await?;
// Try to find any errors; return the first one we encounter.
for ev in events.as_slice() {
for ev in events.iter_raw() {
let ev = ev?;
if &ev.pallet == "System" && &ev.variant == "ExtrinsicFailed" {
let dispatch_error = E::decode(&mut &*ev.data)?;
return Err(Error::Runtime(RuntimeError(dispatch_error)))
@@ -388,7 +400,9 @@ impl<'client, T: Config, E: Decode> TransactionInBlock<'client, T, E> {
///
/// **Note:** This has to download block details from the node and decode events
/// from them.
pub async fn fetch_events(&self) -> Result<TransactionEvents<T>, BasicError> {
pub async fn fetch_events(
&self,
) -> Result<TransactionEvents<'client, T, Evs>, BasicError> {
let block = self
.client
.rpc()
@@ -406,31 +420,11 @@ impl<'client, T: Config, E: Decode> TransactionInBlock<'client, T, E> {
// extrinsic, the extrinsic should be in there somewhere..
.ok_or(BasicError::Transaction(TransactionError::BlockHashNotFound))?;
let raw_events = self
.client
.rpc()
.storage(
&StorageKey::from(SystemEvents::new()),
Some(self.block_hash),
)
.await?
.map(|s| s.0)
.unwrap_or_else(Vec::new);
let events = self
.client
.events_decoder()
.decode_events(&mut &*raw_events)?
.into_iter()
.filter(move |(phase, _raw)| {
phase == &Phase::ApplyExtrinsic(extrinsic_idx as u32)
})
.map(|(_phase, event)| event)
.collect();
let events = events::at::<T, Evs>(self.client, self.block_hash).await?;
Ok(TransactionEvents {
block_hash: self.block_hash,
ext_hash: self.ext_hash,
ext_idx: extrinsic_idx as u32,
events,
})
}
@@ -440,16 +434,16 @@ impl<'client, T: Config, E: Decode> TransactionInBlock<'client, T, E> {
/// We can iterate over the events, or look for a specific one.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct TransactionEvents<T: Config> {
block_hash: T::Hash,
pub struct TransactionEvents<'client, T: Config, Evs: Decode> {
ext_hash: T::Hash,
events: Vec<crate::RawEvent>,
ext_idx: u32,
events: Events<'client, T, Evs>,
}
impl<T: Config> TransactionEvents<T> {
impl<'client, T: Config, Evs: Decode> TransactionEvents<'client, T, Evs> {
/// Return the hash of the block that the transaction has made it into.
pub fn block_hash(&self) -> T::Hash {
self.block_hash
self.events.block_hash()
}
/// Return the hash of the extrinsic.
@@ -457,43 +451,66 @@ impl<T: Config> TransactionEvents<T> {
self.ext_hash
}
/// Return a slice of the returned events.
pub fn as_slice(&self) -> &[crate::RawEvent] {
/// Return all of the events in the block that the transaction made it into.
pub fn all_events_in_block(&self) -> &events::Events<'client, T, Evs> {
&self.events
}
/// Find all of the events matching the event type provided as a generic parameter. This
/// will return an error if a matching event is found but cannot be properly decoded.
pub fn find_events<Ev: crate::Event>(&self) -> Result<Vec<Ev>, BasicError> {
self.events
.iter()
.filter_map(|e| e.as_event::<Ev>().map_err(Into::into).transpose())
.collect()
}
/// Find the first event that matches the event type provided as a generic parameter. This
/// will return an error if a matching event is found but cannot be properly decoded.
/// Iterate over the statically decoded events associated with this transaction.
///
/// Use [`TransactionEvents::find_events`], or iterate over [`TransactionEvents`] yourself
/// if you'd like to handle multiple events of the same type.
/// This works in the same way that [`events::Events::iter()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn iter(
&self,
) -> impl Iterator<Item = Result<EventDetails<Evs>, BasicError>> + '_ {
self.events.iter().filter(|ev| {
ev.as_ref()
.map(|ev| ev.phase == Phase::ApplyExtrinsic(self.ext_idx))
.unwrap_or(true) // Keep any errors
})
}
/// Iterate over all of the raw events associated with this transaction.
///
/// This works in the same way that [`events::Events::iter_raw()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn iter_raw(
&self,
) -> impl Iterator<Item = Result<RawEventDetails, BasicError>> + '_ {
self.events.iter_raw().filter(|ev| {
ev.as_ref()
.map(|ev| ev.phase == Phase::ApplyExtrinsic(self.ext_idx))
.unwrap_or(true) // Keep any errors.
})
}
/// Find all of the transaction events matching the event type provided as a generic parameter.
///
/// This works in the same way that [`events::Events::find()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn find<Ev: crate::Event>(
&self,
) -> impl Iterator<Item = Result<Ev, BasicError>> + '_ {
self.iter_raw().filter_map(|ev| {
ev.and_then(|ev| ev.as_event::<Ev>().map_err(Into::into))
.transpose()
})
}
/// Iterate through the transaction events using metadata to dynamically decode and skip
/// them, and return the first event found which decodes to the provided `Ev` type.
///
/// This works in the same way that [`events::Events::find_first_event()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn find_first_event<Ev: crate::Event>(&self) -> Result<Option<Ev>, BasicError> {
self.events
.iter()
.filter_map(|e| e.as_event::<Ev>().transpose())
.next()
.transpose()
.map_err(Into::into)
self.find::<Ev>().next().transpose()
}
/// Find an event. Returns true if it was found.
pub fn has_event<Ev: crate::Event>(&self) -> Result<bool, BasicError> {
Ok(self.find_first_event::<Ev>()?.is_some())
}
}
impl<T: Config> std::ops::Deref for TransactionEvents<T> {
type Target = [crate::RawEvent];
fn deref(&self) -> &Self::Target {
&self.events
/// Find an event in those associated with this transaction. Returns true if it was found.
///
/// This works in the same way that [`events::Events::has()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn has<Ev: crate::Event>(&self) -> Result<bool, BasicError> {
Ok(self.find::<Ev>().next().transpose()?.is_some())
}
}