Batch fetching storage values again (#1199)

* re-batchify fetching storage values

* cargo fmt

* make the page size static again; it probably only makes sense to make it configurable if that is needed often enough

* clippy
This commit is contained in:
James Wilson
2023-10-11 14:33:37 +02:00
committed by GitHub
parent 2e908025f0
commit d44941fb1b
2 changed files with 82 additions and 55 deletions
+73 -51
View File
@@ -71,15 +71,29 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
key: Vec<u8>, key: Vec<u8>,
at: T::Hash, at: T::Hash,
) -> Result<StreamOfResults<Vec<u8>>, Error> { ) -> Result<StreamOfResults<Vec<u8>>, Error> {
Ok(StreamOf(Box::pin(StorageFetchDescendantKeysStream { let keys = StorageFetchDescendantKeysStream {
at, at,
key, key,
methods: self.methods.clone(), methods: self.methods.clone(),
done: Default::default(), done: Default::default(),
keys: Default::default(),
keys_fut: Default::default(), keys_fut: Default::default(),
pagination_start_key: None, pagination_start_key: None,
}))) };
let keys = keys.flat_map(|keys| {
match keys {
Err(e) => {
// If there's an error, return that next:
Either::Left(stream::iter(std::iter::once(Err(e))))
}
Ok(keys) => {
// Or, stream each "ok" value:
Either::Right(stream::iter(keys.into_iter().map(Ok)))
}
}
});
Ok(StreamOf(Box::pin(keys)))
} }
async fn storage_fetch_descendant_values( async fn storage_fetch_descendant_values(
@@ -92,15 +106,14 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
key, key,
methods: self.methods.clone(), methods: self.methods.clone(),
done: Default::default(), done: Default::default(),
keys: Default::default(),
keys_fut: Default::default(), keys_fut: Default::default(),
pagination_start_key: Default::default(), pagination_start_key: None,
}; };
Ok(StreamOf(Box::pin(StorageFetchDescendantValuesStream { Ok(StreamOf(Box::pin(StorageFetchDescendantValuesStream {
keys: keys_stream, keys: keys_stream,
next_key: None, results_fut: None,
value_fut: Default::default(), results: Default::default(),
}))) })))
} }
@@ -319,6 +332,9 @@ where
}) })
} }
/// How many keys/values to fetch at once.
const STORAGE_PAGE_SIZE: u32 = 32;
/// This provides a stream of values given some prefix `key`. It /// This provides a stream of values given some prefix `key`. It
/// internally manages pagination and such. /// internally manages pagination and such.
pub struct StorageFetchDescendantKeysStream<T: Config> { pub struct StorageFetchDescendantKeysStream<T: Config> {
@@ -329,33 +345,23 @@ pub struct StorageFetchDescendantKeysStream<T: Config> {
pagination_start_key: Option<Vec<u8>>, pagination_start_key: Option<Vec<u8>>,
// Keys, future and cached: // Keys, future and cached:
keys_fut: Option<Pin<Box<dyn Future<Output = Result<Vec<Vec<u8>>, Error>> + Send + 'static>>>, keys_fut: Option<Pin<Box<dyn Future<Output = Result<Vec<Vec<u8>>, Error>> + Send + 'static>>>,
keys: VecDeque<Vec<u8>>,
// Set to true when we're done: // Set to true when we're done:
done: bool, done: bool,
} }
impl<T: Config> std::marker::Unpin for StorageFetchDescendantKeysStream<T> {} impl<T: Config> std::marker::Unpin for StorageFetchDescendantKeysStream<T> {}
// How many storage keys to ask for each time.
const STORAGE_FETCH_PAGE_SIZE: u32 = 32;
impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> { impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
type Item = Result<Vec<u8>, Error>; type Item = Result<Vec<Vec<u8>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let mut this = self.as_mut(); let mut this = self.as_mut();
loop {
// We're already done. // We're already done.
if this.done { if this.done {
return Poll::Ready(None); return Poll::Ready(None);
} }
// We have some keys to hand back already, so do that. // Poll future to fetch next keys.
if let Some(key) = this.keys.pop_front() {
return Poll::Ready(Some(Ok(key)));
}
// Else, we don't have any keys, but we have a fut to get more so poll it.
if let Some(mut keys_fut) = this.keys_fut.take() { if let Some(mut keys_fut) = this.keys_fut.take() {
let Poll::Ready(keys) = keys_fut.poll_unpin(cx) else { let Poll::Ready(keys) = keys_fut.poll_unpin(cx) else {
this.keys_fut = Some(keys_fut); this.keys_fut = Some(keys_fut);
@@ -371,9 +377,8 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
} }
// The last key is where we want to paginate from next time. // The last key is where we want to paginate from next time.
this.pagination_start_key = keys.last().cloned(); this.pagination_start_key = keys.last().cloned();
// Got new keys; loop around to start returning them. // return all of the keys from this run.
this.keys = keys.into_iter().collect(); return Poll::Ready(Some(Ok(keys)));
continue;
} }
Err(e) => { Err(e) => {
// Error getting keys? Return it. // Error getting keys? Return it.
@@ -391,7 +396,7 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
methods methods
.state_get_keys_paged( .state_get_keys_paged(
&key, &key,
STORAGE_FETCH_PAGE_SIZE, STORAGE_PAGE_SIZE,
pagination_start_key.as_deref(), pagination_start_key.as_deref(),
Some(at), Some(at),
) )
@@ -406,10 +411,18 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
pub struct StorageFetchDescendantValuesStream<T: Config> { pub struct StorageFetchDescendantValuesStream<T: Config> {
// Stream of keys. // Stream of keys.
keys: StorageFetchDescendantKeysStream<T>, keys: StorageFetchDescendantKeysStream<T>,
next_key: Option<Vec<u8>>, // Then we track the future to get the values back for each key:
// Then we track the next value: results_fut: Option<
value_fut: Pin<
Option<Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, Error>> + Send + 'static>>>, Box<
dyn Future<Output = Result<Option<VecDeque<(Vec<u8>, Vec<u8>)>>, Error>>
+ Send
+ 'static,
>,
>,
>,
// And finally we return each result back one at a time:
results: VecDeque<(Vec<u8>, Vec<u8>)>,
} }
impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> { impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
@@ -417,47 +430,56 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut(); let mut this = self.as_mut();
loop { loop {
// If we're waiting on the next value then poll that future: // If we have results back, return them one by one
if let Some(mut value_fut) = this.value_fut.take() { if let Some((key, value)) = this.results.pop_front() {
match value_fut.poll_unpin(cx) { let res = StorageResponse { key, value };
Poll::Ready(Ok(Some(value))) => { return Poll::Ready(Some(Ok(res)));
let key = this.next_key.take().expect("key should exist"); }
return Poll::Ready(Some(Ok(StorageResponse { key, value })));
// If we're waiting on the next results then poll that future:
if let Some(mut results_fut) = this.results_fut.take() {
match results_fut.poll_unpin(cx) {
Poll::Ready(Ok(Some(results))) => {
this.results = results;
continue;
} }
Poll::Ready(Ok(None)) => { Poll::Ready(Ok(None)) => {
// No value back for some key? Skip. // No values back for some keys? Skip.
continue; continue;
} }
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))), Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
Poll::Pending => { Poll::Pending => {
this.value_fut = Some(value_fut); this.results_fut = Some(results_fut);
return Poll::Pending; return Poll::Pending;
} }
} }
} }
// Else, if we have the next key then let's start waiting on the next value. match this.keys.poll_next_unpin(cx) {
if let Some(key) = &this.next_key { Poll::Ready(Some(Ok(keys))) => {
let key = key.clone();
let methods = this.keys.methods.clone(); let methods = this.keys.methods.clone();
let at = this.keys.at; let at = this.keys.at;
let fut = async move { methods.state_get_storage(&key, Some(at)).await }; let results_fut = async move {
let keys = keys.iter().map(|k| &**k);
let values = methods.state_query_storage_at(keys, Some(at)).await?;
let values: VecDeque<_> = values
.into_iter()
.flat_map(|v| {
v.changes.into_iter().filter_map(|(k, v)| {
let v = v?;
Some((k.0, v.0))
})
})
.collect();
Ok(Some(values))
};
this.value_fut = Some(Box::pin(fut)); this.results_fut = Some(Box::pin(results_fut));
continue;
}
// Else, poll the keys stream to get the next key.
match this.keys.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(key))) => {
this.next_key = Some(key);
continue; continue;
} }
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => { Poll::Pending => return Poll::Pending,
return Poll::Pending;
}
} }
} }
} }
+7 -2
View File
@@ -73,7 +73,10 @@ impl<T: Config> LegacyRpcMethods<T> {
Ok(data.into_iter().map(|b| b.0).collect()) Ok(data.into_iter().map(|b| b.0).collect())
} }
/// Query historical storage entries /// Query historical storage entries in the range from the start block to the end block,
/// defaulting the end block to the current best block if it's not given. The first
/// [`StorageChangeSet`] returned has all of the values for each key, and subsequent ones
/// only contain values for any keys which have changed since the last.
pub async fn state_query_storage( pub async fn state_query_storage(
&self, &self,
keys: impl IntoIterator<Item = &[u8]>, keys: impl IntoIterator<Item = &[u8]>,
@@ -88,7 +91,9 @@ impl<T: Config> LegacyRpcMethods<T> {
.map_err(Into::into) .map_err(Into::into)
} }
/// Query historical storage entries /// Query storage entries at some block, using the best block if none is given.
/// This essentially provides a way to ask for a batch of values given a batch of keys,
/// despite the name of the [`StorageChangeSet`] type.
pub async fn state_query_storage_at( pub async fn state_query_storage_at(
&self, &self,
keys: impl IntoIterator<Item = &[u8]>, keys: impl IntoIterator<Item = &[u8]>,