Storage map iter (#148)

* Add example file

* Fmt

* Add KeyIter.

* Add iter method to store proc-macro.

* Fetch all values at once.

* Update docs.

* Run rustfmt.

Co-authored-by: Andrew Jones <ascjones@gmail.com>
This commit is contained in:
David Craven
2020-08-05 10:08:12 +02:00
committed by GitHub
parent 663934ca37
commit 271775bf99
5 changed files with 201 additions and 31 deletions
+33
View File
@@ -0,0 +1,33 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of substrate-subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with substrate-subxt. If not, see <http://www.gnu.org/licenses/>.
use substrate_subxt::{
system::AccountStoreExt,
ClientBuilder,
DefaultNodeRuntime,
};
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let client = ClientBuilder::<DefaultNodeRuntime>::new().build().await?;
let mut iter = client.account_iter(None).await?;
while let Some((key, account)) = iter.next().await? {
println!("{:?}: {}", key, account.data.free);
}
Ok(())
}
+31 -4
View File
@@ -73,6 +73,7 @@ pub fn store(s: Structure) -> TokenStream {
let module = utils::module_name(generics);
let store_name = utils::ident_to_name(ident, "Store").to_camel_case();
let store = format_ident!("{}", store_name.to_snake_case());
let store_iter = format_ident!("{}_iter", store_name.to_snake_case());
let store_trait = format_ident!("{}StoreExt", store_name);
let bindings = utils::bindings(&s);
let fields = utils::fields(&bindings);
@@ -110,6 +111,7 @@ pub fn store(s: Structure) -> TokenStream {
let keys = filtered_fields
.iter()
.map(|(field, _)| quote!(&self.#field));
let key_iter = quote!(#subxt::KeyIter<T, #ident<#(#params),*>>);
quote! {
impl#generics #subxt::Store<T> for #ident<#(#params),*> {
@@ -139,13 +141,19 @@ pub fn store(s: Structure) -> TokenStream {
}
/// Store extension trait.
pub trait #store_trait<T: #module> {
pub trait #store_trait<T: #subxt::Runtime + #module> {
/// Retrieve the store element.
fn #store<'a>(
&'a self,
#args
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<#ret, #subxt::Error>> + Send + 'a>>;
/// Iterate over the store element.
fn #store_iter<'a>(
&'a self,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<#key_iter, #subxt::Error>> + Send + 'a>>;
}
impl<T: #subxt::Runtime + #module> #store_trait<T> for #subxt::Client<T> {
@@ -155,7 +163,14 @@ pub fn store(s: Structure) -> TokenStream {
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<#ret, #subxt::Error>> + Send + 'a>> {
let #marker = core::marker::PhantomData::<T>;
Box::pin(self.#fetch(#build_struct, hash))
Box::pin(async move { self.#fetch(&#build_struct, hash).await })
}
fn #store_iter<'a>(
&'a self,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<#key_iter, #subxt::Error>> + Send + 'a>> {
Box::pin(self.iter(hash))
}
}
}
@@ -202,13 +217,18 @@ mod tests {
}
/// Store extension trait.
pub trait AccountStoreExt<T: Balances> {
pub trait AccountStoreExt<T: substrate_subxt::Runtime + Balances> {
/// Retrieve the store element.
fn account<'a>(
&'a self,
account_id: &'a <T as System>::AccountId,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<AccountData<T::Balance>, substrate_subxt::Error>> + Send + 'a>>;
/// Iterate over the store element.
fn account_iter<'a>(
&'a self,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<substrate_subxt::KeyIter<T, AccountStore<'a, T>>, substrate_subxt::Error>> + Send + 'a>>;
}
impl<T: substrate_subxt::Runtime + Balances> AccountStoreExt<T> for substrate_subxt::Client<T> {
@@ -219,7 +239,14 @@ mod tests {
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<AccountData<T::Balance>, substrate_subxt::Error>> + Send + 'a>>
{
let _ = core::marker::PhantomData::<T>;
Box::pin(self.fetch_or_default(AccountStore { account_id, }, hash))
Box::pin(async move { self.fetch_or_default(&AccountStore { account_id, }, hash).await })
}
fn account_iter<'a>(
&'a self,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<substrate_subxt::KeyIter<T, AccountStore<'a, T>>, substrate_subxt::Error>> + Send + 'a>> {
Box::pin(self.iter(hash))
}
}
};
+2 -2
View File
@@ -89,8 +89,8 @@ subxt_test!({
account: Alice,
step: {
state: {
alice: AccountStore { account_id: &alice },
bob: AccountStore { account_id: &bob },
alice: &AccountStore { account_id: &alice },
bob: &AccountStore { account_id: &bob },
},
call: TransferCall {
to: &bob,
+119 -22
View File
@@ -55,6 +55,7 @@ use sc_rpc_api::state::ReadProof;
use sp_core::{
storage::{
StorageChangeSet,
StorageData,
StorageKey,
},
Bytes,
@@ -115,6 +116,7 @@ pub struct ClientBuilder<T: Runtime> {
_marker: std::marker::PhantomData<T>,
url: Option<String>,
client: Option<jsonrpsee::Client>,
page_size: Option<u32>,
}
impl<T: Runtime> ClientBuilder<T> {
@@ -124,6 +126,7 @@ impl<T: Runtime> ClientBuilder<T> {
_marker: std::marker::PhantomData,
url: None,
client: None,
page_size: None,
}
}
@@ -139,6 +142,12 @@ impl<T: Runtime> ClientBuilder<T> {
self
}
/// Set the page size.
pub fn set_page_size(mut self, size: u32) -> Self {
self.page_size = Some(size);
self
}
/// Creates a new Client.
pub async fn build(self) -> Result<Client<T>, Error> {
let client = if let Some(client) = self.client {
@@ -164,6 +173,7 @@ impl<T: Runtime> ClientBuilder<T> {
metadata: metadata?,
runtime_version: runtime_version?,
_marker: PhantomData,
page_size: self.page_size.unwrap_or(10),
})
}
}
@@ -175,6 +185,7 @@ pub struct Client<T: Runtime> {
metadata: Metadata,
runtime_version: RuntimeVersion,
_marker: PhantomData<(fn() -> T::Signature, T::Extra)>,
page_size: u32,
}
impl<T: Runtime> Clone for Client<T> {
@@ -185,6 +196,53 @@ impl<T: Runtime> Clone for Client<T> {
metadata: self.metadata.clone(),
runtime_version: self.runtime_version.clone(),
_marker: PhantomData,
page_size: self.page_size,
}
}
}
/// Iterates over key value pairs in a map.
pub struct KeyIter<T: Runtime, F: Store<T>> {
client: Client<T>,
_marker: PhantomData<F>,
count: u32,
hash: T::Hash,
start_key: Option<StorageKey>,
buffer: Vec<(StorageKey, StorageData)>,
}
impl<T: Runtime, F: Store<T>> KeyIter<T, F> {
/// Returns the next key value pair from a map.
pub async fn next(&mut self) -> Result<Option<(StorageKey, F::Returns)>, Error> {
loop {
if let Some((k, v)) = self.buffer.pop() {
return Ok(Some((k, Decode::decode(&mut &v.0[..])?)))
} else {
let keys = self
.client
.fetch_keys::<F>(self.count, self.start_key.take(), Some(self.hash))
.await?;
if keys.is_empty() {
return Ok(None)
}
self.start_key = keys.last().cloned();
let change_sets = self
.client
.rpc
.query_storage_at(&keys, Some(self.hash))
.await?;
for change_set in change_sets {
for (k, v) in change_set.changes {
if let Some(v) = v {
self.buffer.push((k, v));
}
}
}
debug_assert_eq!(self.buffer.len(), self.count as usize);
}
}
}
}
@@ -200,37 +258,58 @@ impl<T: Runtime> Client<T> {
&self.metadata
}
/// Fetch a StorageKey with default value.
pub async fn fetch_or_default<F: Store<T>>(
&self,
store: F,
hash: Option<T::Hash>,
) -> Result<F::Returns, Error> {
let key = store.key(&self.metadata)?;
if let Some(data) = self.rpc.storage(key, hash).await? {
Ok(Decode::decode(&mut &data.0[..])?)
} else {
Ok(store.default(&self.metadata)?)
}
}
/// Fetch a StorageKey an optional storage key.
/// Fetch a StorageKey with an optional block hash.
pub async fn fetch<F: Store<T>>(
&self,
store: F,
store: &F,
hash: Option<T::Hash>,
) -> Result<Option<F::Returns>, Error> {
let key = store.key(&self.metadata)?;
if let Some(data) = self.rpc.storage(key, hash).await? {
if let Some(data) = self.rpc.storage(&key, hash).await? {
Ok(Some(Decode::decode(&mut &data.0[..])?))
} else {
Ok(None)
}
}
/// Fetch up to `count` keys for a storage map in lexicographic order.
///
/// Supports pagination by passing a value to `start_key`.
/// Fetch a StorageKey that has a default value with an optional block hash.
pub async fn fetch_or_default<F: Store<T>>(
&self,
store: &F,
hash: Option<T::Hash>,
) -> Result<F::Returns, Error> {
if let Some(data) = self.fetch(store, hash).await? {
Ok(data)
} else {
Ok(store.default(&self.metadata)?)
}
}
/// Returns an iterator of key value pairs.
pub async fn iter<F: Store<T>>(
&self,
hash: Option<T::Hash>,
) -> Result<KeyIter<T, F>, Error> {
let hash = if let Some(hash) = hash {
hash
} else {
self.block_hash(None)
.await?
.expect("didn't pass a block number; qed")
};
Ok(KeyIter {
client: self.clone(),
hash,
count: self.page_size,
start_key: None,
buffer: Default::default(),
_marker: PhantomData,
})
}
/// Fetch up to `count` keys for a storage map in lexicographic order.
///
/// Supports pagination by passing a value to `start_key`.
pub async fn fetch_keys<F: Store<T>>(
&self,
count: u32,
@@ -238,7 +317,10 @@ impl<T: Runtime> Client<T> {
hash: Option<T::Hash>,
) -> Result<Vec<StorageKey>, Error> {
let prefix = <F as Store<T>>::prefix(&self.metadata)?;
let keys = self.rpc.storage_keys_paged(Some(prefix), count, start_key, hash).await?;
let keys = self
.rpc
.storage_keys_paged(Some(prefix), count, start_key, hash)
.await?;
Ok(keys)
}
@@ -513,6 +595,7 @@ mod tests {
SubxtClient::from_config(config, test_node::service::new_full)
.expect("Error creating subxt client"),
)
.set_page_size(2)
.build()
.await
.expect("Error creating client");
@@ -630,7 +713,21 @@ mod tests {
#[async_std::test]
async fn test_fetch_keys() {
let (client, _) = test_client().await;
let keys = client.fetch_keys::<system::AccountStore<_>>(4, None, None).await.unwrap();
let keys = client
.fetch_keys::<system::AccountStore<_>>(4, None, None)
.await
.unwrap();
assert_eq!(keys.len(), 4)
}
#[async_std::test]
async fn test_iter() {
let (client, _) = test_client().await;
let mut iter = client.iter::<system::AccountStore<_>>(None).await.unwrap();
let mut i = 0;
while let Some(_) = iter.next().await.unwrap() {
i += 1;
}
assert_eq!(i, 4);
}
}
+16 -3
View File
@@ -122,7 +122,7 @@ impl<T: Runtime> Rpc<T> {
/// Fetch a storage key
pub async fn storage(
&self,
key: StorageKey,
key: &StorageKey,
hash: Option<T::Hash>,
) -> Result<Option<StorageData>, Error> {
let params = Params::Array(vec![to_json_value(key)?, to_json_value(hash)?]);
@@ -132,8 +132,8 @@ impl<T: Runtime> Rpc<T> {
}
/// Returns the keys with prefix with pagination support.
/// Up to `count` keys will be returned.
/// If `start_key` is passed, return next keys in storage in lexicographic order.
/// Up to `count` keys will be returned.
/// If `start_key` is passed, return next keys in storage in lexicographic order.
pub async fn storage_keys_paged(
&self,
prefix: Option<StorageKey>,
@@ -170,6 +170,19 @@ impl<T: Runtime> Rpc<T> {
.map_err(Into::into)
}
/// Query historical storage entries
pub async fn query_storage_at(
&self,
keys: &[StorageKey],
at: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<<T as System>::Hash>>, Error> {
let params = Params::Array(vec![to_json_value(keys)?, to_json_value(at)?]);
self.client
.request("state_queryStorage", params)
.await
.map_err(Into::into)
}
/// Fetch the genesis hash
pub async fn genesis_hash(&self) -> Result<T::Hash, Error> {
let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0)));