mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 19:01:08 +00:00
implement variant of subscription that returns finalized storage changes (#237)
Signed-off-by: Gregory Hill <gregorydhill@outlook.com>
This commit is contained in:
+29
-5
@@ -110,7 +110,11 @@ pub use crate::{
|
||||
SystemProperties,
|
||||
},
|
||||
runtimes::*,
|
||||
subscription::*,
|
||||
subscription::{
|
||||
EventStorageSubscription,
|
||||
EventSubscription,
|
||||
FinalizedEventStorageSubscription,
|
||||
},
|
||||
substrate_subxt_proc_macro::*,
|
||||
};
|
||||
use crate::{
|
||||
@@ -133,6 +137,7 @@ pub struct ClientBuilder<T: Runtime> {
|
||||
page_size: Option<u32>,
|
||||
event_type_registry: EventTypeRegistry<T>,
|
||||
skip_type_sizes_check: bool,
|
||||
accept_weak_inclusion: bool,
|
||||
}
|
||||
|
||||
impl<T: Runtime> ClientBuilder<T> {
|
||||
@@ -144,6 +149,7 @@ impl<T: Runtime> ClientBuilder<T> {
|
||||
page_size: None,
|
||||
event_type_registry: EventTypeRegistry::new(),
|
||||
skip_type_sizes_check: false,
|
||||
accept_weak_inclusion: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,6 +193,12 @@ impl<T: Runtime> ClientBuilder<T> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Only check that transactions are InBlock on submit.
|
||||
pub fn accept_weak_inclusion(mut self) -> Self {
|
||||
self.accept_weak_inclusion = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Creates a new Client.
|
||||
pub async fn build<'a>(self) -> Result<Client<T>, Error> {
|
||||
let client = if let Some(client) = self.client {
|
||||
@@ -202,7 +214,10 @@ impl<T: Runtime> ClientBuilder<T> {
|
||||
RpcClient::Http(Arc::new(client))
|
||||
}
|
||||
};
|
||||
let rpc = Rpc::new(client);
|
||||
let mut rpc = Rpc::new(client);
|
||||
if self.accept_weak_inclusion {
|
||||
rpc.accept_weak_inclusion();
|
||||
}
|
||||
let (metadata, genesis_hash, runtime_version, properties) = future::join4(
|
||||
rpc.metadata(),
|
||||
rpc.genesis_hash(),
|
||||
@@ -466,13 +481,22 @@ impl<T: Runtime> Client<T> {
|
||||
}
|
||||
|
||||
/// Subscribe to events.
|
||||
pub async fn subscribe_events(
|
||||
&self,
|
||||
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
|
||||
///
|
||||
/// *WARNING* these may not be included in the finalized chain, use
|
||||
/// `subscribe_finalized_events` to ensure events are finalized.
|
||||
pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> {
|
||||
let events = self.rpc.subscribe_events().await?;
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
/// Subscribe to finalized events.
|
||||
pub async fn subscribe_finalized_events(
|
||||
&self,
|
||||
) -> Result<EventStorageSubscription<T>, Error> {
|
||||
let events = self.rpc.subscribe_finalized_events().await?;
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
/// Subscribe to new blocks.
|
||||
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
|
||||
let headers = self.rpc.subscribe_blocks().await?;
|
||||
|
||||
+96
-57
@@ -58,7 +58,6 @@ use sp_core::{
|
||||
StorageData,
|
||||
StorageKey,
|
||||
},
|
||||
twox_128,
|
||||
Bytes,
|
||||
};
|
||||
use sp_rpc::{
|
||||
@@ -86,7 +85,12 @@ use crate::{
|
||||
},
|
||||
metadata::Metadata,
|
||||
runtimes::Runtime,
|
||||
subscription::EventSubscription,
|
||||
subscription::{
|
||||
EventStorageSubscription,
|
||||
EventSubscription,
|
||||
FinalizedEventStorageSubscription,
|
||||
SystemEvents,
|
||||
},
|
||||
};
|
||||
|
||||
pub type ChainBlock<T> =
|
||||
@@ -256,6 +260,7 @@ pub struct ReadProof<Hash> {
|
||||
pub struct Rpc<T: Runtime> {
|
||||
client: RpcClient,
|
||||
marker: PhantomData<T>,
|
||||
accept_weak_inclusion: bool,
|
||||
}
|
||||
|
||||
impl<T: Runtime> Clone for Rpc<T> {
|
||||
@@ -263,6 +268,7 @@ impl<T: Runtime> Clone for Rpc<T> {
|
||||
Self {
|
||||
client: self.client.clone(),
|
||||
marker: PhantomData,
|
||||
accept_weak_inclusion: self.accept_weak_inclusion,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -272,9 +278,16 @@ impl<T: Runtime> Rpc<T> {
|
||||
Self {
|
||||
client,
|
||||
marker: PhantomData,
|
||||
accept_weak_inclusion: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Configure the Rpc to accept non-finalized blocks
|
||||
/// in `submit_and_watch_extrinsic`
|
||||
pub fn accept_weak_inclusion(&mut self) {
|
||||
self.accept_weak_inclusion = true;
|
||||
}
|
||||
|
||||
/// Fetch a storage key
|
||||
pub async fn storage(
|
||||
&self,
|
||||
@@ -439,22 +452,31 @@ impl<T: Runtime> Rpc<T> {
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
/// Subscribe to substrate System Events
|
||||
pub async fn subscribe_events(
|
||||
&self,
|
||||
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
|
||||
let mut storage_key = twox_128(b"System").to_vec();
|
||||
storage_key.extend(twox_128(b"Events").to_vec());
|
||||
log::debug!("Events storage key {:?}", hex::encode(&storage_key));
|
||||
|
||||
let keys = Some(vec![StorageKey(storage_key)]);
|
||||
/// Subscribe to System Events that are imported into blocks.
|
||||
///
|
||||
/// *WARNING* these may not be included in the finalized chain, use
|
||||
/// `subscribe_finalized_events` to ensure events are finalized.
|
||||
pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> {
|
||||
let keys = Some(vec![StorageKey::from(SystemEvents::new())]);
|
||||
let params = Params::Array(vec![to_json_value(keys)?]);
|
||||
|
||||
let subscription = self
|
||||
.client
|
||||
.subscribe("state_subscribeStorage", params, "state_unsubscribeStorage")
|
||||
.await?;
|
||||
Ok(subscription)
|
||||
Ok(EventStorageSubscription::Imported(subscription))
|
||||
}
|
||||
|
||||
/// Subscribe to finalized events.
|
||||
pub async fn subscribe_finalized_events(
|
||||
&self,
|
||||
) -> Result<EventStorageSubscription<T>, Error> {
|
||||
Ok(EventStorageSubscription::Finalized(
|
||||
FinalizedEventStorageSubscription::new(
|
||||
self.clone(),
|
||||
self.subscribe_finalized_blocks().await?,
|
||||
),
|
||||
))
|
||||
}
|
||||
|
||||
/// Subscribe to blocks.
|
||||
@@ -464,7 +486,7 @@ impl<T: Runtime> Rpc<T> {
|
||||
.subscribe(
|
||||
"chain_subscribeNewHeads",
|
||||
Params::None,
|
||||
"chain_subscribeNewHeads",
|
||||
"chain_unsubscribeNewHeads",
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -480,7 +502,7 @@ impl<T: Runtime> Rpc<T> {
|
||||
.subscribe(
|
||||
"chain_subscribeFinalizedHeads",
|
||||
Params::None,
|
||||
"chain_subscribeFinalizedHeads",
|
||||
"chain_unsubscribeFinalizedHeads",
|
||||
)
|
||||
.await?;
|
||||
Ok(subscription)
|
||||
@@ -526,56 +548,27 @@ impl<T: Runtime> Rpc<T> {
|
||||
let ext_hash = T::Hashing::hash_of(&extrinsic);
|
||||
log::info!("Submitting Extrinsic `{:?}`", ext_hash);
|
||||
|
||||
let events_sub = self.subscribe_events().await?;
|
||||
let events_sub = if self.accept_weak_inclusion {
|
||||
self.subscribe_events().await
|
||||
} else {
|
||||
self.subscribe_finalized_events().await
|
||||
}?;
|
||||
let mut xt_sub = self.watch_extrinsic(extrinsic).await?;
|
||||
|
||||
while let Some(status) = xt_sub.next().await {
|
||||
// log::info!("received status {:?}", status);
|
||||
log::info!("received status {:?}", status);
|
||||
match status {
|
||||
// ignore in progress extrinsic for now
|
||||
TransactionStatus::Future
|
||||
| TransactionStatus::Ready
|
||||
| TransactionStatus::Broadcast(_) => continue,
|
||||
TransactionStatus::InBlock(block_hash) => {
|
||||
log::info!("Fetching block {:?}", block_hash);
|
||||
let block = self.block(Some(block_hash)).await?;
|
||||
return match block {
|
||||
Some(signed_block) => {
|
||||
log::info!(
|
||||
"Found block {:?}, with {} extrinsics",
|
||||
block_hash,
|
||||
signed_block.block.extrinsics.len()
|
||||
);
|
||||
let ext_index = signed_block
|
||||
.block
|
||||
.extrinsics
|
||||
.iter()
|
||||
.position(|ext| {
|
||||
let hash = T::Hashing::hash_of(ext);
|
||||
hash == ext_hash
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
Error::Other(format!(
|
||||
"Failed to find Extrinsic with hash {:?}",
|
||||
ext_hash,
|
||||
))
|
||||
})?;
|
||||
let mut sub = EventSubscription::new(events_sub, &decoder);
|
||||
sub.filter_extrinsic(block_hash, ext_index);
|
||||
let mut events = vec![];
|
||||
while let Some(event) = sub.next().await {
|
||||
events.push(event?);
|
||||
}
|
||||
Ok(ExtrinsicSuccess {
|
||||
block: block_hash,
|
||||
extrinsic: ext_hash,
|
||||
events,
|
||||
})
|
||||
}
|
||||
None => {
|
||||
Err(format!("Failed to find block {:?}", block_hash).into())
|
||||
}
|
||||
if self.accept_weak_inclusion {
|
||||
return self
|
||||
.process_block(events_sub, decoder, block_hash, ext_hash)
|
||||
.await
|
||||
}
|
||||
continue
|
||||
}
|
||||
TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()),
|
||||
TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()),
|
||||
@@ -583,9 +576,11 @@ impl<T: Runtime> Rpc<T> {
|
||||
TransactionStatus::Retracted(_) => {
|
||||
return Err("Extrinsic Retracted".into())
|
||||
}
|
||||
// should have made it `InBlock` before either of these
|
||||
TransactionStatus::Finalized(_) => {
|
||||
return Err("Extrinsic Finalized".into())
|
||||
TransactionStatus::Finalized(block_hash) => {
|
||||
// read finalized blocks by default
|
||||
return self
|
||||
.process_block(events_sub, decoder, block_hash, ext_hash)
|
||||
.await
|
||||
}
|
||||
TransactionStatus::FinalityTimeout(_) => {
|
||||
return Err("Extrinsic FinalityTimeout".into())
|
||||
@@ -595,6 +590,50 @@ impl<T: Runtime> Rpc<T> {
|
||||
Err(RpcError::Custom("RPC subscription dropped".into()).into())
|
||||
}
|
||||
|
||||
async fn process_block<'a>(
|
||||
&self,
|
||||
events_sub: EventStorageSubscription<T>,
|
||||
decoder: &'a EventsDecoder<T>,
|
||||
block_hash: T::Hash,
|
||||
ext_hash: T::Hash,
|
||||
) -> Result<ExtrinsicSuccess<T>, Error> {
|
||||
log::info!("Fetching block {:?}", block_hash);
|
||||
if let Some(signed_block) = self.block(Some(block_hash)).await? {
|
||||
log::info!(
|
||||
"Found block {:?}, with {} extrinsics",
|
||||
block_hash,
|
||||
signed_block.block.extrinsics.len()
|
||||
);
|
||||
let ext_index = signed_block
|
||||
.block
|
||||
.extrinsics
|
||||
.iter()
|
||||
.position(|ext| {
|
||||
let hash = T::Hashing::hash_of(ext);
|
||||
hash == ext_hash
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
Error::Other(format!(
|
||||
"Failed to find Extrinsic with hash {:?}",
|
||||
ext_hash,
|
||||
))
|
||||
})?;
|
||||
let mut sub = EventSubscription::new(events_sub, &decoder);
|
||||
sub.filter_extrinsic(block_hash, ext_index);
|
||||
let mut events = vec![];
|
||||
while let Some(event) = sub.next().await {
|
||||
events.push(event?);
|
||||
}
|
||||
Ok(ExtrinsicSuccess {
|
||||
block: block_hash,
|
||||
extrinsic: ext_hash,
|
||||
events,
|
||||
})
|
||||
} else {
|
||||
Err(format!("Failed to find block {:?}", block_hash).into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a key into the keystore.
|
||||
pub async fn insert_key(
|
||||
&self,
|
||||
|
||||
+83
-3
@@ -16,7 +16,14 @@
|
||||
|
||||
use jsonrpsee_types::error::Error as RpcError;
|
||||
use jsonrpsee_ws_client::WsSubscription as Subscription;
|
||||
use sp_core::storage::StorageChangeSet;
|
||||
use sp_core::{
|
||||
storage::{
|
||||
StorageChangeSet,
|
||||
StorageKey,
|
||||
},
|
||||
twox_128,
|
||||
};
|
||||
use sp_runtime::traits::Header;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use crate::{
|
||||
@@ -30,13 +37,14 @@ use crate::{
|
||||
system::Phase,
|
||||
Event,
|
||||
},
|
||||
rpc::Rpc,
|
||||
runtimes::Runtime,
|
||||
};
|
||||
|
||||
/// Event subscription simplifies filtering a storage change set stream for
|
||||
/// events of interest.
|
||||
pub struct EventSubscription<'a, T: Runtime> {
|
||||
subscription: Subscription<StorageChangeSet<T::Hash>>,
|
||||
subscription: EventStorageSubscription<T>,
|
||||
decoder: &'a EventsDecoder<T>,
|
||||
block: Option<T::Hash>,
|
||||
extrinsic: Option<usize>,
|
||||
@@ -48,7 +56,7 @@ pub struct EventSubscription<'a, T: Runtime> {
|
||||
impl<'a, T: Runtime> EventSubscription<'a, T> {
|
||||
/// Creates a new event subscription.
|
||||
pub fn new(
|
||||
subscription: Subscription<StorageChangeSet<T::Hash>>,
|
||||
subscription: EventStorageSubscription<T>,
|
||||
decoder: &'a EventsDecoder<T>,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -132,3 +140,75 @@ impl<'a, T: Runtime> EventSubscription<'a, T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SystemEvents(StorageKey);
|
||||
|
||||
impl SystemEvents {
|
||||
pub(crate) fn new() -> Self {
|
||||
let mut storage_key = twox_128(b"System").to_vec();
|
||||
storage_key.extend(twox_128(b"Events").to_vec());
|
||||
log::debug!("Events storage key {:?}", hex::encode(&storage_key));
|
||||
Self(StorageKey(storage_key))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SystemEvents> for StorageKey {
|
||||
fn from(key: SystemEvents) -> Self {
|
||||
key.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Event subscription to only fetch finalized storage changes.
|
||||
pub struct FinalizedEventStorageSubscription<T: Runtime> {
|
||||
rpc: Rpc<T>,
|
||||
subscription: Subscription<T::Header>,
|
||||
storage_changes: VecDeque<StorageChangeSet<T::Hash>>,
|
||||
storage_key: StorageKey,
|
||||
}
|
||||
|
||||
impl<T: Runtime> FinalizedEventStorageSubscription<T> {
|
||||
/// Creates a new finalized event storage subscription.
|
||||
pub fn new(rpc: Rpc<T>, subscription: Subscription<T::Header>) -> Self {
|
||||
Self {
|
||||
rpc,
|
||||
subscription,
|
||||
storage_changes: Default::default(),
|
||||
storage_key: SystemEvents::new().into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the next change_set.
|
||||
pub async fn next(&mut self) -> Option<StorageChangeSet<T::Hash>> {
|
||||
loop {
|
||||
if let Some(storage_change) = self.storage_changes.pop_front() {
|
||||
return Some(storage_change)
|
||||
}
|
||||
let header: T::Header = self.subscription.next().await?;
|
||||
if let Ok(storage_changes) = self
|
||||
.rpc
|
||||
.query_storage_at(&[self.storage_key.clone()], Some(header.hash()))
|
||||
.await
|
||||
{
|
||||
self.storage_changes.extend(storage_changes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper over imported and finalized event subscriptions.
|
||||
pub enum EventStorageSubscription<T: Runtime> {
|
||||
/// Events that are InBlock
|
||||
Imported(Subscription<StorageChangeSet<T::Hash>>),
|
||||
/// Events that are Finalized
|
||||
Finalized(FinalizedEventStorageSubscription<T>),
|
||||
}
|
||||
|
||||
impl<T: Runtime> EventStorageSubscription<T> {
|
||||
/// Gets the next change_set from the subscription.
|
||||
pub async fn next(&mut self) -> Option<StorageChangeSet<T::Hash>> {
|
||||
match self {
|
||||
Self::Imported(event_sub) => event_sub.next().await,
|
||||
Self::Finalized(event_sub) => event_sub.next().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user