mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 01:41:09 +00:00
Events sub (#126)
* Make event subscription logic more generic. * Fix build. * Add test-node. * Update deps. * Address review comments.
This commit is contained in:
@@ -114,10 +114,12 @@ mod tests {
|
||||
Error,
|
||||
RuntimeError,
|
||||
},
|
||||
events::EventsDecoder,
|
||||
signer::{
|
||||
PairSigner,
|
||||
Signer,
|
||||
},
|
||||
subscription::EventSubscription,
|
||||
system::AccountStoreExt,
|
||||
tests::{
|
||||
test_client,
|
||||
@@ -201,4 +203,28 @@ mod tests {
|
||||
panic!("expected an error");
|
||||
}
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn test_transfer_subscription() {
|
||||
env_logger::try_init().ok();
|
||||
let alice = PairSigner::new(AccountKeyring::Alice.pair());
|
||||
let bob = AccountKeyring::Bob.to_account_id();
|
||||
let (client, _) = test_client().await;
|
||||
let sub = client.subscribe_events().await.unwrap();
|
||||
let mut decoder = EventsDecoder::<TestRuntime>::new(client.metadata().clone());
|
||||
decoder.with_balances();
|
||||
let mut sub = EventSubscription::<TestRuntime>::new(sub, decoder);
|
||||
sub.filter_event::<TransferEvent<_>>();
|
||||
client.transfer(&alice, &bob, 10_000).await.unwrap();
|
||||
let raw = sub.next().await.unwrap().unwrap();
|
||||
let event = TransferEvent::<TestRuntime>::decode(&mut &raw.data[..]).unwrap();
|
||||
assert_eq!(
|
||||
event,
|
||||
TransferEvent {
|
||||
from: alice.account_id().clone(),
|
||||
to: bob.clone(),
|
||||
amount: 10_000,
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
+5
-3
@@ -68,6 +68,7 @@ mod metadata;
|
||||
mod rpc;
|
||||
mod runtimes;
|
||||
mod signer;
|
||||
mod subscription;
|
||||
|
||||
pub use crate::{
|
||||
error::Error,
|
||||
@@ -87,6 +88,7 @@ pub use crate::{
|
||||
},
|
||||
runtimes::*,
|
||||
signer::*,
|
||||
subscription::*,
|
||||
substrate_subxt_proc_macro::*,
|
||||
};
|
||||
use crate::{
|
||||
@@ -446,6 +448,7 @@ mod tests {
|
||||
pub(crate) type TestRuntime = crate::NodeTemplateRuntime;
|
||||
|
||||
pub(crate) async fn test_client() -> (Client<TestRuntime>, TempDir) {
|
||||
env_logger::try_init().ok();
|
||||
let tmp = TempDir::new("subxt-").expect("failed to create tempdir");
|
||||
let config = SubxtClientConfig {
|
||||
impl_name: "substrate-subxt-full-client",
|
||||
@@ -456,8 +459,8 @@ mod tests {
|
||||
path: tmp.path().into(),
|
||||
cache_size: 128,
|
||||
},
|
||||
builder: node_template::service::new_full,
|
||||
chain_spec: node_template::chain_spec::development_config(),
|
||||
builder: test_node::service::new_full,
|
||||
chain_spec: test_node::chain_spec::development_config(),
|
||||
role: Role::Authority(AccountKeyring::Alice),
|
||||
};
|
||||
let client = ClientBuilder::new()
|
||||
@@ -470,7 +473,6 @@ mod tests {
|
||||
|
||||
#[async_std::test]
|
||||
async fn test_tx_transfer_balance() {
|
||||
env_logger::try_init().ok();
|
||||
let mut signer = PairSigner::new(AccountKeyring::Alice.pair());
|
||||
let dest = AccountKeyring::Bob.to_account_id().into();
|
||||
|
||||
|
||||
+32
-72
@@ -70,13 +70,12 @@ use crate::{
|
||||
RawEvent,
|
||||
},
|
||||
frame::{
|
||||
system::{
|
||||
Phase,
|
||||
System,
|
||||
},
|
||||
system::System,
|
||||
Event,
|
||||
},
|
||||
metadata::Metadata,
|
||||
runtimes::Runtime,
|
||||
subscription::EventSubscription,
|
||||
};
|
||||
|
||||
pub type ChainBlock<T> =
|
||||
@@ -107,12 +106,12 @@ where
|
||||
}
|
||||
|
||||
/// Client for substrate rpc interfaces
|
||||
pub struct Rpc<T: System> {
|
||||
pub struct Rpc<T: Runtime> {
|
||||
client: Client,
|
||||
marker: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T: System> Clone for Rpc<T> {
|
||||
impl<T: Runtime> Clone for Rpc<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
client: self.client.clone(),
|
||||
@@ -121,7 +120,7 @@ impl<T: System> Clone for Rpc<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: System> Rpc<T> {
|
||||
impl<T: Runtime> Rpc<T> {
|
||||
pub fn new(client: Client) -> Self {
|
||||
Self {
|
||||
client,
|
||||
@@ -256,7 +255,7 @@ impl<T: System> Rpc<T> {
|
||||
/// Subscribe to substrate System Events
|
||||
pub async fn subscribe_events(
|
||||
&self,
|
||||
) -> Result<Subscription<StorageChangeSet<<T as System>::Hash>>, Error> {
|
||||
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
|
||||
let mut storage_key = twox_128(b"System").to_vec();
|
||||
storage_key.extend(twox_128(b"Events").to_vec());
|
||||
log::debug!("Events storage key {:?}", hex::encode(&storage_key));
|
||||
@@ -360,14 +359,31 @@ impl<T: System> Rpc<T> {
|
||||
block_hash,
|
||||
signed_block.block.extrinsics.len()
|
||||
);
|
||||
wait_for_block_events(
|
||||
decoder,
|
||||
ext_hash,
|
||||
signed_block,
|
||||
block_hash,
|
||||
events_sub,
|
||||
)
|
||||
.await
|
||||
let ext_index = signed_block
|
||||
.block
|
||||
.extrinsics
|
||||
.iter()
|
||||
.position(|ext| {
|
||||
let hash = T::Hashing::hash_of(ext);
|
||||
hash == ext_hash
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
Error::Other(format!(
|
||||
"Failed to find Extrinsic with hash {:?}",
|
||||
ext_hash,
|
||||
))
|
||||
})?;
|
||||
let mut sub = EventSubscription::new(events_sub, decoder);
|
||||
sub.filter_extrinsic(block_hash, ext_index);
|
||||
let mut events = vec![];
|
||||
while let Some(event) = sub.next().await {
|
||||
events.push(event?);
|
||||
}
|
||||
Ok(ExtrinsicSuccess {
|
||||
block: block_hash,
|
||||
extrinsic: ext_hash,
|
||||
events,
|
||||
})
|
||||
}
|
||||
None => {
|
||||
Err(format!("Failed to find block {:?}", block_hash).into())
|
||||
@@ -424,59 +440,3 @@ impl<T: System> ExtrinsicSuccess<T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for events for the block triggered by the extrinsic
|
||||
pub async fn wait_for_block_events<T: System>(
|
||||
decoder: EventsDecoder<T>,
|
||||
ext_hash: T::Hash,
|
||||
signed_block: ChainBlock<T>,
|
||||
block_hash: T::Hash,
|
||||
events_subscription: Subscription<StorageChangeSet<T::Hash>>,
|
||||
) -> Result<ExtrinsicSuccess<T>, Error> {
|
||||
let ext_index = signed_block
|
||||
.block
|
||||
.extrinsics
|
||||
.iter()
|
||||
.position(|ext| {
|
||||
let hash = T::Hashing::hash_of(ext);
|
||||
hash == ext_hash
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
Error::Other(format!("Failed to find Extrinsic with hash {:?}", ext_hash))
|
||||
})?;
|
||||
|
||||
let mut subscription = events_subscription;
|
||||
while let change_set = subscription.next().await {
|
||||
// only interested in events for the given block
|
||||
if change_set.block != block_hash {
|
||||
continue
|
||||
}
|
||||
let mut events = Vec::new();
|
||||
for (_key, data) in change_set.changes {
|
||||
if let Some(data) = data {
|
||||
match decoder.decode_events(&mut &data.0[..]) {
|
||||
Ok(raw_events) => {
|
||||
for (phase, event) in raw_events {
|
||||
if let Phase::ApplyExtrinsic(i) = phase {
|
||||
if i as usize == ext_index {
|
||||
events.push(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
return if !events.is_empty() {
|
||||
Ok(ExtrinsicSuccess {
|
||||
block: block_hash,
|
||||
extrinsic: ext_hash,
|
||||
events,
|
||||
})
|
||||
} else {
|
||||
Err(format!("No events found for block {}", block_hash).into())
|
||||
}
|
||||
}
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of substrate-subxt.
|
||||
//
|
||||
// subxt is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// subxt is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with substrate-subxt. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use jsonrpsee::client::Subscription;
|
||||
use sp_core::storage::StorageChangeSet;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use crate::{
|
||||
error::Error,
|
||||
events::{
|
||||
EventsDecoder,
|
||||
RawEvent,
|
||||
},
|
||||
frame::{
|
||||
system::Phase,
|
||||
Event,
|
||||
},
|
||||
runtimes::Runtime,
|
||||
};
|
||||
|
||||
/// Event subscription simplifies filtering a storage change set stream for
|
||||
/// events of interest.
|
||||
pub struct EventSubscription<T: Runtime> {
|
||||
subscription: Subscription<StorageChangeSet<T::Hash>>,
|
||||
decoder: EventsDecoder<T>,
|
||||
block: Option<T::Hash>,
|
||||
extrinsic: Option<usize>,
|
||||
event: Option<(&'static str, &'static str)>,
|
||||
events: VecDeque<RawEvent>,
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
impl<T: Runtime> EventSubscription<T> {
|
||||
/// Creates a new event subscription.
|
||||
pub fn new(
|
||||
subscription: Subscription<StorageChangeSet<T::Hash>>,
|
||||
decoder: EventsDecoder<T>,
|
||||
) -> Self {
|
||||
Self {
|
||||
subscription,
|
||||
decoder,
|
||||
block: None,
|
||||
extrinsic: None,
|
||||
event: None,
|
||||
events: Default::default(),
|
||||
finished: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Only returns events contained in the block with the given hash.
|
||||
pub fn filter_block(&mut self, block: T::Hash) {
|
||||
self.block = Some(block);
|
||||
}
|
||||
|
||||
/// Only returns events from block emitted by extrinsic with index.
|
||||
pub fn filter_extrinsic(&mut self, block: T::Hash, ext_index: usize) {
|
||||
self.block = Some(block);
|
||||
self.extrinsic = Some(ext_index);
|
||||
}
|
||||
|
||||
/// Filters events by type.
|
||||
pub fn filter_event<E: Event<T>>(&mut self) {
|
||||
self.event = Some((E::MODULE, E::EVENT));
|
||||
}
|
||||
|
||||
/// Gets the next event.
|
||||
pub async fn next(&mut self) -> Option<Result<RawEvent, Error>> {
|
||||
loop {
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
return Some(Ok(event))
|
||||
}
|
||||
if self.finished {
|
||||
return None
|
||||
}
|
||||
let change_set = self.subscription.next().await;
|
||||
if let Some(hash) = self.block.as_ref() {
|
||||
if &change_set.block == hash {
|
||||
self.finished = true;
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
for (_key, data) in change_set.changes {
|
||||
if let Some(data) = data {
|
||||
let raw_events = match self.decoder.decode_events(&mut &data.0[..]) {
|
||||
Ok(events) => events,
|
||||
Err(error) => return Some(Err(error)),
|
||||
};
|
||||
for (phase, event) in raw_events {
|
||||
if let Phase::ApplyExtrinsic(i) = phase {
|
||||
if let Some(ext_index) = self.extrinsic {
|
||||
if i as usize != ext_index {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if let Some((module, variant)) = self.event {
|
||||
if event.module != module || event.variant != variant {
|
||||
continue
|
||||
}
|
||||
}
|
||||
self.events.push_back(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user