try-runtime-cli: 'instant' snapshots, threading refactor, better progress logs (#14057)

* remote externalities refactor

* remove redundant logs

* use const for parallel requests

* prefer functional

* improve variable naming

* handle requests error

* use overlayedchanges

* Revert "use overlayedchanges"

This reverts commit c0ddb87a5abdd52207597f5df66cbbdf9d79badc.

* Revert "Revert "use overlayedchanges""

This reverts commit 1d49362d9b999c045c8f970a0ab8b486bc47a90a.

* Revert "Revert "Revert "use overlayedchanges"""

This reverts commit 06df786488d94f249e9abccffac4af445f76e5a7.

* backup/load raw storage values

* test raw storage drain and restore

* update snapshot tests

* improve logs

* clippy suggestions

* address comments

* fix example

* fix test

* clippy
Liam Aharon
2023-05-05 17:15:40 +10:00
committed by GitHub
parent e2547f5064
commit ead46b9efd
6 changed files with 278 additions and 231 deletions
@@ -22,7 +22,7 @@
use async_recursion::async_recursion;
use codec::{Decode, Encode};
use futures::{channel::mpsc, stream::StreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use jsonrpsee::{
core::params::ArrayParams,
http_client::{HttpClient, HttpClientBuilder},
@@ -36,17 +36,17 @@ use sp_core::{
well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
},
H256,
};
pub use sp_io::TestExternalities;
use sp_runtime::{traits::Block as BlockT, StateVersion};
use spinners::{Spinner, Spinners};
use std::{
cmp::{max, min},
cmp::max,
fs,
num::NonZeroUsize,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::Arc,
thread,
time::{Duration, Instant},
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
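
The scraper drives all of its downloads through jsonrpsee batch requests (re-exported here via `substrate_rpc_client`). A minimal, self-contained sketch of that batching API, assuming jsonrpsee ~0.16 and a hypothetical node endpoint:

use jsonrpsee::{
	core::{client::ClientT, params::BatchRequestBuilder},
	http_client::HttpClientBuilder,
	rpc_params,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
	// Hypothetical endpoint; any node exposing the state RPC API works.
	let client = HttpClientBuilder::default().build("http://localhost:9933")?;

	// Several calls travel in a single HTTP round trip.
	let mut batch = BatchRequestBuilder::new();
	batch.insert("state_getStorage", rpc_params!["0x3a636f6465"])?; // b":code"
	batch.insert("chain_getFinalizedHead", rpc_params![])?;

	// Each entry of the response succeeds or fails independently.
	for entry in client.batch_request::<Option<String>>(batch).await? {
		match entry {
			Ok(value) => println!("ok: {:?}", value.map(|v| v.len())),
			Err(e) => eprintln!("call failed: {}", e.message()),
		}
	}
	Ok(())
}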
@@ -61,8 +61,8 @@ const DEFAULT_HTTP_ENDPOINT: &str = "https://rpc.polkadot.io:443";
struct Snapshot<B: BlockT> {
state_version: StateVersion,
block_hash: B::Hash,
top: TopKeyValues,
child: ChildKeyValues,
raw_storage: Vec<(H256, Vec<u8>)>,
storage_root: H256,
}
/// An externalities that acts exactly the same as [`sp_io::TestExternalities`] but has a few extra
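
The `Snapshot` struct is what gets SCALE-encoded to disk; swapping `top`/`child` key-values for `raw_storage` plus `storage_root` is what makes reloading "instant". A sketch of the encode/decode round trip with parity-scale-codec, using a simplified stand-in struct (field types illustrative; the real struct is generic over the block type):

use codec::{Decode, Encode};

// Simplified stand-in for the snapshot layout above.
#[derive(Encode, Decode, PartialEq, Debug)]
struct MiniSnapshot {
	state_version: u8,
	block_hash: [u8; 32],
	raw_storage: Vec<([u8; 32], Vec<u8>)>,
	storage_root: [u8; 32],
}

fn main() {
	let snap = MiniSnapshot {
		state_version: 1,
		block_hash: [0u8; 32],
		raw_storage: vec![([1u8; 32], b"trie node bytes".to_vec())],
		storage_root: [2u8; 32],
	};

	// Persisting is just `fs::write(path, snap.encode())`...
	let bytes = snap.encode();

	// ...and decoding restores the state without touching the network.
	let decoded = MiniSnapshot::decode(&mut &bytes[..]).expect("round trip");
	assert_eq!(snap, decoded);
}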
@@ -119,7 +119,7 @@ pub enum Transport {
/// Use the `URI` to open a new WebSocket connection.
Uri(String),
/// Use HTTP connection.
RemoteClient(Arc<HttpClient>),
RemoteClient(HttpClient),
}
impl Transport {
@@ -130,13 +130,6 @@ impl Transport {
}
}
fn as_client_cloned(&self) -> Option<Arc<HttpClient>> {
match self {
Self::RemoteClient(client) => Some(client.clone()),
_ => None,
}
}
// Build an HttpClient from a URI.
async fn init(&mut self) -> Result<(), &'static str> {
if let Self::Uri(uri) = self {
@@ -166,7 +159,7 @@ impl Transport {
"failed to build http client"
})?;
*self = Self::RemoteClient(Arc::new(http_client))
*self = Self::RemoteClient(http_client)
}
Ok(())
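
`init` upgrades a `Uri` into a ready `RemoteClient`. A sketch of the client construction involved, assuming jsonrpsee's `HttpClientBuilder`; the specific limit and timeout below are illustrative choices, not the crate's defaults:

use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use std::time::Duration;

// Large state values (e.g. `:code`) motivate a generous body-size limit.
fn build_client(uri: &str) -> Result<HttpClient, &'static str> {
	HttpClientBuilder::default()
		.max_request_body_size(u32::MAX)
		.request_timeout(Duration::from_secs(60 * 5))
		.build(uri)
		.map_err(|_| "failed to build http client")
}

fn main() {
	// Building parses the URI; no connection is made until the first request.
	let client = build_client("http://localhost:9933").unwrap();
	drop(client);
}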
@@ -179,8 +172,8 @@ impl From<String> for Transport {
}
}
impl From<Arc<HttpClient>> for Transport {
fn from(client: Arc<HttpClient>) -> Self {
impl From<HttpClient> for Transport {
fn from(client: HttpClient) -> Self {
Transport::RemoteClient(client)
}
}
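
With the `Arc` gone, an owned `HttpClient` converts straight into a `Transport`. A standalone sketch of the two construction paths (the enum is re-declared locally just for illustration):

use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};

// Local re-declaration of the enum above, for illustration only.
enum Transport {
	Uri(String),
	RemoteClient(HttpClient),
}

impl From<HttpClient> for Transport {
	fn from(client: HttpClient) -> Self {
		Transport::RemoteClient(client)
	}
}

fn main() {
	// A caller that already owns a client moves it in directly; no `Arc`.
	let client = HttpClientBuilder::default().build("http://localhost:9933").unwrap();
	let _ready: Transport = client.into();
	// A bare URI takes the lazy path and is upgraded later by `init`.
	let _lazy = Transport::Uri("ws://localhost:9944".to_string());
}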
@@ -216,13 +209,6 @@ impl<B: BlockT> OnlineConfig<B> {
.expect("http client must have been initialized by now; qed.")
}
/// Return a cloned rpc (http) client, suitable for being moved to threads.
fn rpc_client_cloned(&self) -> Arc<HttpClient> {
self.transport
.as_client_cloned()
.expect("http client must have been initialized by now; qed.")
}
fn at_expected(&self) -> B::Hash {
self.at.expect("block at must be initialized; qed")
}
@@ -327,31 +313,13 @@ where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
const DEFAULT_PARALLELISM: usize = 4;
const PARALLEL_REQUESTS: usize = 4;
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const INITIAL_BATCH_SIZE: usize = 5000;
// NOTE: increasing this value does not seem to impact speed all that much.
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
/// Get the number of threads to use.
/// Cap the number of threads. Performance improvement beyond a small number of threads is
/// negligible, and too many threads can create issues with the HttpClient.
fn threads() -> NonZeroUsize {
let available = thread::available_parallelism()
.unwrap_or(NonZeroUsize::new(1usize).expect("1 is non-zero; qed"))
.get();
assert!(available > 0, "available parallelism must be greater than 0");
let requested: usize = match std::env::var("TRY_RUNTIME_MAX_THREADS") {
Ok(n) => n.parse::<usize>().expect("TRY_RUNTIME_MAX_THREADS must be a number"),
Err(_) => Self::DEFAULT_PARALLELISM,
};
assert!(requested > 0, "TRY_RUNTIME_MAX_THREADS must be greater than 0");
return NonZeroUsize::new(min(requested, available))
.expect("requested and available are non-zero; qed")
}
async fn rpc_get_storage(
&self,
key: StorageKey,
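
The removed thread logic gives way to a fixed pool of `PARALLEL_REQUESTS` concurrent futures, while batch sizes keep adapting: halve on failure (floor of 1), grow ~10% on success (always at least +1). A tiny sketch of just that arithmetic, mirroring the constants above:

const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;

// After a failed batch: halve, but never drop below one request.
fn shrink(batch_size: usize) -> usize {
	std::cmp::max(1, (batch_size as f32 * BATCH_SIZE_DECREASE_FACTOR) as usize)
}

// After a successful batch: grow ~10%, and always by at least one.
fn grow(batch_size: usize) -> usize {
	std::cmp::max(batch_size + 1, (batch_size as f32 * BATCH_SIZE_INCREASE_FACTOR) as usize)
}

fn main() {
	assert_eq!(shrink(5000), 2500);
	assert_eq!(grow(5000), 5500);
	// The +1 / floor-of-1 guards keep the size moving near the bottom.
	assert_eq!(shrink(1), 1);
	assert_eq!(grow(1), 2);
}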
@@ -455,7 +423,7 @@ where
/// use std::sync::Arc;
///
/// async fn example() {
/// let client = Arc::new(HttpClient::new());
/// let client = HttpClient::new();
/// let payloads = vec![
/// ("some_method".to_string(), ArrayParams::new(vec![])),
/// ("another_method".to_string(), ArrayParams::new(vec![])),
@@ -471,9 +439,10 @@ where
/// ```
#[async_recursion]
async fn get_storage_data_dynamic_batch_size(
client: &Arc<HttpClient>,
client: &HttpClient,
payloads: Vec<(String, ArrayParams)>,
batch_size: usize,
bar: &ProgressBar,
) -> Result<Vec<Option<StorageData>>, String> {
// All payloads have been processed
if payloads.is_empty() {
@@ -514,6 +483,7 @@ where
client,
payloads,
max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize),
bar,
)
.await
},
@@ -521,19 +491,22 @@ where
// Collect the data from this batch
let mut data: Vec<Option<StorageData>> = vec![];
let batch_response_len = batch_response.len();
for item in batch_response.into_iter() {
match item {
Ok(x) => data.push(x),
Err(e) => return Err(e.message().to_string()),
}
}
bar.inc(batch_response_len as u64);
// Return this data joined with the remaining keys
let payloads = payloads.iter().skip(batch_size).cloned().collect::<Vec<_>>();
let remaining_payloads = payloads.iter().skip(batch_size).cloned().collect::<Vec<_>>();
let mut rest = Self::get_storage_data_dynamic_batch_size(
client,
payloads,
remaining_payloads,
max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize),
bar,
)
.await?;
data.append(&mut rest);
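
The control flow above retries the same payloads with a smaller batch after a failure, and recurses on the remaining payloads with a larger batch after a success. A synchronous miniature of the same shape, with `send` standing in for the RPC transport (all names hypothetical):

// `send` fails whenever the batch is "too big" for the stubbed transport.
fn send(batch: &[u32], limit: usize) -> Result<Vec<u32>, ()> {
	if batch.len() > limit { Err(()) } else { Ok(batch.to_vec()) }
}

fn fetch_all(payloads: &[u32], batch_size: usize, limit: usize) -> Vec<u32> {
	if payloads.is_empty() {
		return vec![];
	}
	let head = &payloads[..batch_size.min(payloads.len())];
	match send(head, limit) {
		// Failure: retry the same payloads with half the batch size.
		Err(()) => fetch_all(payloads, (batch_size / 2).max(1), limit),
		// Success: keep the data, recurse on the rest with a bigger batch.
		Ok(mut data) => {
			let rest = &payloads[head.len()..];
			data.extend(fetch_all(rest, batch_size + batch_size / 10 + 1, limit));
			data
		},
	}
}

fn main() {
	let payloads: Vec<u32> = (0..100).collect();
	assert_eq!(fetch_all(&payloads, 64, 10), payloads);
}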
@@ -550,154 +523,91 @@ where
at: B::Hash,
pending_ext: &mut TestExternalities,
) -> Result<Vec<KeyValue>, &'static str> {
let keys = self.rpc_get_keys_paged(prefix.clone(), at).await?;
let start = Instant::now();
let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
let keys = self
.rpc_get_keys_paged(prefix.clone(), at)
.await?
.into_iter()
.filter(|k| !is_default_child_storage_key(&k.0))
.collect::<Vec<_>>();
sp.stop_with_message(format!(
"✅ Found {} keys ({:.2}s)",
keys.len(),
start.elapsed().as_secs_f32()
));
if keys.is_empty() {
return Ok(Default::default())
}
let client = self.as_online().rpc_client_cloned();
let threads = Self::threads().get();
let thread_chunk_size = (keys.len() + threads - 1) / threads;
let client = self.as_online().rpc_client();
let payloads = keys
.iter()
.map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
.collect::<Vec<_>>();
log::info!(
target: LOG_TARGET,
"Querying a total of {} keys from prefix {:?}, splitting among {} threads, {} keys per thread",
keys.len(),
HexDisplay::from(&prefix),
threads,
thread_chunk_size,
);
let bar = ProgressBar::new(payloads.len() as u64);
bar.enable_steady_tick(Duration::from_secs(1));
bar.set_message("Downloading key values".to_string());
bar.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
)
.unwrap()
.progress_chars("=>-"),
);
let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
let requests = payloads_chunked.map(|payload_chunk| {
Self::get_storage_data_dynamic_batch_size(
&client,
payload_chunk.to_vec(),
Self::INITIAL_BATCH_SIZE,
&bar,
)
});
// Execute the requests and move the Result outside.
let storage_data_result: Result<Vec<_>, _> =
futures::future::join_all(requests).await.into_iter().collect();
// Handle the Result.
let storage_data = match storage_data_result {
Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
Err(e) => {
log::error!(target: LOG_TARGET, "Error while getting storage data: {}", e);
return Err("Error while getting storage data")
},
};
bar.finish_with_message("✅ Downloaded key values");
print!("\n");
let mut handles = Vec::new();
let keys_chunked: Vec<Vec<StorageKey>> =
keys.chunks(thread_chunk_size).map(|s| s.into()).collect::<Vec<_>>();
// Check if we got responses for all submitted requests.
assert_eq!(keys.len(), storage_data.len());
enum Message {
/// This thread completed the assigned work.
Terminated,
/// The thread produced the following batch response.
Batch(Vec<(Vec<u8>, Vec<u8>)>),
/// A request from the batch failed.
BatchFailed(String),
}
let (tx, mut rx) = mpsc::unbounded::<Message>();
for thread_keys in keys_chunked {
let thread_sender = tx.clone();
let thread_client = client.clone();
let handle = std::thread::spawn(move || {
// Process the payloads in chunks so each thread can pass kvs back to the main
// thread to start inserting before all of the data has been queried from the node.
// Inserting data takes a very long time, so the earlier it can start the better.
let mut thread_key_values = vec![];
let chunk_size = thread_keys.len() / 1;
for thread_keys_chunk in thread_keys.chunks(chunk_size) {
let mut thread_key_chunk_values = Vec::with_capacity(thread_keys_chunk.len());
let payloads = thread_keys_chunk
.iter()
.map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
.collect::<Vec<_>>();
let rt = tokio::runtime::Runtime::new().unwrap();
let storage_data = match rt.block_on(Self::get_storage_data_dynamic_batch_size(
&thread_client,
payloads,
Self::INITIAL_BATCH_SIZE,
)) {
Ok(storage_data) => storage_data,
Err(e) => {
thread_sender.unbounded_send(Message::BatchFailed(e)).unwrap();
return Default::default()
},
};
// Check if we got responses for all submitted requests.
assert_eq!(thread_keys_chunk.len(), storage_data.len());
let mut batch_kv = Vec::with_capacity(thread_keys_chunk.len());
for (key, maybe_value) in thread_keys_chunk.iter().zip(storage_data) {
match maybe_value {
Some(data) => {
thread_key_chunk_values.push((key.clone(), data.clone()));
batch_kv.push((key.clone().0, data.0));
},
None => {
log::warn!(
target: LOG_TARGET,
"key {:?} had none corresponding value.",
&key
);
let data = StorageData(vec![]);
thread_key_chunk_values.push((key.clone(), data.clone()));
batch_kv.push((key.clone().0, data.0));
},
};
}
// Send this chunk to the main thread to start inserting.
thread_sender.unbounded_send(Message::Batch(batch_kv)).unwrap();
thread_key_values.extend(thread_key_chunk_values);
}
thread_sender.unbounded_send(Message::Terminated).unwrap();
thread_key_values
});
handles.push(handle);
}
// first, wait until all threads send a `Terminated` message, in the meantime populate
// `pending_ext`.
let mut terminated = 0usize;
let mut batch_failed = false;
let mut processed = 0usize;
loop {
match rx.next().await.unwrap() {
Message::Batch(kvs) => {
let kvs = kvs
.into_iter()
.filter(|(k, _)| !is_default_child_storage_key(k))
.collect::<Vec<_>>();
processed += kvs.len();
pending_ext.batch_insert(kvs);
log::info!(
target: LOG_TARGET,
"inserting keys progress = {:.0}% [{} / {}]",
(processed as f32 / keys.len() as f32) * 100f32,
processed,
keys.len(),
);
},
Message::BatchFailed(error) => {
log::error!(target: LOG_TARGET, "Batch processing failed: {:?}", error);
batch_failed = true;
break
},
Message::Terminated => {
terminated += 1;
if terminated == handles.len() {
break
}
},
}
}
// Ensure all threads finished execution before returning.
let keys_and_values =
handles.into_iter().flat_map(|h| h.join().unwrap()).collect::<Vec<_>>();
if batch_failed {
return Err("Batch failed.")
}
Ok(keys_and_values)
let key_values = keys
.iter()
.zip(storage_data)
.map(|(key, maybe_value)| match maybe_value {
Some(data) => (key.clone(), data),
None => {
log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
let data = StorageData(vec![]);
(key.clone(), data)
},
})
.collect::<Vec<_>>();
let mut sp = Spinner::with_timer(Spinners::Dots, "Inserting keys into DB...".into());
let start = Instant::now();
pending_ext.batch_insert(key_values.clone().into_iter().map(|(k, v)| (k.0, v.0)));
sp.stop_with_message(format!(
"✅ Inserted keys into DB ({:.2}s)",
start.elapsed().as_secs_f32()
));
Ok(key_values)
}
/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
pub(crate) async fn rpc_child_get_storage_paged(
client: &Arc<HttpClient>,
client: &HttpClient,
prefixed_top_key: &StorageKey,
child_keys: Vec<StorageKey>,
at: B::Hash,
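
The new download path in the hunk above replaces hand-rolled threads with chunked futures joined via `futures::future::join_all`, all ticking one shared indicatif progress bar. A runnable sketch of that pattern, with the network stubbed out by a sleep:

use futures::future::join_all;
use indicatif::{ProgressBar, ProgressStyle};
use std::time::Duration;

const PARALLEL_REQUESTS: usize = 4;

// Stand-in for one batched RPC round trip.
async fn fetch_chunk(chunk: Vec<u32>, bar: &ProgressBar) -> Result<Vec<u32>, String> {
	tokio::time::sleep(Duration::from_millis(50)).await;
	bar.inc(chunk.len() as u64);
	Ok(chunk)
}

#[tokio::main]
async fn main() -> Result<(), String> {
	let payloads: Vec<u32> = (0..10_000).collect();

	let bar = ProgressBar::new(payloads.len() as u64);
	bar.enable_steady_tick(Duration::from_secs(1));
	bar.set_message("Downloading key values");
	bar.set_style(
		ProgressStyle::with_template(
			"[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
		)
		.expect("static template is valid")
		.progress_chars("=>-"),
	);

	// One in-flight future per chunk; all of them tick the same bar.
	let chunk_size = (payloads.len() / PARALLEL_REQUESTS).max(1);
	let requests = payloads.chunks(chunk_size).map(|c| fetch_chunk(c.to_vec(), &bar));
	let results: Result<Vec<_>, _> = join_all(requests).await.into_iter().collect();

	let data: Vec<u32> = results?.into_iter().flatten().collect();
	bar.finish_with_message("✅ Downloaded key values");
	assert_eq!(data.len(), payloads.len());
	Ok(())
}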
@@ -718,10 +628,12 @@ where
})
.collect::<Vec<_>>();
let bar = ProgressBar::new(payloads.len() as u64);
let storage_data = match Self::get_storage_data_dynamic_batch_size(
client,
payloads,
Self::INITIAL_BATCH_SIZE,
&bar,
)
.await
{
@@ -814,19 +726,15 @@ where
let at = self.as_online().at_expected();
let arc_client = self.as_online().rpc_client_cloned();
let client = self.as_online().rpc_client();
let mut child_kv = vec![];
for prefixed_top_key in child_roots {
let child_keys = Self::rpc_child_get_keys(
arc_client.as_ref(),
&prefixed_top_key,
StorageKey(vec![]),
at,
)
.await?;
let child_keys =
Self::rpc_child_get_keys(&client, &prefixed_top_key, StorageKey(vec![]), at)
.await?;
let child_kv_inner =
Self::rpc_child_get_storage_paged(&arc_client, &prefixed_top_key, child_keys, at)
Self::rpc_child_get_storage_paged(&client, &prefixed_top_key, child_keys, at)
.await?;
let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
@@ -873,9 +781,9 @@ where
let elapsed = now.elapsed();
log::info!(
target: LOG_TARGET,
"adding data for hashed prefix: {:?}, took {:?}s",
"adding data for hashed prefix: {:?}, took {:.2}s",
HexDisplay::from(prefix),
elapsed.as_secs()
elapsed.as_secs_f32()
);
keys_and_values.extend(additional_key_values);
}
@@ -970,18 +878,22 @@ where
Default::default(),
self.overwrite_state_version.unwrap_or(state_version),
);
let top_kv = self.load_top_remote(&mut pending_ext).await?;
let child_kv = self.load_child_remote(&top_kv, &mut pending_ext).await?;
// Load data from the remote into `pending_ext`.
let top_kv = self.load_top_remote(&mut pending_ext).await?;
self.load_child_remote(&top_kv, &mut pending_ext).await?;
// If we need to save a snapshot, save the raw storage and root hash to the snapshot.
if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
let snapshot = Snapshot::<B> {
state_version,
top: top_kv,
child: child_kv,
block_hash: self
.as_online()
.at
.expect("set to `Some` in `init_remote_client`; must be called before; qed"),
raw_storage: raw_storage.clone(),
storage_root,
};
let encoded = snapshot.encode();
log::info!(
@@ -991,6 +903,15 @@ where
path
);
std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
// pending_ext was consumed when creating the snapshot; we need to reinitialize it
let mut pending_ext = TestExternalities::new_with_code_and_state(
Default::default(),
Default::default(),
self.overwrite_state_version.unwrap_or(state_version),
);
pending_ext.from_raw_snapshot(raw_storage, storage_root);
return Ok(pending_ext)
}
Ok(pending_ext)
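
The save path drains the externalities into raw storage via `into_raw_snapshot` and rebuilds them with `from_raw_snapshot`, which is why `pending_ext` must be reinitialized after writing the file. A sketch of that round trip on its own (method names as used in this diff; the insert and readback via `sp_io::storage` are assumptions about an in-memory `TestExternalities`):

use sp_io::TestExternalities;

fn main() {
	let mut ext = TestExternalities::default();
	ext.insert(b"key".to_vec(), b"value".to_vec());

	// Drain the backend into raw (hashed) key/value pairs plus the root...
	let (raw_storage, storage_root) = ext.into_raw_snapshot();

	// ...then rebuild an equivalent externalities from those parts alone.
	let mut restored = TestExternalities::default();
	restored.from_raw_snapshot(raw_storage, storage_root);

	restored.execute_with(|| {
		let value = sp_io::storage::get(b"key").map(|v| v.to_vec());
		assert_eq!(value, Some(b"value".to_vec()));
	});
}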
@@ -1013,7 +934,9 @@ where
&mut self,
config: OfflineConfig,
) -> Result<RemoteExternalities<B>, &'static str> {
let Snapshot { block_hash, top, child, state_version } =
let mut sp = Spinner::with_timer(Spinners::Dots, "Loading snapshot...".into());
let start = Instant::now();
let Snapshot { block_hash, state_version, raw_storage, storage_root } =
self.load_snapshot(config.state_snapshot.path.clone())?;
let mut inner_ext = TestExternalities::new_with_code_and_state(
@@ -1021,26 +944,8 @@ where
Default::default(),
self.overwrite_state_version.unwrap_or(state_version),
);
info!(target: LOG_TARGET, "injecting a total of {} top keys", top.len());
let top = top
.into_iter()
.filter(|(k, _)| !is_default_child_storage_key(k.as_ref()))
.map(|(k, v)| (k.0, v.0))
.collect::<Vec<_>>();
inner_ext.batch_insert(top);
info!(
target: LOG_TARGET,
"injecting a total of {} child keys",
child.iter().flat_map(|(_, kv)| kv).count()
);
for (info, key_values) in child {
for (k, v) in key_values {
inner_ext.insert_child(info.clone(), k.0, v.0);
}
}
inner_ext.from_raw_snapshot(raw_storage, storage_root);
sp.stop_with_message(format!("✅ Loaded snapshot ({:.2}s)", start.elapsed().as_secs_f32()));
Ok(RemoteExternalities { inner_ext, block_hash })
}
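
Loading a snapshot now shows a spinners spinner and a timed summary instead of per-key log lines. The pattern in miniature (the sleep stands in for the actual decode-and-restore work):

use spinners::{Spinner, Spinners};
use std::{thread, time::{Duration, Instant}};

fn main() {
	let start = Instant::now();
	let mut sp = Spinner::with_timer(Spinners::Dots, "Loading snapshot...".into());

	// Stand-in for decoding the file and feeding `from_raw_snapshot`.
	thread::sleep(Duration::from_millis(500));

	sp.stop_with_message(format!(
		"✅ Loaded snapshot ({:.2}s)",
		start.elapsed().as_secs_f32()
	));
}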
@@ -1156,7 +1061,7 @@ mod test_prelude {
mod tests {
use super::test_prelude::*;
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn can_load_state_snapshot() {
init_logger();
Builder::<Block>::new()
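
With the threading removed, the tests drop the explicit runtime flavor; `#[tokio::test]` defaults to a current-thread runtime, which presumably suffices once no OS threads or nested runtimes are involved:

#[cfg(test)]
mod flavor_demo {
	// Before: worker threads and nested `block_on` calls wanted a
	// multi-threaded tokio runtime.
	#[tokio::test(flavor = "multi_thread")]
	async fn old_style() {}

	// After: plain futures run fine on the default current-thread runtime.
	#[tokio::test]
	async fn new_style() {}
}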
@@ -1169,7 +1074,7 @@ mod tests {
.execute_with(|| {});
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn can_exclude_from_snapshot() {
init_logger();
@@ -1205,7 +1110,7 @@ mod remote_tests {
use super::test_prelude::*;
use std::os::unix::fs::MetadataExt;
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn state_version_is_kept_and_can_be_altered() {
const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
init_logger();
@@ -1246,7 +1151,7 @@ mod remote_tests {
assert_eq!(cached_ext.state_version, other);
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn snapshot_block_hash_works() {
const CACHE: &'static str = "snapshot_block_hash_works";
init_logger();
@@ -1273,7 +1178,7 @@ mod remote_tests {
assert_eq!(ext.block_hash, cached_ext.block_hash);
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn offline_else_online_works() {
const CACHE: &'static str = "offline_else_online_works_data";
init_logger();
@@ -1318,7 +1223,7 @@ mod remote_tests {
std::fs::remove_file(to_delete[0].path()).unwrap();
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn can_build_one_small_pallet() {
init_logger();
Builder::<Block>::new()
@@ -1333,7 +1238,7 @@ mod remote_tests {
.execute_with(|| {});
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn can_build_few_pallet() {
init_logger();
Builder::<Block>::new()
@@ -1373,7 +1278,7 @@ mod remote_tests {
.collect::<Vec<_>>();
let snap: Snapshot<Block> = Builder::<Block>::new().load_snapshot(CACHE.into()).unwrap();
assert!(matches!(snap, Snapshot { top, child, .. } if top.len() > 0 && child.len() == 0));
assert!(matches!(snap, Snapshot { raw_storage, .. } if raw_storage.len() > 0));
assert!(to_delete.len() == 1);
let to_delete = to_delete.first().unwrap();
@@ -1381,7 +1286,7 @@ mod remote_tests {
std::fs::remove_file(to_delete.path()).unwrap();
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn can_create_child_snapshot() {
const CACHE: &'static str = "can_create_child_snapshot";
init_logger();
@@ -1405,7 +1310,7 @@ mod remote_tests {
.collect::<Vec<_>>();
let snap: Snapshot<Block> = Builder::<Block>::new().load_snapshot(CACHE.into()).unwrap();
assert!(matches!(snap, Snapshot { top, child, .. } if top.len() > 0 && child.len() > 0));
assert!(matches!(snap, Snapshot { raw_storage, .. } if raw_storage.len() > 0));
assert!(to_delete.len() == 1);
let to_delete = to_delete.first().unwrap();
@@ -1413,7 +1318,7 @@ mod remote_tests {
std::fs::remove_file(to_delete.path()).unwrap();
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn can_build_big_pallet() {
if std::option_env!("TEST_WS").is_none() {
return
@@ -1432,7 +1337,7 @@ mod remote_tests {
.execute_with(|| {});
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn can_fetch_all() {
if std::option_env!("TEST_WS").is_none() {
return