try-runtime: dynamic storage query sizes (#13923)

* improve batch rpc error message

* wip aimd storage data fetch

* complete aimd function refactor

* make batch_request function async

* improve function name

* fix load_child_remote issue

* slight efficiency improvement

* improve logs and variable name

* remove redundant comment

* improve comment

* address pr comments

* Update utils/frame/remote-externalities/src/lib.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* simplify client handling

* fix type issue

* fix clippy issue

* try to trigger ci

* try to trigger ci

---------

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
Liam Aharon
2023-04-21 19:16:40 +10:00
committed by GitHub
parent eb17581e3c
commit bc1a599284
4 changed files with 318 additions and 239 deletions
@@ -20,8 +20,13 @@
//! An equivalent of `sp_io::TestExternalities` that can load its state from a remote substrate
//! based chain, or a local state snapshot file.
use async_recursion::async_recursion;
use codec::{Decode, Encode};
use futures::{channel::mpsc, stream::StreamExt};
use jsonrpsee::{
core::params::ArrayParams,
http_client::{HttpClient, HttpClientBuilder},
};
use log::*;
use serde::de::DeserializeOwned;
use sp_core::{
@@ -35,6 +40,7 @@ use sp_core::{
pub use sp_io::TestExternalities;
use sp_runtime::{traits::Block as BlockT, StateVersion};
use std::{
cmp::max,
fs,
num::NonZeroUsize,
ops::{Deref, DerefMut},
@@ -42,19 +48,14 @@ use std::{
sync::Arc,
thread,
};
use substrate_rpc_client::{
rpc_params, ws_client, BatchRequestBuilder, ChainApi, ClientT, StateApi, WsClient,
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
const LOG_TARGET: &str = "remote-ext";
const DEFAULT_WS_ENDPOINT: &str = "wss://rpc.polkadot.io:443";
const DEFAULT_VALUE_DOWNLOAD_BATCH: usize = 4096;
// NOTE: increasing this value does not seem to impact speed all that much.
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
const DEFAULT_HTTP_ENDPOINT: &str = "https://rpc.polkadot.io:443";
/// The snapshot that we store on disk.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
@@ -117,36 +118,51 @@ pub struct OfflineConfig {
pub enum Transport {
/// Use the `URI` to build a new HTTP client (`ws://` and `wss://` URIs are rewritten to `http://` and `https://`).
Uri(String),
/// Use existing WebSocket connection.
RemoteClient(Arc<WsClient>),
/// Use HTTP connection.
RemoteClient(Arc<HttpClient>),
}
impl Transport {
fn as_client(&self) -> Option<&WsClient> {
fn as_client(&self) -> Option<&HttpClient> {
match self {
Self::RemoteClient(client) => Some(client),
_ => None,
}
}
fn as_client_cloned(&self) -> Option<Arc<WsClient>> {
fn as_client_cloned(&self) -> Option<Arc<HttpClient>> {
match self {
Self::RemoteClient(client) => Some(client.clone()),
_ => None,
}
}
// Open a new WebSocket connection if it's not connected.
async fn map_uri(&mut self) -> Result<(), &'static str> {
// Build an HttpClient from a URI.
async fn init(&mut self) -> Result<(), &'static str> {
if let Self::Uri(uri) = self {
log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri);
let ws_client = ws_client(uri).await.map_err(|e| {
// If we have a ws uri, try to convert it to an http uri.
// We use an HTTP client rather than WS because WS starts to choke with "accumulated
// message length exceeds maximum" errors after processing ~10k keys when fetching
// from a node running a default configuration.
let uri = if uri.starts_with("ws://") {
let uri = uri.replace("ws://", "http://");
log::info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
uri
} else if uri.starts_with("wss://") {
let uri = uri.replace("wss://", "https://");
log::info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
uri
} else {
uri.clone()
};
let http_client = HttpClientBuilder::default().build(uri).map_err(|e| {
log::error!(target: LOG_TARGET, "error: {:?}", e);
"failed to build ws client"
"failed to build http client"
})?;
*self = Self::RemoteClient(Arc::new(ws_client))
*self = Self::RemoteClient(Arc::new(http_client))
}
Ok(())
@@ -159,8 +175,8 @@ impl From<String> for Transport {
}
}
impl From<Arc<WsClient>> for Transport {
fn from(client: Arc<WsClient>) -> Self {
impl From<Arc<HttpClient>> for Transport {
fn from(client: Arc<HttpClient>) -> Self {
Transport::RemoteClient(client)
}
}
@@ -189,18 +205,18 @@ pub struct OnlineConfig<B: BlockT> {
}
impl<B: BlockT> OnlineConfig<B> {
/// Return rpc (ws) client reference.
fn rpc_client(&self) -> &WsClient {
/// Return rpc (http) client reference.
fn rpc_client(&self) -> &HttpClient {
self.transport
.as_client()
.expect("ws client must have been initialized by now; qed.")
.expect("http client must have been initialized by now; qed.")
}
/// Return a cloned rpc (ws) client, suitable for being moved to threads.
fn rpc_client_cloned(&self) -> Arc<WsClient> {
/// Return a cloned rpc (http) client, suitable for being moved to threads.
fn rpc_client_cloned(&self) -> Arc<HttpClient> {
self.transport
.as_client_cloned()
.expect("ws client must have been initialized by now; qed.")
.expect("http client must have been initialized by now; qed.")
}
fn at_expected(&self) -> B::Hash {
@@ -211,7 +227,7 @@ impl<B: BlockT> OnlineConfig<B> {
impl<B: BlockT> Default for OnlineConfig<B> {
fn default() -> Self {
Self {
transport: Transport::from(DEFAULT_WS_ENDPOINT.to_owned()),
transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
child_trie: true,
at: None,
state_snapshot: None,
@@ -307,10 +323,16 @@ where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
/// Multiplier applied to the batch size after a batch request succeeds.
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
/// Multiplier applied to the batch size after a batch request fails.
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
/// Batch size that storage value fetches start with, before any dynamic adjustment.
const INITIAL_BATCH_SIZE: usize = 5000;
/// Number of keys requested per `storage_keys_paged` call.
// NOTE: increasing this value does not seem to impact speed all that much.
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
/// Get the number of threads to use.
///
/// Returns the parallelism reported by [`thread::available_parallelism`], falling back to a
/// single thread when that query fails.
fn threads() -> NonZeroUsize {
    thread::available_parallelism()
        .unwrap_or(NonZeroUsize::new(1usize).expect("1 is non-zero; qed"))
}
async fn rpc_get_storage(
@@ -352,7 +374,7 @@ where
.rpc_client()
.storage_keys_paged(
Some(prefix.clone()),
DEFAULT_KEY_DOWNLOAD_PAGE,
Self::DEFAULT_KEY_DOWNLOAD_PAGE,
last_key.clone(),
Some(at),
)
@@ -365,7 +387,7 @@ where
all_keys.extend(page);
if page_len < DEFAULT_KEY_DOWNLOAD_PAGE as usize {
if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
break all_keys
} else {
@@ -384,6 +406,123 @@ where
Ok(keys)
}
/// Fetches storage data from a node using a dynamic batch size.
///
/// This function adjusts the batch size on the fly to help prevent overwhelming the node with
/// large batch requests, and stay within request size limits enforced by the node:
///
/// * when a batch request fails, the same payloads are retried with the batch size multiplied
///   by `BATCH_SIZE_DECREASE_FACTOR` (never going below 1);
/// * when a batch request succeeds, the remaining payloads are fetched with the batch size
///   multiplied by `BATCH_SIZE_INCREASE_FACTOR` (always growing by at least 1).
///
/// # Arguments
///
/// * `client` - An `Arc` wrapped `HttpClient` used for making the requests.
/// * `payloads` - A vector of tuples containing a JSONRPC method name and `ArrayParams`
/// * `batch_size` - The initial batch size to use for the request. The batch size will be
///   adjusted dynamically in case of failure.
///
/// # Returns
///
/// Returns a `Result` with a vector of `Option<StorageData>`, where each element corresponds to
/// the storage data for the payload at the same position in `payloads`. The result will be an
/// `Err` with a `String` error message if the request fails.
///
/// # Errors
///
/// This function will return an error if:
/// * The batch request fails and the batch size is less than 2.
/// * There are invalid batch params.
/// * There is an error in the batch response.
///
/// # Example
///
/// Illustrative only; this is an associated function, so it is called through `Self::` from
/// within the enclosing `impl`.
///
/// ```ignore
/// use std::sync::Arc;
///
/// async fn example(client: Arc<HttpClient>) {
///     let payloads = vec![
///         ("some_method".to_string(), rpc_params![]),
///         ("another_method".to_string(), rpc_params![]),
///     ];
///     let initial_batch_size = 10;
///
///     let storage_data =
///         Self::get_storage_data_dynamic_batch_size(&client, payloads, initial_batch_size).await;
///     match storage_data {
///         Ok(data) => println!("Storage data: {:?}", data),
///         Err(e) => eprintln!("Error fetching storage data: {}", e),
///     }
/// }
/// ```
#[async_recursion]
async fn get_storage_data_dynamic_batch_size(
    client: &Arc<HttpClient>,
    payloads: Vec<(String, ArrayParams)>,
    batch_size: usize,
) -> Result<Vec<Option<StorageData>>, String> {
    // All payloads have been processed
    if payloads.is_empty() {
        return Ok(vec![])
    }

    log::debug!(
        target: LOG_TARGET,
        "Remaining payloads: {} Batch request size: {}",
        payloads.len(),
        batch_size,
    );

    // Payloads to attempt to process this batch. They are cloned so that `payloads` stays
    // untouched and can be handed over in full to the retry if this request fails.
    let page = payloads.iter().take(batch_size).cloned().collect::<Vec<_>>();

    // Build the batch request
    let mut batch = BatchRequestBuilder::new();
    for (method, params) in page.iter() {
        batch
            .insert(method, params.clone())
            .map_err(|_| "Invalid batch method and/or params")?;
    }

    let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
        Ok(batch_response) => batch_response,
        Err(e) => {
            // A batch of a single request cannot be shrunk any further: give up.
            if batch_size < 2 {
                return Err(e.to_string())
            }

            log::debug!(
                target: LOG_TARGET,
                "Batch request failed, trying again with smaller batch size. {}",
                e
            );

            // Retry the very same payloads with a smaller batch.
            return Self::get_storage_data_dynamic_batch_size(
                client,
                payloads,
                max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize),
            )
            .await
        },
    };

    // Collect the data from this batch, bailing out on the first failed entry.
    let mut data: Vec<Option<StorageData>> = batch_response
        .into_iter()
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| e.message().to_string())?;

    // Return this data joined with the remaining keys. The remaining payloads are moved rather
    // than cloned, so each payload is cloned only once (into its own `page`).
    let remaining = payloads.into_iter().skip(batch_size).collect::<Vec<_>>();
    let mut rest = Self::get_storage_data_dynamic_batch_size(
        client,
        remaining,
        max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize),
    )
    .await?;
    data.append(&mut rest);
    Ok(data)
}
/// Synonym of `getPairs` that uses paged queries to first get the keys, and then
/// map them to values one by one.
///
@@ -428,81 +567,61 @@ where
let (tx, mut rx) = mpsc::unbounded::<Message>();
for thread_keys in keys_chunked {
let thread_client = client.clone();
let thread_sender = tx.clone();
let thread_client = client.clone();
let handle = std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut thread_key_values = Vec::with_capacity(thread_keys.len());
// Process the payloads in chunks so each thread can pass kvs back to the main
// thread to start inserting before all of the data has been queried from the node.
// Inserting data takes a very long time, so the earlier it can start the better.
let mut thread_key_values = vec![];
let chunk_size = thread_keys.len() / 1;
for thread_keys_chunk in thread_keys.chunks(chunk_size) {
let mut thread_key_chunk_values = Vec::with_capacity(thread_keys_chunk.len());
for chunk_keys in thread_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) {
let mut batch = BatchRequestBuilder::new();
let payloads = thread_keys_chunk
.iter()
.map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
.collect::<Vec<_>>();
for key in chunk_keys.iter() {
batch
.insert("state_getStorage", rpc_params![key, at])
.map_err(|_| "Invalid batch params")
.unwrap();
}
let batch_response = rt
.block_on(thread_client.batch_request::<Option<StorageData>>(batch))
.map_err(|e| {
log::error!(
target: LOG_TARGET,
"failed to execute batch: {:?}. Error: {:?}",
chunk_keys.iter().map(HexDisplay::from).collect::<Vec<_>>(),
e
);
"batch failed."
})
.unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
let storage_data = match rt.block_on(Self::get_storage_data_dynamic_batch_size(
&thread_client,
payloads,
Self::INITIAL_BATCH_SIZE,
)) {
Ok(storage_data) => storage_data,
Err(e) => {
thread_sender.unbounded_send(Message::BatchFailed(e)).unwrap();
return Default::default()
},
};
// Check if we got responses for all submitted requests.
assert_eq!(chunk_keys.len(), batch_response.len());
assert_eq!(thread_keys_chunk.len(), storage_data.len());
let mut batch_kv = Vec::with_capacity(chunk_keys.len());
for (key, maybe_value) in chunk_keys.into_iter().zip(batch_response) {
let mut batch_kv = Vec::with_capacity(thread_keys_chunk.len());
for (key, maybe_value) in thread_keys_chunk.iter().zip(storage_data) {
match maybe_value {
Ok(Some(data)) => {
thread_key_values.push((key.clone(), data.clone()));
Some(data) => {
thread_key_chunk_values.push((key.clone(), data.clone()));
batch_kv.push((key.clone().0, data.0));
},
Ok(None) => {
None => {
log::warn!(
target: LOG_TARGET,
"key {:?} had none corresponding value.",
&key
);
let data = StorageData(vec![]);
thread_key_values.push((key.clone(), data.clone()));
thread_key_chunk_values.push((key.clone(), data.clone()));
batch_kv.push((key.clone().0, data.0));
},
Err(e) => {
let reason = format!("key {:?} failed: {:?}", &key, e);
log::error!(target: LOG_TARGET, "Reason: {}", reason);
// Signal failures to the main thread, stop aggregating (key, value)
// pairs and return immediately an error.
thread_sender.unbounded_send(Message::BatchFailed(reason)).unwrap();
return Default::default()
},
};
if thread_key_values.len() % (thread_keys.len() / 10).max(1) == 0 {
let ratio: f64 =
thread_key_values.len() as f64 / thread_keys.len() as f64;
log::debug!(
target: LOG_TARGET,
"[thread = {:?}] progress = {:.2} [{} / {}]",
std::thread::current().id(),
ratio,
thread_key_values.len(),
thread_keys.len(),
);
}
}
// Send this batch to the main thread to start inserting.
// Send this chunk to the main thread to start inserting.
thread_sender.unbounded_send(Message::Batch(batch_kv)).unwrap();
thread_key_values.extend(thread_key_chunk_values);
}
thread_sender.unbounded_send(Message::Terminated).unwrap();
@@ -516,10 +635,21 @@ where
// `pending_ext`.
let mut terminated = 0usize;
let mut batch_failed = false;
let mut processed = 0usize;
loop {
match rx.next().await.unwrap() {
Message::Batch(kv) => {
for (k, v) in kv {
processed += 1;
if processed % 50_000 == 0 || processed == keys.len() || processed == 1 {
log::info!(
target: LOG_TARGET,
"inserting keys progress = {:.0}% [{} / {}]",
(processed as f32 / keys.len() as f32) * 100f32,
processed,
keys.len(),
);
}
// skip writing the child root data.
if is_default_child_storage_key(k.as_ref()) {
continue
@@ -554,73 +684,58 @@ where
/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
pub(crate) async fn rpc_child_get_storage_paged(
client: &WsClient,
client: &Arc<HttpClient>,
prefixed_top_key: &StorageKey,
child_keys: Vec<StorageKey>,
at: B::Hash,
) -> Result<Vec<KeyValue>, &'static str> {
let mut child_kv_inner = vec![];
let mut batch_success = true;
let child_keys_len = child_keys.len();
for batch_child_key in child_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) {
let mut batch_request = BatchRequestBuilder::new();
let payloads = child_keys
.iter()
.map(|key| {
(
"childstate_getStorage".to_string(),
rpc_params![
PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
key,
at
],
)
})
.collect::<Vec<_>>();
for key in batch_child_key {
batch_request
.insert(
"childstate_getStorage",
rpc_params![
PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
key,
at
],
)
.map_err(|_| "Invalid batch params")?;
}
let storage_data = match Self::get_storage_data_dynamic_batch_size(
client,
payloads,
Self::INITIAL_BATCH_SIZE,
)
.await
{
Ok(storage_data) => storage_data,
Err(e) => {
log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
return Err("batch processing failed")
},
};
let batch_response =
client.batch_request::<Option<StorageData>>(batch_request).await.map_err(|e| {
log::error!(
target: LOG_TARGET,
"failed to execute batch: {:?}. Error: {:?}",
batch_child_key,
e
);
"batch failed."
})?;
assert_eq!(child_keys_len, storage_data.len());
assert_eq!(batch_child_key.len(), batch_response.len());
for (key, maybe_value) in batch_child_key.iter().zip(batch_response) {
match maybe_value {
Ok(Some(v)) => {
child_kv_inner.push((key.clone(), v));
},
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"key {:?} had none corresponding value.",
&key
);
child_kv_inner.push((key.clone(), StorageData(vec![])));
},
Err(e) => {
log::error!(target: LOG_TARGET, "key {:?} failed: {:?}", &key, e);
batch_success = false;
},
};
}
}
if batch_success {
Ok(child_kv_inner)
} else {
Err("batch failed.")
}
Ok(child_keys
.iter()
.zip(storage_data)
.map(|(key, maybe_value)| match maybe_value {
Some(v) => (key.clone(), v),
None => {
log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
(key.clone(), StorageData(vec![]))
},
})
.collect::<Vec<_>>())
}
pub(crate) async fn rpc_child_get_keys(
client: &WsClient,
client: &HttpClient,
prefixed_top_key: &StorageKey,
child_prefix: StorageKey,
at: B::Hash,
@@ -678,107 +793,47 @@ where
return Ok(Default::default())
}
// div-ceil simulation.
let threads = Self::threads().get();
let child_roots_per_thread = (child_roots.len() + threads - 1) / threads;
info!(
target: LOG_TARGET,
"👩‍👦 scraping child-tree data from {} top keys, split among {} threads, {} top keys per thread",
"👩‍👦 scraping child-tree data from {} top keys",
child_roots.len(),
threads,
child_roots_per_thread,
);
// NOTE: the threading done here is the simpler, yet slightly un-elegant because we are
// splitting child root among threads, and it is very common for these root to have vastly
// different child tries underneath them, causing some threads to finish way faster than
// others. Certainly still better than single thread though.
let mut handles = vec![];
let client = self.as_online().rpc_client_cloned();
let at = self.as_online().at_expected();
enum Message {
Terminated,
Batch((ChildInfo, Vec<(Vec<u8>, Vec<u8>)>)),
}
let (tx, mut rx) = mpsc::unbounded::<Message>();
let arc_client = self.as_online().rpc_client_cloned();
let mut child_kv = vec![];
for prefixed_top_key in child_roots {
let child_keys = Self::rpc_child_get_keys(
arc_client.as_ref(),
&prefixed_top_key,
StorageKey(vec![]),
at,
)
.await?;
for thread_child_roots in child_roots
.chunks(child_roots_per_thread)
.map(|x| x.into())
.collect::<Vec<Vec<_>>>()
{
let thread_client = client.clone();
let thread_sender = tx.clone();
let handle = thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut thread_child_kv = vec![];
for prefixed_top_key in thread_child_roots {
let child_keys = rt.block_on(Self::rpc_child_get_keys(
&thread_client,
&prefixed_top_key,
StorageKey(vec![]),
at,
))?;
let child_kv_inner = rt.block_on(Self::rpc_child_get_storage_paged(
&thread_client,
&prefixed_top_key,
child_keys,
at,
))?;
let child_kv_inner =
Self::rpc_child_get_storage_paged(&arc_client, &prefixed_top_key, child_keys, at)
.await?;
let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
Some((ChildType::ParentKeyId, storage_key)) => storage_key,
None => {
log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
return Err("Invalid child key")
},
};
thread_sender
.unbounded_send(Message::Batch((
ChildInfo::new_default(un_prefixed),
child_kv_inner
.iter()
.cloned()
.map(|(k, v)| (k.0, v.0))
.collect::<Vec<_>>(),
)))
.unwrap();
thread_child_kv.push((ChildInfo::new_default(un_prefixed), child_kv_inner));
}
thread_sender.unbounded_send(Message::Terminated).unwrap();
Ok(thread_child_kv)
});
handles.push(handle);
}
// first, wait until all threads send a `Terminated` message, in the meantime populate
// `pending_ext`.
let mut terminated = 0usize;
loop {
match rx.next().await.unwrap() {
Message::Batch((info, kvs)) =>
for (k, v) in kvs {
pending_ext.insert_child(info.clone(), k, v);
},
Message::Terminated => {
terminated += 1;
if terminated == handles.len() {
break
}
let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
Some((ChildType::ParentKeyId, storage_key)) => storage_key,
None => {
log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
return Err("Invalid child key")
},
};
let info = ChildInfo::new_default(un_prefixed);
let key_values =
child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
child_kv.push((info.clone(), child_kv_inner));
for (k, v) in key_values {
pending_ext.insert_child(info.clone(), k, v);
}
}
let child_kv = handles
.into_iter()
.flat_map(|h| h.join().unwrap())
.flatten()
.collect::<Vec<_>>();
Ok(child_kv)
}
@@ -841,8 +896,8 @@ where
///
/// initializes the remote client in `transport`, and sets the `at` field, if not specified.
async fn init_remote_client(&mut self) -> Result<(), &'static str> {
// First, initialize the ws client.
self.as_online_mut().transport.map_uri().await?;
// First, initialize the http client.
self.as_online_mut().transport.init().await?;
// Then, if `at` is not set, set it.
if self.as_online().at.is_none() {