diff --git a/examples/fetch_all_accounts.rs b/examples/fetch_all_accounts.rs new file mode 100644 index 0000000000..b202f2992f --- /dev/null +++ b/examples/fetch_all_accounts.rs @@ -0,0 +1,33 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of substrate-subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with substrate-subxt. If not, see . + +use substrate_subxt::{ + system::AccountStoreExt, + ClientBuilder, + DefaultNodeRuntime, +}; + +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let client = ClientBuilder::::new().build().await?; + let mut iter = client.account_iter(None).await?; + while let Some((key, account)) = iter.next().await? { + println!("{:?}: {}", key, account.data.free); + } + Ok(()) +} diff --git a/proc-macro/src/store.rs b/proc-macro/src/store.rs index 5dcabe0d53..19c4568f4a 100644 --- a/proc-macro/src/store.rs +++ b/proc-macro/src/store.rs @@ -73,6 +73,7 @@ pub fn store(s: Structure) -> TokenStream { let module = utils::module_name(generics); let store_name = utils::ident_to_name(ident, "Store").to_camel_case(); let store = format_ident!("{}", store_name.to_snake_case()); + let store_iter = format_ident!("{}_iter", store_name.to_snake_case()); let store_trait = format_ident!("{}StoreExt", store_name); let bindings = utils::bindings(&s); let fields = utils::fields(&bindings); @@ -110,6 +111,7 @@ pub fn store(s: Structure) -> TokenStream { let keys = filtered_fields .iter() .map(|(field, _)| quote!(&self.#field)); + let key_iter = quote!(#subxt::KeyIter>); quote! { impl#generics #subxt::Store for #ident<#(#params),*> { @@ -139,13 +141,19 @@ pub fn store(s: Structure) -> TokenStream { } /// Store extension trait. - pub trait #store_trait { + pub trait #store_trait { /// Retrieve the store element. fn #store<'a>( &'a self, #args hash: Option, ) -> core::pin::Pin> + Send + 'a>>; + + /// Iterate over the store element. + fn #store_iter<'a>( + &'a self, + hash: Option, + ) -> core::pin::Pin> + Send + 'a>>; } impl #store_trait for #subxt::Client { @@ -155,7 +163,14 @@ pub fn store(s: Structure) -> TokenStream { hash: Option, ) -> core::pin::Pin> + Send + 'a>> { let #marker = core::marker::PhantomData::; - Box::pin(self.#fetch(#build_struct, hash)) + Box::pin(async move { self.#fetch(&#build_struct, hash).await }) + } + + fn #store_iter<'a>( + &'a self, + hash: Option, + ) -> core::pin::Pin> + Send + 'a>> { + Box::pin(self.iter(hash)) } } } @@ -202,13 +217,18 @@ mod tests { } /// Store extension trait. - pub trait AccountStoreExt { + pub trait AccountStoreExt { /// Retrieve the store element. fn account<'a>( &'a self, account_id: &'a ::AccountId, hash: Option, ) -> core::pin::Pin, substrate_subxt::Error>> + Send + 'a>>; + /// Iterate over the store element. + fn account_iter<'a>( + &'a self, + hash: Option, + ) -> core::pin::Pin>, substrate_subxt::Error>> + Send + 'a>>; } impl AccountStoreExt for substrate_subxt::Client { @@ -219,7 +239,14 @@ mod tests { ) -> core::pin::Pin, substrate_subxt::Error>> + Send + 'a>> { let _ = core::marker::PhantomData::; - Box::pin(self.fetch_or_default(AccountStore { account_id, }, hash)) + Box::pin(async move { self.fetch_or_default(&AccountStore { account_id, }, hash).await }) + } + + fn account_iter<'a>( + &'a self, + hash: Option, + ) -> core::pin::Pin>, substrate_subxt::Error>> + Send + 'a>> { + Box::pin(self.iter(hash)) } } }; diff --git a/proc-macro/tests/balances.rs b/proc-macro/tests/balances.rs index ea8c5ae6ec..0f167e9803 100644 --- a/proc-macro/tests/balances.rs +++ b/proc-macro/tests/balances.rs @@ -89,8 +89,8 @@ subxt_test!({ account: Alice, step: { state: { - alice: AccountStore { account_id: &alice }, - bob: AccountStore { account_id: &bob }, + alice: &AccountStore { account_id: &alice }, + bob: &AccountStore { account_id: &bob }, }, call: TransferCall { to: &bob, diff --git a/src/lib.rs b/src/lib.rs index b3c823b235..b790083cd8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,7 @@ use sc_rpc_api::state::ReadProof; use sp_core::{ storage::{ StorageChangeSet, + StorageData, StorageKey, }, Bytes, @@ -115,6 +116,7 @@ pub struct ClientBuilder { _marker: std::marker::PhantomData, url: Option, client: Option, + page_size: Option, } impl ClientBuilder { @@ -124,6 +126,7 @@ impl ClientBuilder { _marker: std::marker::PhantomData, url: None, client: None, + page_size: None, } } @@ -139,6 +142,12 @@ impl ClientBuilder { self } + /// Set the page size. + pub fn set_page_size(mut self, size: u32) -> Self { + self.page_size = Some(size); + self + } + /// Creates a new Client. pub async fn build(self) -> Result, Error> { let client = if let Some(client) = self.client { @@ -164,6 +173,7 @@ impl ClientBuilder { metadata: metadata?, runtime_version: runtime_version?, _marker: PhantomData, + page_size: self.page_size.unwrap_or(10), }) } } @@ -175,6 +185,7 @@ pub struct Client { metadata: Metadata, runtime_version: RuntimeVersion, _marker: PhantomData<(fn() -> T::Signature, T::Extra)>, + page_size: u32, } impl Clone for Client { @@ -185,6 +196,53 @@ impl Clone for Client { metadata: self.metadata.clone(), runtime_version: self.runtime_version.clone(), _marker: PhantomData, + page_size: self.page_size, + } + } +} + +/// Iterates over key value pairs in a map. +pub struct KeyIter> { + client: Client, + _marker: PhantomData, + count: u32, + hash: T::Hash, + start_key: Option, + buffer: Vec<(StorageKey, StorageData)>, +} + +impl> KeyIter { + /// Returns the next key value pair from a map. + pub async fn next(&mut self) -> Result, Error> { + loop { + if let Some((k, v)) = self.buffer.pop() { + return Ok(Some((k, Decode::decode(&mut &v.0[..])?))) + } else { + let keys = self + .client + .fetch_keys::(self.count, self.start_key.take(), Some(self.hash)) + .await?; + + if keys.is_empty() { + return Ok(None) + } + + self.start_key = keys.last().cloned(); + + let change_sets = self + .client + .rpc + .query_storage_at(&keys, Some(self.hash)) + .await?; + for change_set in change_sets { + for (k, v) in change_set.changes { + if let Some(v) = v { + self.buffer.push((k, v)); + } + } + } + debug_assert_eq!(self.buffer.len(), self.count as usize); + } } } } @@ -200,37 +258,58 @@ impl Client { &self.metadata } - /// Fetch a StorageKey with default value. - pub async fn fetch_or_default>( - &self, - store: F, - hash: Option, - ) -> Result { - let key = store.key(&self.metadata)?; - if let Some(data) = self.rpc.storage(key, hash).await? { - Ok(Decode::decode(&mut &data.0[..])?) - } else { - Ok(store.default(&self.metadata)?) - } - } - - /// Fetch a StorageKey an optional storage key. + /// Fetch a StorageKey with an optional block hash. pub async fn fetch>( &self, - store: F, + store: &F, hash: Option, ) -> Result, Error> { let key = store.key(&self.metadata)?; - if let Some(data) = self.rpc.storage(key, hash).await? { + if let Some(data) = self.rpc.storage(&key, hash).await? { Ok(Some(Decode::decode(&mut &data.0[..])?)) } else { Ok(None) } } - /// Fetch up to `count` keys for a storage map in lexicographic order. - /// - /// Supports pagination by passing a value to `start_key`. + /// Fetch a StorageKey that has a default value with an optional block hash. + pub async fn fetch_or_default>( + &self, + store: &F, + hash: Option, + ) -> Result { + if let Some(data) = self.fetch(store, hash).await? { + Ok(data) + } else { + Ok(store.default(&self.metadata)?) + } + } + + /// Returns an iterator of key value pairs. + pub async fn iter>( + &self, + hash: Option, + ) -> Result, Error> { + let hash = if let Some(hash) = hash { + hash + } else { + self.block_hash(None) + .await? + .expect("didn't pass a block number; qed") + }; + Ok(KeyIter { + client: self.clone(), + hash, + count: self.page_size, + start_key: None, + buffer: Default::default(), + _marker: PhantomData, + }) + } + + /// Fetch up to `count` keys for a storage map in lexicographic order. + /// + /// Supports pagination by passing a value to `start_key`. pub async fn fetch_keys>( &self, count: u32, @@ -238,7 +317,10 @@ impl Client { hash: Option, ) -> Result, Error> { let prefix = >::prefix(&self.metadata)?; - let keys = self.rpc.storage_keys_paged(Some(prefix), count, start_key, hash).await?; + let keys = self + .rpc + .storage_keys_paged(Some(prefix), count, start_key, hash) + .await?; Ok(keys) } @@ -513,6 +595,7 @@ mod tests { SubxtClient::from_config(config, test_node::service::new_full) .expect("Error creating subxt client"), ) + .set_page_size(2) .build() .await .expect("Error creating client"); @@ -630,7 +713,21 @@ mod tests { #[async_std::test] async fn test_fetch_keys() { let (client, _) = test_client().await; - let keys = client.fetch_keys::>(4, None, None).await.unwrap(); + let keys = client + .fetch_keys::>(4, None, None) + .await + .unwrap(); assert_eq!(keys.len(), 4) } + + #[async_std::test] + async fn test_iter() { + let (client, _) = test_client().await; + let mut iter = client.iter::>(None).await.unwrap(); + let mut i = 0; + while let Some(_) = iter.next().await.unwrap() { + i += 1; + } + assert_eq!(i, 4); + } } diff --git a/src/rpc.rs b/src/rpc.rs index ecb2a055bf..dd2aeff40e 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -122,7 +122,7 @@ impl Rpc { /// Fetch a storage key pub async fn storage( &self, - key: StorageKey, + key: &StorageKey, hash: Option, ) -> Result, Error> { let params = Params::Array(vec![to_json_value(key)?, to_json_value(hash)?]); @@ -132,8 +132,8 @@ impl Rpc { } /// Returns the keys with prefix with pagination support. - /// Up to `count` keys will be returned. - /// If `start_key` is passed, return next keys in storage in lexicographic order. + /// Up to `count` keys will be returned. + /// If `start_key` is passed, return next keys in storage in lexicographic order. pub async fn storage_keys_paged( &self, prefix: Option, @@ -170,6 +170,19 @@ impl Rpc { .map_err(Into::into) } + /// Query historical storage entries + pub async fn query_storage_at( + &self, + keys: &[StorageKey], + at: Option, + ) -> Result::Hash>>, Error> { + let params = Params::Array(vec![to_json_value(keys)?, to_json_value(at)?]); + self.client + .request("state_queryStorage", params) + .await + .map_err(Into::into) + } + /// Fetch the genesis hash pub async fn genesis_hash(&self) -> Result { let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0)));