Filter one or multiple events by type from an EventSubscription (#461)

* Split events.rs into multiple files and start work on FilterEvents

* First pass filtering event(s)

* Tweak event examples to show filter_events

* cargo clippy + fmt

* consistify and tidy

* cargo fmt

* Tweak a couple of comments

* Expose phase and block_hash of filtered events, too

* cargo fmt

* expose FilteredEventDetails

* Add docs

* cargo clippy

* remove FilterEvents knowledge of EventSubscription so it's easier to unit test

* unit test filter_events

* tweak an integration test to use filter_events

* cargo fmt

* cargo clippy

* Tweak a comment

Co-authored-by: David <dvdplm@gmail.com>

Co-authored-by: David <dvdplm@gmail.com>
This commit is contained in:
James Wilson
2022-03-01 10:42:05 +00:00
committed by GitHub
parent 70d83feaba
commit 13347362d5
14 changed files with 1343 additions and 655 deletions
+5 -5
View File
@@ -67,7 +67,7 @@ async fn simple_transfer() -> Result<(), Box<dyn std::error::Error>> {
.await?;
let transfer_event =
balance_transfer.find_first_event::<polkadot::balances::events::Transfer>()?;
balance_transfer.find_first::<polkadot::balances::events::Transfer>()?;
if let Some(event) = transfer_event {
println!("Balance transfer success: {event:?}");
@@ -108,7 +108,7 @@ async fn simple_transfer_separate_events() -> Result<(), Box<dyn std::error::Err
let events = balance_transfer.fetch_events().await?;
let failed_event =
events.find_first_event::<polkadot::system::events::ExtrinsicFailed>()?;
events.find_first::<polkadot::system::events::ExtrinsicFailed>()?;
if let Some(_ev) = failed_event {
// We found a failed event; the transfer didn't succeed.
@@ -117,7 +117,7 @@ async fn simple_transfer_separate_events() -> Result<(), Box<dyn std::error::Err
// We didn't find a failed event; the transfer succeeded. Find
// more details about it to report..
let transfer_event =
events.find_first_event::<polkadot::balances::events::Transfer>()?;
events.find_first::<polkadot::balances::events::Transfer>()?;
if let Some(event) = transfer_event {
println!("Balance transfer success: {event:?}");
} else {
@@ -161,7 +161,7 @@ async fn handle_transfer_events() -> Result<(), Box<dyn std::error::Error>> {
let events = details.wait_for_success().await?;
let transfer_event =
events.find_first_event::<polkadot::balances::events::Transfer>()?;
events.find_first::<polkadot::balances::events::Transfer>()?;
if let Some(event) = transfer_event {
println!(
@@ -181,7 +181,7 @@ async fn handle_transfer_events() -> Result<(), Box<dyn std::error::Error>> {
let events = details.wait_for_success().await?;
let transfer_event =
events.find_first_event::<polkadot::balances::events::Transfer>()?;
events.find_first::<polkadot::balances::events::Transfer>()?;
if let Some(event) = transfer_event {
println!("Balance transfer success: {event:?}");
+1 -1
View File
@@ -103,7 +103,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Or we can dynamically find the first transfer event, ignoring any others:
let transfer_event =
events.find_first_event::<polkadot::balances::events::Transfer>()?;
events.find_first::<polkadot::balances::events::Transfer>()?;
if let Some(ev) = transfer_event {
println!(" - Balance transfer success: value: {:?}", ev.amount);
+4 -16
View File
@@ -22,11 +22,7 @@
//! polkadot --dev --tmp
//! ```
use futures::{
future,
stream,
StreamExt,
};
use futures::StreamExt;
use sp_keyring::AccountKeyring;
use std::time::Duration;
use subxt::{
@@ -51,21 +47,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?
.to_runtime_api::<polkadot::RuntimeApi<DefaultConfig, DefaultExtra<DefaultConfig>>>();
// Subscribe to just balance transfer events, making use of `flat_map` and
// `filter_map` from the StreamExt trait to filter everything else out.
// Subscribe to just balance transfer events, making use of `filter_events`
// to select a single event type (note the 1-tuple) to filter out and return.
let mut transfer_events = api
.events()
.subscribe()
.await?
// Ignore errors returning events:
.filter_map(|events| future::ready(events.ok()))
// Map events to just the one we care about:
.flat_map(|events| {
let transfer_events = events
.find::<polkadot::balances::events::Transfer>()
.collect::<Vec<_>>();
stream::iter(transfer_events)
});
.filter_events::<(polkadot::balances::events::Transfer,)>();
// While this subscription is active, we imagine some balance transfers are made somewhere else:
async_std::task::spawn(async {
+100
View File
@@ -0,0 +1,100 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is part of subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
//! To run this example, a local polkadot node should be running. Example verified against polkadot 0.9.13-82616422d0-aarch64-macos.
//!
//! E.g.
//! ```bash
//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.13/polkadot" --output /usr/local/bin/polkadot --location
//! polkadot --dev --tmp
//! ```
use futures::StreamExt;
use sp_keyring::AccountKeyring;
use std::time::Duration;
use subxt::{
ClientBuilder,
DefaultConfig,
DefaultExtra,
PairSigner,
};
#[subxt::subxt(runtime_metadata_path = "examples/polkadot_metadata.scale")]
pub mod polkadot {}
/// Subscribe to all events, and then manually look through them and
/// pluck out the events that we care about.
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
// Subscribe to any events that occur:
let api = ClientBuilder::new()
.build()
.await?
.to_runtime_api::<polkadot::RuntimeApi<DefaultConfig, DefaultExtra<DefaultConfig>>>();
// Subscribe to several balance related events. If we ask for more than one event,
// we'll be given a correpsonding tuple of `Option`'s, with exactly one
// variant populated each time.
let mut balance_events = api.events().subscribe().await?.filter_events::<(
polkadot::balances::events::Withdraw,
polkadot::balances::events::Transfer,
polkadot::balances::events::Deposit,
)>();
// While this subscription is active, we imagine some balance transfers are made somewhere else:
async_std::task::spawn(async {
let signer = PairSigner::new(AccountKeyring::Alice.pair());
let api = ClientBuilder::new()
.build()
.await
.unwrap()
.to_runtime_api::<polkadot::RuntimeApi<DefaultConfig, DefaultExtra<DefaultConfig>>>();
// Make small balance transfers from Alice to Bob in a loop:
loop {
api.tx()
.balances()
.transfer(AccountKeyring::Bob.to_account_id().into(), 1_000_000_000)
.sign_and_submit(&signer)
.await
.unwrap();
async_std::task::sleep(Duration::from_secs(10)).await;
}
});
// Our subscription will see all of the balance events we're filtering on:
while let Some(ev) = balance_events.next().await {
let event_details = ev?;
let block_hash = event_details.block_hash;
let event = event_details.event;
println!("Event at {:?}:", block_hash);
if let (Some(withdraw), _, _) = &event {
println!(" Withdraw event: {withdraw:?}");
}
if let (_, Some(transfer), _) = &event {
println!(" Transfer event: {transfer:?}");
}
if let (_, _, Some(deposit)) = &event {
println!(" Deposit event: {deposit:?}");
}
}
Ok(())
}
+487
View File
@@ -0,0 +1,487 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is part of subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
//! Dynamically decoding events.
use crate::{
error::BasicError,
metadata::MetadataError,
};
use bitvec::{
order::Lsb0,
vec::BitVec,
};
use codec::{
Codec,
Compact,
Decode,
};
use scale_info::{
PortableRegistry,
TypeDef,
TypeDefPrimitive,
};
/// Given a type Id and a type registry, attempt to consume the bytes
/// corresponding to that type from our input.
pub fn decode_and_consume_type(
type_id: u32,
types: &PortableRegistry,
input: &mut &[u8],
) -> Result<(), BasicError> {
let ty = types
.resolve(type_id)
.ok_or(MetadataError::TypeNotFound(type_id))?;
fn consume_type<T: Codec>(input: &mut &[u8]) -> Result<(), BasicError> {
T::decode(input)?;
Ok(())
}
match ty.type_def() {
TypeDef::Composite(composite) => {
for field in composite.fields() {
decode_and_consume_type(field.ty().id(), types, input)?
}
Ok(())
}
TypeDef::Variant(variant) => {
let variant_index = u8::decode(input)?;
let variant = variant
.variants()
.iter()
.find(|v| v.index() == variant_index)
.ok_or_else(|| {
BasicError::Other(format!("Variant {} not found", variant_index))
})?;
for field in variant.fields() {
decode_and_consume_type(field.ty().id(), types, input)?;
}
Ok(())
}
TypeDef::Sequence(seq) => {
let len = <Compact<u32>>::decode(input)?;
for _ in 0..len.0 {
decode_and_consume_type(seq.type_param().id(), types, input)?;
}
Ok(())
}
TypeDef::Array(arr) => {
for _ in 0..arr.len() {
decode_and_consume_type(arr.type_param().id(), types, input)?;
}
Ok(())
}
TypeDef::Tuple(tuple) => {
for field in tuple.fields() {
decode_and_consume_type(field.id(), types, input)?;
}
Ok(())
}
TypeDef::Primitive(primitive) => {
match primitive {
TypeDefPrimitive::Bool => consume_type::<bool>(input),
TypeDefPrimitive::Char => {
Err(
EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char)
.into(),
)
}
TypeDefPrimitive::Str => consume_type::<String>(input),
TypeDefPrimitive::U8 => consume_type::<u8>(input),
TypeDefPrimitive::U16 => consume_type::<u16>(input),
TypeDefPrimitive::U32 => consume_type::<u32>(input),
TypeDefPrimitive::U64 => consume_type::<u64>(input),
TypeDefPrimitive::U128 => consume_type::<u128>(input),
TypeDefPrimitive::U256 => {
Err(
EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::U256)
.into(),
)
}
TypeDefPrimitive::I8 => consume_type::<i8>(input),
TypeDefPrimitive::I16 => consume_type::<i16>(input),
TypeDefPrimitive::I32 => consume_type::<i32>(input),
TypeDefPrimitive::I64 => consume_type::<i64>(input),
TypeDefPrimitive::I128 => consume_type::<i128>(input),
TypeDefPrimitive::I256 => {
Err(
EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::I256)
.into(),
)
}
}
}
TypeDef::Compact(compact) => {
let inner = types
.resolve(compact.type_param().id())
.ok_or(MetadataError::TypeNotFound(type_id))?;
let mut decode_compact_primitive = |primitive: &TypeDefPrimitive| {
match primitive {
TypeDefPrimitive::U8 => consume_type::<Compact<u8>>(input),
TypeDefPrimitive::U16 => consume_type::<Compact<u16>>(input),
TypeDefPrimitive::U32 => consume_type::<Compact<u32>>(input),
TypeDefPrimitive::U64 => consume_type::<Compact<u64>>(input),
TypeDefPrimitive::U128 => consume_type::<Compact<u128>>(input),
prim => {
Err(EventsDecodingError::InvalidCompactPrimitive(prim.clone())
.into())
}
}
};
match inner.type_def() {
TypeDef::Primitive(primitive) => decode_compact_primitive(primitive),
TypeDef::Composite(composite) => {
match composite.fields() {
[field] => {
let field_ty =
types.resolve(field.ty().id()).ok_or_else(|| {
MetadataError::TypeNotFound(field.ty().id())
})?;
if let TypeDef::Primitive(primitive) = field_ty.type_def() {
decode_compact_primitive(primitive)
} else {
Err(EventsDecodingError::InvalidCompactType(
"Composite type must have a single primitive field"
.into(),
)
.into())
}
}
_ => {
Err(EventsDecodingError::InvalidCompactType(
"Composite type must have a single field".into(),
)
.into())
}
}
}
_ => {
Err(EventsDecodingError::InvalidCompactType(
"Compact type must be a primitive or a composite type".into(),
)
.into())
}
}
}
TypeDef::BitSequence(bitseq) => {
let bit_store_def = types
.resolve(bitseq.bit_store_type().id())
.ok_or(MetadataError::TypeNotFound(type_id))?
.type_def();
// We just need to consume the correct number of bytes. Roughly, we encode this
// as a Compact<u32> length, and then a slice of T of that length, where T is the
// bit store type. So, we ignore the bit order and only care that the bit store type
// used lines up in terms of the number of bytes it will take to encode/decode it.
match bit_store_def {
TypeDef::Primitive(TypeDefPrimitive::U8) => {
consume_type::<BitVec<Lsb0, u8>>(input)
}
TypeDef::Primitive(TypeDefPrimitive::U16) => {
consume_type::<BitVec<Lsb0, u16>>(input)
}
TypeDef::Primitive(TypeDefPrimitive::U32) => {
consume_type::<BitVec<Lsb0, u32>>(input)
}
TypeDef::Primitive(TypeDefPrimitive::U64) => {
consume_type::<BitVec<Lsb0, u64>>(input)
}
store => {
return Err(EventsDecodingError::InvalidBitSequenceType(format!(
"{:?}",
store
))
.into())
}
}
}
}
}
/// The possible errors that we can run into attempting to decode events.
#[derive(Debug, thiserror::Error)]
pub enum EventsDecodingError {
/// Unsupported primitive type
#[error("Unsupported primitive type {0:?}")]
UnsupportedPrimitive(TypeDefPrimitive),
/// Invalid compact type, must be an unsigned int.
#[error("Invalid compact primitive {0:?}")]
InvalidCompactPrimitive(TypeDefPrimitive),
/// Invalid compact type; error details in string.
#[error("Invalid compact composite type {0}")]
InvalidCompactType(String),
/// Invalid bit sequence type; bit store type or bit order type used aren't supported.
#[error("Invalid bit sequence type; bit store type {0} is not supported")]
InvalidBitSequenceType(String),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::GenericError::{
Codec,
EventsDecoding,
Other,
};
use assert_matches::assert_matches;
use codec::Encode;
use scale_info::TypeInfo;
type TypeId = scale_info::interner::UntrackedSymbol<std::any::TypeId>;
/// Build a type registry that knows about the single type provided.
fn singleton_type_registry<T: scale_info::TypeInfo + 'static>(
) -> (TypeId, PortableRegistry) {
let m = scale_info::MetaType::new::<T>();
let mut types = scale_info::Registry::new();
let id = types.register_type(&m);
let portable_registry: PortableRegistry = types.into();
(id, portable_registry)
}
fn decode_and_consume_type_consumes_all_bytes<
T: codec::Encode + scale_info::TypeInfo + 'static,
>(
val: T,
) {
let (type_id, registry) = singleton_type_registry::<T>();
let bytes = val.encode();
let cursor = &mut &*bytes;
decode_and_consume_type(type_id.id(), &registry, cursor).unwrap();
assert_eq!(cursor.len(), 0);
}
#[test]
fn decode_bitvec() {
use bitvec::order::Msb0;
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Msb0, u8; 0, 1, 1, 0, 1, 0, 1, 0, 0],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Lsb0, u16; 0, 1, 1, 0, 1],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Msb0, u16; 0, 1, 1, 0, 1, 0, 1, 0, 0],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Lsb0, u32; 0, 1, 1, 0, 1],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Msb0, u32; 0, 1, 1, 0, 1, 0, 1, 0, 0],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Lsb0, u64; 0, 1, 1, 0, 1],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Msb0, u64; 0, 1, 1, 0, 1, 0, 1, 0, 0],
);
}
#[test]
fn decode_primitive() {
decode_and_consume_type_consumes_all_bytes(false);
decode_and_consume_type_consumes_all_bytes(true);
let dummy_data = vec![0u8];
let dummy_cursor = &mut &*dummy_data;
let (id, reg) = singleton_type_registry::<char>();
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(
res,
Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive(
TypeDefPrimitive::Char
)))
);
decode_and_consume_type_consumes_all_bytes("str".to_string());
decode_and_consume_type_consumes_all_bytes(1u8);
decode_and_consume_type_consumes_all_bytes(1i8);
decode_and_consume_type_consumes_all_bytes(1u16);
decode_and_consume_type_consumes_all_bytes(1i16);
decode_and_consume_type_consumes_all_bytes(1u32);
decode_and_consume_type_consumes_all_bytes(1i32);
decode_and_consume_type_consumes_all_bytes(1u64);
decode_and_consume_type_consumes_all_bytes(1i64);
decode_and_consume_type_consumes_all_bytes(1u128);
decode_and_consume_type_consumes_all_bytes(1i128);
}
#[test]
fn decode_tuple() {
decode_and_consume_type_consumes_all_bytes(());
decode_and_consume_type_consumes_all_bytes((true,));
decode_and_consume_type_consumes_all_bytes((true, "str"));
// Incomplete bytes for decoding
let dummy_data = false.encode();
let dummy_cursor = &mut &*dummy_data;
let (id, reg) = singleton_type_registry::<(bool, &'static str)>();
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(res, Err(Codec(_)));
// Incomplete bytes for decoding, with invalid char type
let dummy_data = (false, "str", 0u8).encode();
let dummy_cursor = &mut &*dummy_data;
let (id, reg) = singleton_type_registry::<(bool, &'static str, char)>();
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(
res,
Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive(
TypeDefPrimitive::Char
)))
);
// The last byte (0x0 u8) should not be consumed
assert_eq!(dummy_cursor.len(), 1);
}
#[test]
fn decode_array_and_seq() {
decode_and_consume_type_consumes_all_bytes([0]);
decode_and_consume_type_consumes_all_bytes([1, 2, 3, 4, 5]);
decode_and_consume_type_consumes_all_bytes([0; 500]);
decode_and_consume_type_consumes_all_bytes(["str", "abc", "cde"]);
decode_and_consume_type_consumes_all_bytes(vec![0]);
decode_and_consume_type_consumes_all_bytes(vec![1, 2, 3, 4, 5]);
decode_and_consume_type_consumes_all_bytes(vec!["str", "abc", "cde"]);
}
#[test]
fn decode_variant() {
#[derive(Clone, Encode, TypeInfo)]
enum EnumVar {
A,
B((&'static str, u8)),
C { named: i16 },
}
const INVALID_TYPE_ID: u32 = 1024;
decode_and_consume_type_consumes_all_bytes(EnumVar::A);
decode_and_consume_type_consumes_all_bytes(EnumVar::B(("str", 1)));
decode_and_consume_type_consumes_all_bytes(EnumVar::C { named: 1 });
// Invalid variant index
let dummy_data = 3u8.encode();
let dummy_cursor = &mut &*dummy_data;
let (id, reg) = singleton_type_registry::<EnumVar>();
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(res, Err(Other(_)));
// Valid index, incomplete data
let dummy_data = 2u8.encode();
let dummy_cursor = &mut &*dummy_data;
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(res, Err(Codec(_)));
let res = decode_and_consume_type(INVALID_TYPE_ID, &reg, dummy_cursor);
assert_matches!(res, Err(crate::error::GenericError::Metadata(_)));
}
#[test]
fn decode_composite() {
#[derive(Clone, Encode, TypeInfo)]
struct Composite {}
decode_and_consume_type_consumes_all_bytes(Composite {});
#[derive(Clone, Encode, TypeInfo)]
struct CompositeV2 {
id: u32,
name: String,
}
decode_and_consume_type_consumes_all_bytes(CompositeV2 {
id: 10,
name: "str".to_string(),
});
#[derive(Clone, Encode, TypeInfo)]
struct CompositeV3<T> {
id: u32,
extra: T,
}
decode_and_consume_type_consumes_all_bytes(CompositeV3 {
id: 10,
extra: vec![0, 1, 2],
});
decode_and_consume_type_consumes_all_bytes(CompositeV3 {
id: 10,
extra: bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1],
});
decode_and_consume_type_consumes_all_bytes(CompositeV3 {
id: 10,
extra: ("str", 1),
});
decode_and_consume_type_consumes_all_bytes(CompositeV3 {
id: 10,
extra: CompositeV2 {
id: 2,
name: "str".to_string(),
},
});
#[derive(Clone, Encode, TypeInfo)]
struct CompositeV4(u32, bool);
decode_and_consume_type_consumes_all_bytes(CompositeV4(1, true));
#[derive(Clone, Encode, TypeInfo)]
struct CompositeV5(u32);
decode_and_consume_type_consumes_all_bytes(CompositeV5(1));
}
#[test]
fn decode_compact() {
#[derive(Clone, Encode, TypeInfo)]
enum Compact {
A(#[codec(compact)] u32),
}
decode_and_consume_type_consumes_all_bytes(Compact::A(1));
#[derive(Clone, Encode, TypeInfo)]
struct CompactV2(#[codec(compact)] u32);
decode_and_consume_type_consumes_all_bytes(CompactV2(1));
#[derive(Clone, Encode, TypeInfo)]
struct CompactV3 {
#[codec(compact)]
val: u32,
}
decode_and_consume_type_consumes_all_bytes(CompactV3 { val: 1 });
#[derive(Clone, Encode, TypeInfo)]
struct CompactV4<T> {
#[codec(compact)]
val: T,
}
decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 0u8 });
decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 1u16 });
}
}
+177
View File
@@ -0,0 +1,177 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is part of subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
//! Subscribing to events.
use crate::{
error::BasicError,
Client,
Config,
};
use codec::Decode;
use derivative::Derivative;
use futures::{
Future,
FutureExt,
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use std::{
marker::Unpin,
task::Poll,
};
pub use super::{
at,
EventDetails,
EventFilter,
Events,
EventsDecodingError,
FilterEvents,
RawEventDetails,
};
/// Subscribe to events from blocks.
///
/// **Note:** these blocks haven't necessarily been finalised yet; prefer
/// [`Events::subscribe_finalized()`] if that is important.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe()` over calling this directly.
#[doc(hidden)]
pub async fn subscribe<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
}
/// Subscribe to events from finalized blocks.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe_finalized()` over calling this directly.
#[doc(hidden)]
pub async fn subscribe_finalized<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_finalized_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
}
/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct EventSubscription<'a, T: Config, Evs: 'static> {
finished: bool,
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
#[derivative(Debug = "ignore")]
at: Option<
std::pin::Pin<
Box<dyn Future<Output = Result<Events<'a, T, Evs>, BasicError>> + 'a>,
>,
>,
_event_type: std::marker::PhantomData<Evs>,
}
impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
fn new(
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
) -> Self {
EventSubscription {
finished: false,
client,
block_header_subscription,
at: None,
_event_type: std::marker::PhantomData,
}
}
/// Return only specific events matching the tuple of 1 or more event
/// types that has been provided as the `Filter` type parameter.
pub fn filter_events<Filter: EventFilter>(self) -> FilterEvents<'a, Self, T, Filter> {
FilterEvents::new(self)
}
}
impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose
// way to roughly implement the following function:
//
// ```
// fn subscribe_events<T: Config, Evs: Decode>(client: &'_ Client<T>, block_sub: Subscription<T::Header>) -> impl Stream<Item=Result<Events<'_, T, Evs>, BasicError>> + '_ {
// use futures::StreamExt;
// block_sub.then(move |block_header_res| async move {
// use sp_runtime::traits::Header;
// let block_header = block_header_res?;
// let block_hash = block_header.hash();
// at(client, block_hash).await
// })
// }
// ```
//
// The advantage of this manual implementation is that we have a named type that we (and others)
// can derive things on, store away, alias etc.
impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
type Item = Result<Events<'a, T, Evs>, BasicError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// We are finished; return None.
if self.finished {
return Poll::Ready(None)
}
// If there isn't an `at` function yet that's busy resolving a block hash into
// some event details, then poll the block header subscription to get one.
if self.at.is_none() {
match futures::ready!(self.block_header_subscription.poll_next_unpin(cx)) {
None => {
self.finished = true;
return Poll::Ready(None)
}
Some(Err(e)) => {
self.finished = true;
return Poll::Ready(Some(Err(e.into())))
}
Some(Ok(block_header)) => {
use sp_runtime::traits::Header;
// Note [jsdw]: We may be able to get rid of the per-item allocation
// with https://github.com/oblique/reusable-box-future.
self.at = Some(Box::pin(at(self.client, block_header.hash())));
// Continue, so that we poll this function future we've just created.
}
}
}
// If we get here, there will be an `at` function stored. Unwrap it and poll it to
// completion to get our events, throwing it away as soon as it is ready.
let at_fn = self
.at
.as_mut()
.expect("'at' function should have been set above'");
let events = futures::ready!(at_fn.poll_unpin(cx));
self.at = None;
Poll::Ready(Some(events))
}
}
@@ -14,50 +14,29 @@
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
//! For working with events.
//! A representation of a block of events.
use super::decoding;
use crate::{
error::BasicError,
metadata::MetadataError,
Client,
Config,
Event,
Metadata,
Phase,
};
use bitvec::{
order::Lsb0,
vec::BitVec,
};
use codec::{
Codec,
Compact,
Decode,
Error as CodecError,
Input,
};
use derivative::Derivative;
use futures::{
Future,
FutureExt,
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use scale_info::{
PortableRegistry,
TypeDef,
TypeDefPrimitive,
};
use sp_core::{
storage::StorageKey,
twox_128,
Bytes,
};
use std::{
marker::Unpin,
task::Poll,
};
/// Obtain events at some block hash. The generic parameter is what we
/// will attempt to decode each event into if using [`Events::iter()`],
@@ -100,136 +79,18 @@ pub async fn at<T: Config, Evs: Decode>(
})
}
/// Subscribe to events from blocks.
///
/// **Note:** these blocks haven't necessarily been finalised yet; prefer
/// [`Events::subscribe_finalized()`] if that is important.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe()` over calling this directly.
#[doc(hidden)]
pub async fn subscribe<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
}
/// Subscribe to events from finalized blocks.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe_finalized()` over calling this directly.
#[doc(hidden)]
pub async fn subscribe_finalized<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_finalized_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
}
/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct EventSubscription<'a, T: Config, Evs: Decode + 'static> {
finished: bool,
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
#[derivative(Debug = "ignore")]
at: Option<
std::pin::Pin<
Box<dyn Future<Output = Result<Events<'a, T, Evs>, BasicError>> + 'a>,
>,
>,
_event_type: std::marker::PhantomData<Evs>,
}
impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
fn new(
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
) -> Self {
EventSubscription {
finished: false,
client,
block_header_subscription,
at: None,
_event_type: std::marker::PhantomData,
}
}
}
impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose
// way to roughly implement the following function:
//
// ```
// fn subscribe_events<T: Config, Evs: Decode>(client: &'_ Client<T>, block_sub: Subscription<T::Header>) -> impl Stream<Item=Result<Events<'_, T, Evs>, BasicError>> + '_ {
// use futures::StreamExt;
// block_sub.then(move |block_header_res| async move {
// use sp_runtime::traits::Header;
// let block_header = block_header_res?;
// let block_hash = block_header.hash();
// at(client, block_hash).await
// })
// }
// ```
//
// The advantage of this manual implementation is that we have a named type that we (and others)
// can derive things on, store away, alias etc.
impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
type Item = Result<Events<'a, T, Evs>, BasicError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// We are finished; return None.
if self.finished {
return Poll::Ready(None)
}
// If there isn't an `at` function yet that's busy resolving a block hash into
// some event details, then poll the block header subscription to get one.
if self.at.is_none() {
match futures::ready!(self.block_header_subscription.poll_next_unpin(cx)) {
None => {
self.finished = true;
return Poll::Ready(None)
}
Some(Err(e)) => {
self.finished = true;
return Poll::Ready(Some(Err(e.into())))
}
Some(Ok(block_header)) => {
use sp_runtime::traits::Header;
// Note [jsdw]: We may be able to get rid of the per-item allocation
// with https://github.com/oblique/reusable-box-future.
self.at = Some(Box::pin(at(self.client, block_header.hash())));
// Continue, so that we poll this function future we've just created.
}
}
}
// If we get here, there will be an `at` function stored. Unwrap it and poll it to
// completion to get our events, throwing it away as soon as it is ready.
let at_fn = self
.at
.as_mut()
.expect("'at' function should have been set above'");
let events = futures::ready!(at_fn.poll_unpin(cx));
self.at = None;
Poll::Ready(Some(events))
}
// The storage key needed to access events.
fn system_events_key() -> StorageKey {
let mut storage_key = twox_128(b"System").to_vec();
storage_key.extend(twox_128(b"Events").to_vec());
StorageKey(storage_key)
}
/// A collection of events obtained from a block, bundled with the necessary
/// information needed to decode and iterate over them.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct Events<'a, T: Config, Evs: Decode> {
pub struct Events<'a, T: Config, Evs> {
metadata: &'a Metadata,
block_hash: T::Hash,
// Note; raw event bytes are prefixed with a Compact<u32> containing
@@ -352,6 +213,49 @@ impl<'a, T: Config, Evs: Decode> Events<'a, T, Evs> {
})
}
/// Iterate over all of the events, using metadata to dynamically
/// decode them as we go, and returning the raw bytes and other associated
/// details. If an error occurs, all subsequent iterations return `None`.
///
/// This method is safe to use even if you do not statically know about
/// all of the possible events; it splits events up using the metadata
/// obtained at runtime, which does.
///
/// Unlike [`Events::iter_raw()`] this consumes `self`, which can be useful
/// if you need to store the iterator somewhere and avoid lifetime issues.
pub fn into_iter_raw(
self,
) -> impl Iterator<Item = Result<RawEventDetails, BasicError>> + 'a {
let mut pos = 0;
let mut index = 0;
std::iter::from_fn(move || {
let cursor = &mut &self.event_bytes[pos..];
let start_len = cursor.len();
if start_len == 0 || self.num_events == index {
None
} else {
match decode_raw_event_details::<T>(self.metadata, index, cursor) {
Ok(raw_event) => {
// Skip over decoded bytes in next iteration:
pos += start_len - cursor.len();
// Increment the index:
index += 1;
// Return the event details:
Some(Ok(raw_event))
}
Err(e) => {
// By setting the position to the "end" of the event bytes,
// the cursor len will become 0 and the iterator will return `None`
// from now on:
pos = self.event_bytes.len();
Some(Err(e))
}
}
}
})
}
/// Iterate through the events using metadata to dynamically decode and skip
/// them, and return only those which should decode to the provided `Ev` type.
/// If an error occurs, all subsequent iterations return `None`.
@@ -370,7 +274,7 @@ impl<'a, T: Config, Evs: Decode> Events<'a, T, Evs> {
///
/// **Note:** This method internally uses [`Events::iter_raw()`], so it is safe to
/// use even if you do not statically know about all of the possible events.
pub fn find_first_event<Ev: Event>(&self) -> Result<Option<Ev>, BasicError> {
pub fn find_first<Ev: Event>(&self) -> Result<Option<Ev>, BasicError> {
self.find::<Ev>().next().transpose()
}
@@ -457,7 +361,11 @@ fn decode_raw_event_details<T: Config>(
let type_id = arg.ty().id();
let all_bytes = *input;
// consume some bytes, moving the cursor forward:
decode_and_consume_type(type_id, &metadata.runtime_metadata().types, input)?;
decoding::decode_and_consume_type(
type_id,
&metadata.runtime_metadata().types,
input,
)?;
// count how many bytes were consumed based on remaining length:
let consumed_len = all_bytes.len() - input.len();
// move those consumed bytes to the output vec unaltered:
@@ -480,222 +388,15 @@ fn decode_raw_event_details<T: Config>(
})
}
// The storage key needed to access events.
fn system_events_key() -> StorageKey {
let mut storage_key = twox_128(b"System").to_vec();
storage_key.extend(twox_128(b"Events").to_vec());
StorageKey(storage_key)
}
// Given a type Id and a type registry, attempt to consume the bytes
// corresponding to that type from our input.
fn decode_and_consume_type(
type_id: u32,
types: &PortableRegistry,
input: &mut &[u8],
) -> Result<(), BasicError> {
let ty = types
.resolve(type_id)
.ok_or(MetadataError::TypeNotFound(type_id))?;
fn consume_type<T: Codec>(input: &mut &[u8]) -> Result<(), BasicError> {
T::decode(input)?;
Ok(())
}
match ty.type_def() {
TypeDef::Composite(composite) => {
for field in composite.fields() {
decode_and_consume_type(field.ty().id(), types, input)?
}
Ok(())
}
TypeDef::Variant(variant) => {
let variant_index = u8::decode(input)?;
let variant = variant
.variants()
.iter()
.find(|v| v.index() == variant_index)
.ok_or_else(|| {
BasicError::Other(format!("Variant {} not found", variant_index))
})?;
for field in variant.fields() {
decode_and_consume_type(field.ty().id(), types, input)?;
}
Ok(())
}
TypeDef::Sequence(seq) => {
let len = <Compact<u32>>::decode(input)?;
for _ in 0..len.0 {
decode_and_consume_type(seq.type_param().id(), types, input)?;
}
Ok(())
}
TypeDef::Array(arr) => {
for _ in 0..arr.len() {
decode_and_consume_type(arr.type_param().id(), types, input)?;
}
Ok(())
}
TypeDef::Tuple(tuple) => {
for field in tuple.fields() {
decode_and_consume_type(field.id(), types, input)?;
}
Ok(())
}
TypeDef::Primitive(primitive) => {
match primitive {
TypeDefPrimitive::Bool => consume_type::<bool>(input),
TypeDefPrimitive::Char => {
Err(
EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char)
.into(),
)
}
TypeDefPrimitive::Str => consume_type::<String>(input),
TypeDefPrimitive::U8 => consume_type::<u8>(input),
TypeDefPrimitive::U16 => consume_type::<u16>(input),
TypeDefPrimitive::U32 => consume_type::<u32>(input),
TypeDefPrimitive::U64 => consume_type::<u64>(input),
TypeDefPrimitive::U128 => consume_type::<u128>(input),
TypeDefPrimitive::U256 => {
Err(
EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::U256)
.into(),
)
}
TypeDefPrimitive::I8 => consume_type::<i8>(input),
TypeDefPrimitive::I16 => consume_type::<i16>(input),
TypeDefPrimitive::I32 => consume_type::<i32>(input),
TypeDefPrimitive::I64 => consume_type::<i64>(input),
TypeDefPrimitive::I128 => consume_type::<i128>(input),
TypeDefPrimitive::I256 => {
Err(
EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::I256)
.into(),
)
}
}
}
TypeDef::Compact(compact) => {
let inner = types
.resolve(compact.type_param().id())
.ok_or(MetadataError::TypeNotFound(type_id))?;
let mut decode_compact_primitive = |primitive: &TypeDefPrimitive| {
match primitive {
TypeDefPrimitive::U8 => consume_type::<Compact<u8>>(input),
TypeDefPrimitive::U16 => consume_type::<Compact<u16>>(input),
TypeDefPrimitive::U32 => consume_type::<Compact<u32>>(input),
TypeDefPrimitive::U64 => consume_type::<Compact<u64>>(input),
TypeDefPrimitive::U128 => consume_type::<Compact<u128>>(input),
prim => {
Err(EventsDecodingError::InvalidCompactPrimitive(prim.clone())
.into())
}
}
};
match inner.type_def() {
TypeDef::Primitive(primitive) => decode_compact_primitive(primitive),
TypeDef::Composite(composite) => {
match composite.fields() {
[field] => {
let field_ty =
types.resolve(field.ty().id()).ok_or_else(|| {
MetadataError::TypeNotFound(field.ty().id())
})?;
if let TypeDef::Primitive(primitive) = field_ty.type_def() {
decode_compact_primitive(primitive)
} else {
Err(EventsDecodingError::InvalidCompactType(
"Composite type must have a single primitive field"
.into(),
)
.into())
}
}
_ => {
Err(EventsDecodingError::InvalidCompactType(
"Composite type must have a single field".into(),
)
.into())
}
}
}
_ => {
Err(EventsDecodingError::InvalidCompactType(
"Compact type must be a primitive or a composite type".into(),
)
.into())
}
}
}
TypeDef::BitSequence(bitseq) => {
let bit_store_def = types
.resolve(bitseq.bit_store_type().id())
.ok_or(MetadataError::TypeNotFound(type_id))?
.type_def();
// We just need to consume the correct number of bytes. Roughly, we encode this
// as a Compact<u32> length, and then a slice of T of that length, where T is the
// bit store type. So, we ignore the bit order and only care that the bit store type
// used lines up in terms of the number of bytes it will take to encode/decode it.
match bit_store_def {
TypeDef::Primitive(TypeDefPrimitive::U8) => {
consume_type::<BitVec<Lsb0, u8>>(input)
}
TypeDef::Primitive(TypeDefPrimitive::U16) => {
consume_type::<BitVec<Lsb0, u16>>(input)
}
TypeDef::Primitive(TypeDefPrimitive::U32) => {
consume_type::<BitVec<Lsb0, u32>>(input)
}
TypeDef::Primitive(TypeDefPrimitive::U64) => {
consume_type::<BitVec<Lsb0, u64>>(input)
}
store => {
return Err(EventsDecodingError::InvalidBitSequenceType(format!(
"{:?}",
store
))
.into())
}
}
}
}
}
/// The possible errors that we can run into attempting to decode events.
#[derive(Debug, thiserror::Error)]
pub enum EventsDecodingError {
/// Unsupported primitive type
#[error("Unsupported primitive type {0:?}")]
UnsupportedPrimitive(TypeDefPrimitive),
/// Invalid compact type, must be an unsigned int.
#[error("Invalid compact primitive {0:?}")]
InvalidCompactPrimitive(TypeDefPrimitive),
/// Invalid compact type; error details in string.
#[error("Invalid compact composite type {0}")]
InvalidCompactType(String),
/// Invalid bit sequence type; bit store type or bit order type used aren't supported.
#[error("Invalid bit sequence type; bit store type {0} is not supported")]
InvalidBitSequenceType(String),
}
/// Event related test utilities used outside this module.
#[cfg(test)]
mod tests {
pub(crate) mod test_utils {
use super::*;
use crate::{
error::GenericError::{
Codec,
EventsDecoding,
Other,
},
events::EventsDecodingError::UnsupportedPrimitive,
Config,
DefaultConfig,
Phase,
};
use assert_matches::assert_matches;
use codec::Encode;
use frame_metadata::{
v14::{
@@ -712,12 +413,10 @@ mod tests {
};
use std::convert::TryFrom;
type TypeId = scale_info::interner::UntrackedSymbol<std::any::TypeId>;
/// An "outer" events enum containing exactly one event.
#[derive(Encode, Decode, TypeInfo, Clone, Debug, PartialEq)]
pub enum AllEvents<Ev> {
E(Ev),
Test(Ev),
}
/// This encodes to the same format an event is expected to encode to
@@ -731,28 +430,17 @@ mod tests {
/// Build an EventRecord, which encoded events in the format expected
/// to be handed back from storage queries to System.Events.
fn event_record<E: Encode>(phase: Phase, event: E) -> EventRecord<E> {
pub fn event_record<E: Encode>(phase: Phase, event: E) -> EventRecord<E> {
EventRecord {
phase,
event: AllEvents::E(event),
event: AllEvents::Test(event),
topics: vec![],
}
}
/// Build a type registry that knows about the single type provided.
fn singleton_type_registry<T: scale_info::TypeInfo + 'static>(
) -> (TypeId, PortableRegistry) {
let m = scale_info::MetaType::new::<T>();
let mut types = scale_info::Registry::new();
let id = types.register_type(&m);
let portable_registry: PortableRegistry = types.into();
(id, portable_registry)
}
/// Build fake metadata consisting of a single pallet that knows
/// about the event type provided.
fn metadata<E: TypeInfo + 'static>() -> Metadata {
pub fn metadata<E: TypeInfo + 'static>() -> Metadata {
let pallets = vec![PalletMetadata {
name: "Test",
storage: None,
@@ -779,7 +467,7 @@ mod tests {
/// Build an `Events` object for test purposes, based on the details provided,
/// and with a default block hash.
fn events<E: Decode + Encode>(
pub fn events<E: Decode + Encode>(
metadata: &'_ Metadata,
event_records: Vec<EventRecord<E>>,
) -> Events<'_, DefaultConfig, AllEvents<E>> {
@@ -793,7 +481,7 @@ mod tests {
/// Much like [`events`], but takes pre-encoded events and event count, so that we can
/// mess with the bytes in tests if we need to.
fn events_raw<E: Decode + Encode>(
pub fn events_raw<E: Decode + Encode>(
metadata: &'_ Metadata,
event_bytes: Vec<u8>,
num_events: u32,
@@ -806,19 +494,23 @@ mod tests {
_event_type: std::marker::PhantomData,
}
}
}
fn decode_and_consume_type_consumes_all_bytes<
T: codec::Encode + scale_info::TypeInfo + 'static,
>(
val: T,
) {
let (type_id, registry) = singleton_type_registry::<T>();
let bytes = val.encode();
let cursor = &mut &*bytes;
decode_and_consume_type(type_id.id(), &registry, cursor).unwrap();
assert_eq!(cursor.len(), 0);
}
#[cfg(test)]
mod tests {
use super::{
test_utils::{
event_record,
events,
events_raw,
metadata,
AllEvents,
},
*,
};
use crate::Phase;
use codec::Encode;
use scale_info::TypeInfo;
#[test]
fn statically_decode_single_event() {
@@ -844,7 +536,7 @@ mod tests {
vec![EventDetails {
index: 0,
phase: Phase::Finalization,
event: AllEvents::E(Event::A(1))
event: AllEvents::Test(Event::A(1))
}]
);
}
@@ -879,17 +571,17 @@ mod tests {
EventDetails {
index: 0,
phase: Phase::Initialization,
event: AllEvents::E(Event::A(1))
event: AllEvents::Test(Event::A(1))
},
EventDetails {
index: 1,
phase: Phase::ApplyExtrinsic(123),
event: AllEvents::E(Event::B(true))
event: AllEvents::Test(Event::B(true))
},
EventDetails {
index: 2,
phase: Phase::Finalization,
event: AllEvents::E(Event::A(234))
event: AllEvents::Test(Event::A(234))
},
]
);
@@ -929,7 +621,7 @@ mod tests {
EventDetails {
index: 0,
phase: Phase::Initialization,
event: AllEvents::E(Event::A(1))
event: AllEvents::Test(Event::A(1))
}
);
assert_eq!(
@@ -937,7 +629,7 @@ mod tests {
EventDetails {
index: 1,
phase: Phase::ApplyExtrinsic(123),
event: AllEvents::E(Event::B(true))
event: AllEvents::Test(Event::B(true))
}
);
@@ -1151,7 +843,7 @@ mod tests {
vec![EventDetails {
index: 0,
phase: Phase::Finalization,
event: AllEvents::E(Event::A(1))
event: AllEvents::Test(Event::A(1))
}]
);
@@ -1209,7 +901,7 @@ mod tests {
vec![EventDetails {
index: 0,
phase: Phase::Finalization,
event: AllEvents::E(Event::A(CompactWrapper(1)))
event: AllEvents::Test(Event::A(CompactWrapper(1)))
}]
);
@@ -1268,7 +960,7 @@ mod tests {
vec![EventDetails {
index: 0,
phase: Phase::Finalization,
event: AllEvents::E(Event::A(MyType::B))
event: AllEvents::Test(Event::A(MyType::B))
}]
);
@@ -1294,218 +986,4 @@ mod tests {
}]
);
}
#[test]
fn decode_bitvec() {
use bitvec::order::Msb0;
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Msb0, u8; 0, 1, 1, 0, 1, 0, 1, 0, 0],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Lsb0, u16; 0, 1, 1, 0, 1],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Msb0, u16; 0, 1, 1, 0, 1, 0, 1, 0, 0],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Lsb0, u32; 0, 1, 1, 0, 1],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Msb0, u32; 0, 1, 1, 0, 1, 0, 1, 0, 0],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Lsb0, u64; 0, 1, 1, 0, 1],
);
decode_and_consume_type_consumes_all_bytes(
bitvec::bitvec![Msb0, u64; 0, 1, 1, 0, 1, 0, 1, 0, 0],
);
}
#[test]
fn decode_primitive() {
decode_and_consume_type_consumes_all_bytes(false);
decode_and_consume_type_consumes_all_bytes(true);
let dummy_data = vec![0u8];
let dummy_cursor = &mut &*dummy_data;
let (id, reg) = singleton_type_registry::<char>();
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(
res,
Err(EventsDecoding(UnsupportedPrimitive(TypeDefPrimitive::Char)))
);
decode_and_consume_type_consumes_all_bytes("str".to_string());
decode_and_consume_type_consumes_all_bytes(1u8);
decode_and_consume_type_consumes_all_bytes(1i8);
decode_and_consume_type_consumes_all_bytes(1u16);
decode_and_consume_type_consumes_all_bytes(1i16);
decode_and_consume_type_consumes_all_bytes(1u32);
decode_and_consume_type_consumes_all_bytes(1i32);
decode_and_consume_type_consumes_all_bytes(1u64);
decode_and_consume_type_consumes_all_bytes(1i64);
decode_and_consume_type_consumes_all_bytes(1u128);
decode_and_consume_type_consumes_all_bytes(1i128);
}
#[test]
fn decode_tuple() {
decode_and_consume_type_consumes_all_bytes(());
decode_and_consume_type_consumes_all_bytes((true,));
decode_and_consume_type_consumes_all_bytes((true, "str"));
// Incomplete bytes for decoding
let dummy_data = false.encode();
let dummy_cursor = &mut &*dummy_data;
let (id, reg) = singleton_type_registry::<(bool, &'static str)>();
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(res, Err(Codec(_)));
// Incomplete bytes for decoding, with invalid char type
let dummy_data = (false, "str", 0u8).encode();
let dummy_cursor = &mut &*dummy_data;
let (id, reg) = singleton_type_registry::<(bool, &'static str, char)>();
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(
res,
Err(EventsDecoding(UnsupportedPrimitive(TypeDefPrimitive::Char)))
);
// The last byte (0x0 u8) should not be consumed
assert_eq!(dummy_cursor.len(), 1);
}
#[test]
fn decode_array_and_seq() {
decode_and_consume_type_consumes_all_bytes([0]);
decode_and_consume_type_consumes_all_bytes([1, 2, 3, 4, 5]);
decode_and_consume_type_consumes_all_bytes([0; 500]);
decode_and_consume_type_consumes_all_bytes(["str", "abc", "cde"]);
decode_and_consume_type_consumes_all_bytes(vec![0]);
decode_and_consume_type_consumes_all_bytes(vec![1, 2, 3, 4, 5]);
decode_and_consume_type_consumes_all_bytes(vec!["str", "abc", "cde"]);
}
#[test]
fn decode_variant() {
#[derive(Clone, Encode, TypeInfo)]
enum EnumVar {
A,
B((&'static str, u8)),
C { named: i16 },
}
const INVALID_TYPE_ID: u32 = 1024;
decode_and_consume_type_consumes_all_bytes(EnumVar::A);
decode_and_consume_type_consumes_all_bytes(EnumVar::B(("str", 1)));
decode_and_consume_type_consumes_all_bytes(EnumVar::C { named: 1 });
// Invalid variant index
let dummy_data = 3u8.encode();
let dummy_cursor = &mut &*dummy_data;
let (id, reg) = singleton_type_registry::<EnumVar>();
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(res, Err(Other(_)));
// Valid index, incomplete data
let dummy_data = 2u8.encode();
let dummy_cursor = &mut &*dummy_data;
let res = decode_and_consume_type(id.id(), &reg, dummy_cursor);
assert_matches!(res, Err(Codec(_)));
let res = decode_and_consume_type(INVALID_TYPE_ID, &reg, dummy_cursor);
assert_matches!(res, Err(crate::error::GenericError::Metadata(_)));
}
#[test]
fn decode_composite() {
#[derive(Clone, Encode, TypeInfo)]
struct Composite {}
decode_and_consume_type_consumes_all_bytes(Composite {});
#[derive(Clone, Encode, TypeInfo)]
struct CompositeV2 {
id: u32,
name: String,
}
decode_and_consume_type_consumes_all_bytes(CompositeV2 {
id: 10,
name: "str".to_string(),
});
#[derive(Clone, Encode, TypeInfo)]
struct CompositeV3<T> {
id: u32,
extra: T,
}
decode_and_consume_type_consumes_all_bytes(CompositeV3 {
id: 10,
extra: vec![0, 1, 2],
});
decode_and_consume_type_consumes_all_bytes(CompositeV3 {
id: 10,
extra: bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1],
});
decode_and_consume_type_consumes_all_bytes(CompositeV3 {
id: 10,
extra: ("str", 1),
});
decode_and_consume_type_consumes_all_bytes(CompositeV3 {
id: 10,
extra: CompositeV2 {
id: 2,
name: "str".to_string(),
},
});
#[derive(Clone, Encode, TypeInfo)]
struct CompositeV4(u32, bool);
decode_and_consume_type_consumes_all_bytes(CompositeV4(1, true));
#[derive(Clone, Encode, TypeInfo)]
struct CompositeV5(u32);
decode_and_consume_type_consumes_all_bytes(CompositeV5(1));
}
#[test]
fn decode_compact() {
#[derive(Clone, Encode, TypeInfo)]
enum Compact {
A(#[codec(compact)] u32),
}
decode_and_consume_type_consumes_all_bytes(Compact::A(1));
#[derive(Clone, Encode, TypeInfo)]
struct CompactV2(#[codec(compact)] u32);
decode_and_consume_type_consumes_all_bytes(CompactV2(1));
#[derive(Clone, Encode, TypeInfo)]
struct CompactV3 {
#[codec(compact)]
val: u32,
}
decode_and_consume_type_consumes_all_bytes(CompactV3 { val: 1 });
#[derive(Clone, Encode, TypeInfo)]
struct CompactV4<T> {
#[codec(compact)]
val: T,
}
decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 0u8 });
decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 1u16 });
}
}
+418
View File
@@ -0,0 +1,418 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is part of subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
//! Filtering individual events from subscriptions.
use super::Events;
use crate::{
BasicError,
Config,
Event,
Phase,
};
use codec::Decode;
use futures::{
Stream,
StreamExt,
};
use std::{
marker::Unpin,
task::Poll,
};
/// A stream which filters events based on the `Filter` param provided.
/// If `Filter` is a 1-tuple of a single `Event` type, it will return every
/// instance of that event as it's found. If `filter` is ` tuple of multiple
/// `Event` types, it will return a corresponding tuple of `Option`s, where
/// exactly one of these will be `Some(event)` each iteration.
pub struct FilterEvents<'a, Sub: 'a, T: Config, Filter: EventFilter> {
// A subscription; in order for the Stream impl to apply, this will
// impl `Stream<Item = Result<Events<'a, T, Evs>, BasicError>> + Unpin + 'a`.
sub: Sub,
// Each time we get Events from our subscription, they are stored here
// and iterated through in future stream iterations until exhausted.
events: Option<
Box<
dyn Iterator<
Item = Result<
FilteredEventDetails<T::Hash, Filter::ReturnType>,
BasicError,
>,
> + 'a,
>,
>,
}
impl<'a, Sub: 'a, T: Config, Filter: EventFilter> Unpin
for FilterEvents<'a, Sub, T, Filter>
{
}
impl<'a, Sub: 'a, T: Config, Filter: EventFilter> FilterEvents<'a, Sub, T, Filter> {
pub(crate) fn new(sub: Sub) -> Self {
Self { sub, events: None }
}
}
impl<'a, Sub, T, Evs, Filter> Stream for FilterEvents<'a, Sub, T, Filter>
where
Sub: Stream<Item = Result<Events<'a, T, Evs>, BasicError>> + Unpin + 'a,
T: Config,
Evs: Decode + 'static,
Filter: EventFilter,
{
type Item = Result<FilteredEventDetails<T::Hash, Filter::ReturnType>, BasicError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
// Drain the current events we're iterating over first:
if let Some(events_iter) = self.events.as_mut() {
match events_iter.next() {
Some(res) => return Poll::Ready(Some(res)),
None => {
self.events = None;
}
}
}
// Wait for new events to come in:
match futures::ready!(self.sub.poll_next_unpin(cx)) {
None => return Poll::Ready(None),
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
Some(Ok(events)) => {
self.events = Some(Filter::filter(events));
}
};
}
}
}
/// This is returned from the [`FilterEvents`] impl of [`Stream`]. It contains
/// some type representing an event we've filtered on, along with couple of additional
/// pieces of information about that event.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct FilteredEventDetails<BlockHash, Evs> {
/// During which [`Phase`] was the event produced?
pub phase: Phase,
/// Hash of the block that this event came from.
pub block_hash: BlockHash,
/// A type containing an event that we've filtered on.
/// Depending on the filter type, this may be a tuple
/// or a single event.
pub event: Evs,
}
/// This trait is implemented for tuples of Event types; any such tuple (up to size 8) can be
/// used to filter an event subscription to return only the specified events.
pub trait EventFilter: private::Sealed {
/// The type we'll be handed back from filtering.
type ReturnType;
/// Filter the events based on the type implementing this trait.
fn filter<'a, T: Config, Evs: Decode + 'static>(
events: Events<'a, T, Evs>,
) -> Box<
dyn Iterator<
Item = Result<
FilteredEventDetails<T::Hash, Self::ReturnType>,
BasicError,
>,
> + 'a,
>;
}
// Prevent userspace implementations of the above trait; the interface is not considered stable
// and is not a particularly nice API to work with (particularly because it involves boxing, which
// would be nice to get rid of eventually).
pub(crate) mod private {
pub trait Sealed {}
}
// A special case impl for searching for a tuple of exactly one event (in this case, we don't
// need to return an `(Option<Event>,)`; we can just return `Event`.
impl<Ev: Event> private::Sealed for (Ev,) {}
impl<Ev: Event> EventFilter for (Ev,) {
type ReturnType = Ev;
fn filter<'a, T: Config, Evs: Decode + 'static>(
events: Events<'a, T, Evs>,
) -> Box<
dyn Iterator<Item = Result<FilteredEventDetails<T::Hash, Ev>, BasicError>> + 'a,
> {
let block_hash = events.block_hash();
let mut iter = events.into_iter_raw();
Box::new(std::iter::from_fn(move || {
for ev in iter.by_ref() {
// Forward any error immediately:
let raw_event = match ev {
Ok(ev) => ev,
Err(e) => return Some(Err(e)),
};
// Try decoding each type until we hit a match or an error:
let ev = raw_event.as_event::<Ev>();
if let Ok(Some(event)) = ev {
// We found a match; return our tuple.
return Some(Ok(FilteredEventDetails {
phase: raw_event.phase,
block_hash,
event,
}))
}
if let Err(e) = ev {
// We hit an error. Return it.
return Some(Err(e.into()))
}
}
None
}))
}
}
// A generalised impl for tuples of sizes greater than 1:
macro_rules! impl_event_filter {
($($ty:ident $idx:tt),+) => {
impl <$($ty: Event),+> private::Sealed for ( $($ty,)+ ) {}
impl <$($ty: Event),+> EventFilter for ( $($ty,)+ ) {
type ReturnType = ( $(Option<$ty>,)+ );
fn filter<'a, T: Config, Evs: Decode + 'static>(
events: Events<'a, T, Evs>
) -> Box<dyn Iterator<Item=Result<FilteredEventDetails<T::Hash,Self::ReturnType>, BasicError>> + 'a> {
let block_hash = events.block_hash();
let mut iter = events.into_iter_raw();
Box::new(std::iter::from_fn(move || {
let mut out: ( $(Option<$ty>,)+ ) = Default::default();
for ev in iter.by_ref() {
// Forward any error immediately:
let raw_event = match ev {
Ok(ev) => ev,
Err(e) => return Some(Err(e))
};
// Try decoding each type until we hit a match or an error:
$({
let ev = raw_event.as_event::<$ty>();
if let Ok(Some(ev)) = ev {
// We found a match; return our tuple.
out.$idx = Some(ev);
return Some(Ok(FilteredEventDetails {
phase: raw_event.phase,
block_hash,
event: out
}))
}
if let Err(e) = ev {
// We hit an error. Return it.
return Some(Err(e.into()))
}
})+
}
None
}))
}
}
}
}
impl_event_filter!(A 0, B 1);
impl_event_filter!(A 0, B 1, C 2);
impl_event_filter!(A 0, B 1, C 2, D 3);
impl_event_filter!(A 0, B 1, C 2, D 3, E 4);
impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5);
impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6);
impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7);
#[cfg(test)]
mod test {
use super::{
super::events_type::test_utils::{
event_record,
events,
metadata,
AllEvents,
},
*,
};
use crate::{
Config,
DefaultConfig,
Metadata,
};
use codec::Encode;
use futures::{
stream,
Stream,
StreamExt,
};
use scale_info::TypeInfo;
// Some pretend events in a pallet
#[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)]
enum PalletEvents {
A(EventA),
B(EventB),
C(EventC),
}
// An event in our pallet that we can filter on.
#[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)]
struct EventA(u8);
impl crate::Event for EventA {
const PALLET: &'static str = "Test";
const EVENT: &'static str = "A";
}
// An event in our pallet that we can filter on.
#[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)]
struct EventB(bool);
impl crate::Event for EventB {
const PALLET: &'static str = "Test";
const EVENT: &'static str = "B";
}
// An event in our pallet that we can filter on.
#[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)]
struct EventC(u8, bool);
impl crate::Event for EventC {
const PALLET: &'static str = "Test";
const EVENT: &'static str = "C";
}
// A stream of fake events for us to try filtering on.
fn events_stream(
metadata: &'_ Metadata,
) -> impl Stream<
Item = Result<Events<'_, DefaultConfig, AllEvents<PalletEvents>>, BasicError>,
> {
stream::iter(vec![
events::<PalletEvents>(
metadata,
vec![
event_record(Phase::Initialization, PalletEvents::A(EventA(1))),
event_record(Phase::ApplyExtrinsic(0), PalletEvents::B(EventB(true))),
event_record(Phase::Finalization, PalletEvents::A(EventA(2))),
],
),
events::<PalletEvents>(
metadata,
vec![event_record(
Phase::ApplyExtrinsic(1),
PalletEvents::B(EventB(false)),
)],
),
events::<PalletEvents>(
metadata,
vec![
event_record(Phase::ApplyExtrinsic(2), PalletEvents::B(EventB(true))),
event_record(Phase::ApplyExtrinsic(3), PalletEvents::A(EventA(3))),
],
),
])
.map(Ok::<_, BasicError>)
}
#[async_std::test]
async fn filter_one_event_from_stream() {
let metadata = metadata::<PalletEvents>();
// Filter out fake event stream to select events matching `EventA` only.
let actual: Vec<_> =
FilterEvents::<_, DefaultConfig, (EventA,)>::new(events_stream(&metadata))
.map(|e| e.unwrap())
.collect()
.await;
let expected = vec![
FilteredEventDetails {
phase: Phase::Initialization,
block_hash: <DefaultConfig as Config>::Hash::default(),
event: EventA(1),
},
FilteredEventDetails {
phase: Phase::Finalization,
block_hash: <DefaultConfig as Config>::Hash::default(),
event: EventA(2),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(3),
block_hash: <DefaultConfig as Config>::Hash::default(),
event: EventA(3),
},
];
assert_eq!(actual, expected);
}
#[async_std::test]
async fn filter_some_events_from_stream() {
let metadata = metadata::<PalletEvents>();
// Filter out fake event stream to select events matching `EventA` or `EventB`.
let actual: Vec<_> = FilterEvents::<_, DefaultConfig, (EventA, EventB)>::new(
events_stream(&metadata),
)
.map(|e| e.unwrap())
.collect()
.await;
let expected = vec![
FilteredEventDetails {
phase: Phase::Initialization,
block_hash: <DefaultConfig as Config>::Hash::default(),
event: (Some(EventA(1)), None),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(0),
block_hash: <DefaultConfig as Config>::Hash::default(),
event: (None, Some(EventB(true))),
},
FilteredEventDetails {
phase: Phase::Finalization,
block_hash: <DefaultConfig as Config>::Hash::default(),
event: (Some(EventA(2)), None),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(1),
block_hash: <DefaultConfig as Config>::Hash::default(),
event: (None, Some(EventB(false))),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(2),
block_hash: <DefaultConfig as Config>::Hash::default(),
event: (None, Some(EventB(true))),
},
FilteredEventDetails {
phase: Phase::ApplyExtrinsic(3),
block_hash: <DefaultConfig as Config>::Hash::default(),
event: (Some(EventA(3)), None),
},
];
assert_eq!(actual, expected);
}
#[async_std::test]
async fn filter_no_events_from_stream() {
let metadata = metadata::<PalletEvents>();
// Filter out fake event stream to select events matching `EventC` (none exist).
let actual: Vec<_> =
FilterEvents::<_, DefaultConfig, (EventC,)>::new(events_stream(&metadata))
.map(|e| e.unwrap())
.collect()
.await;
assert_eq!(actual, vec![]);
}
}
+40
View File
@@ -0,0 +1,40 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is part of subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
//! For working with events.
mod decoding;
mod event_subscription;
mod events_type;
mod filter_events;
pub use decoding::EventsDecodingError;
pub use event_subscription::{
subscribe,
subscribe_finalized,
EventSubscription,
};
pub use events_type::{
at,
EventDetails,
Events,
RawEventDetails,
};
pub use filter_events::{
EventFilter,
FilterEvents,
FilteredEventDetails,
};
+2 -2
View File
@@ -514,9 +514,9 @@ impl<'client, T: Config, Evs: Decode> TransactionEvents<'client, T, Evs> {
/// Iterate through the transaction events using metadata to dynamically decode and skip
/// them, and return the first event found which decodes to the provided `Ev` type.
///
/// This works in the same way that [`events::Events::find_first_event()`] does, with the
/// This works in the same way that [`events::Events::find_first()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn find_first_event<Ev: crate::Event>(&self) -> Result<Option<Ev>, BasicError> {
pub fn find_first<Ev: crate::Event>(&self) -> Result<Option<Ev>, BasicError> {
self.find::<Ev>().next().transpose()
}
+8 -10
View File
@@ -78,7 +78,7 @@ async fn subscription_produces_events_each_block() -> Result<(), subxt::BasicErr
.await
.expect("events expected each block")?;
let success_event = events
.find_first_event::<system::events::ExtrinsicSuccess>()
.find_first::<system::events::ExtrinsicSuccess>()
.expect("decode error");
// Every now and then I get no bytes back for the first block events;
// I assume that this might be the case for the genesis block, so don't
@@ -100,14 +100,12 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> {
let ctx = test_context().await;
// Subscribe to balance transfer events, ignoring all else.
let event_sub = ctx.api.events().subscribe().await?.filter_map(|events| {
async move {
let events = events.ok()?;
events
.find_first_event::<balances::events::Transfer>()
.ok()?
}
});
let event_sub = ctx
.api
.events()
.subscribe()
.await?
.filter_events::<(balances::events::Transfer,)>();
// Calling `.next()` on the above borrows it, and the `filter_map`
// means it's no longer `Unpin`, so we pin it on the stack:
@@ -125,7 +123,7 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> {
// Wait for the next balance transfer event in our subscription stream
// and check that it lines up:
let event = event_sub.next().await.unwrap();
let event = event_sub.next().await.unwrap().unwrap().event;
assert_eq!(
event,
balances::events::Transfer {
+4 -4
View File
@@ -63,11 +63,11 @@ async fn tx_basic_transfer() -> Result<(), subxt::Error<DispatchError>> {
.wait_for_finalized_success()
.await?;
let event = events
.find_first_event::<balances::events::Transfer>()
.find_first::<balances::events::Transfer>()
.expect("Failed to decode balances::events::Transfer")
.expect("Failed to find balances::events::Transfer");
let _extrinsic_success = events
.find_first_event::<system::events::ExtrinsicSuccess>()
.find_first::<system::events::ExtrinsicSuccess>()
.expect("Failed to decode ExtrinisicSuccess")
.expect("Failed to find ExtrinisicSuccess");
@@ -125,7 +125,7 @@ async fn storage_balance_lock() -> Result<(), subxt::Error<DispatchError>> {
.await?
.wait_for_finalized_success()
.await?
.find_first_event::<system::events::ExtrinsicSuccess>()?
.find_first::<system::events::ExtrinsicSuccess>()?
.expect("No ExtrinsicSuccess Event found");
let locks = cxt
@@ -205,7 +205,7 @@ async fn transfer_implicit_subscription() {
.wait_for_finalized_success()
.await
.unwrap()
.find_first_event::<balances::events::Transfer>()
.find_first::<balances::events::Transfer>()
.expect("Can decode events")
.expect("Can find balance transfer event");
+4 -4
View File
@@ -100,13 +100,13 @@ impl ContractsTestContext {
.await?;
let code_stored = events
.find_first_event::<events::CodeStored>()?
.find_first::<events::CodeStored>()?
.ok_or_else(|| Error::Other("Failed to find a CodeStored event".into()))?;
let instantiated = events
.find_first_event::<events::Instantiated>()?
.find_first::<events::Instantiated>()?
.ok_or_else(|| Error::Other("Failed to find a Instantiated event".into()))?;
let _extrinsic_success = events
.find_first_event::<system::events::ExtrinsicSuccess>()?
.find_first::<system::events::ExtrinsicSuccess>()?
.ok_or_else(|| {
Error::Other("Failed to find a ExtrinsicSuccess event".into())
})?;
@@ -141,7 +141,7 @@ impl ContractsTestContext {
log::info!("Instantiate result: {:?}", result);
let instantiated = result
.find_first_event::<events::Instantiated>()?
.find_first::<events::Instantiated>()?
.ok_or_else(|| Error::Other("Failed to find a Instantiated event".into()))?;
Ok(instantiated.contract)
+3 -1
View File
@@ -28,7 +28,9 @@ use sp_keyring::AccountKeyring;
async fn storage_plain_lookup() -> Result<(), subxt::Error<DispatchError>> {
let ctx = test_context().await;
// Look up a plain value
// Look up a plain value. Wait long enough that we don't get the genesis block data,
// because it may have no storage associated with it.
async_std::task::sleep(std::time::Duration::from_secs(6)).await;
let entry = ctx.api.storage().timestamp().now(None).await?;
assert!(entry > 0);