diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index edde0fed26..7c0c3da0c1 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1181,6 +1181,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width", + "windows-sys 0.42.0", +] + [[package]] name = "const-oid" version = "0.9.2" @@ -2034,6 +2047,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "enum-as-inner" version = "0.5.1" @@ -2507,6 +2526,7 @@ dependencies = [ "async-recursion", "frame-support", "futures", + "indicatif", "jsonrpsee", "log", "pallet-elections-phragmen", @@ -2515,6 +2535,7 @@ dependencies = [ "sp-core", "sp-io", "sp-runtime", + "spinners", "substrate-rpc-client", "tokio", "tracing-subscriber 0.3.16", @@ -3411,6 +3432,18 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e04e2fd2b8188ea827b32ef11de88377086d690286ab35747ef7f9bf3ccb590" +[[package]] +name = "indicatif" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "unicode-width", +] + [[package]] name = "inout" version = "0.1.3" @@ -4588,6 +4621,12 @@ dependencies = [ "libc", ] +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" + [[package]] name = "match_cfg" version = "0.1.0" @@ -5522,6 +5561,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.29.0" @@ -7617,6 +7662,12 @@ dependencies = [ "universal-hash 0.5.0", ] +[[package]] +name = "portable-atomic" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -11245,6 +11296,17 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dccf47db1b41fa1573ed27ccf5e08e3ca771cb994f776668c5ebda893b248fc" +[[package]] +name = "spinners" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08615eea740067d9899969bc2891c68a19c315cb1f66640af9a9ecb91b13bcab" +dependencies = [ + "lazy_static", + "maplit", + "strum", +] + [[package]] name = "spki" version = "0.6.0" diff --git a/substrate/primitives/state-machine/src/testing.rs b/substrate/primitives/state-machine/src/testing.rs index 1921287a64..78fec43cd7 100644 --- a/substrate/primitives/state-machine/src/testing.rs +++ b/substrate/primitives/state-machine/src/testing.rs @@ -27,7 +27,7 @@ use crate::{ StorageTransactionCache, StorageValue, TrieBackendBuilder, }; -use hash_db::Hasher; +use hash_db::{HashDB, Hasher}; use sp_core::{ offchain::testing::TestPersistentOffchainDB, storage::{ @@ -160,6 +160,34 @@ where self.extensions.register(ext); } + /// Sets raw storage key/values and a root. + /// + /// This can be used as a fast way to restore the storage state from a backup because the trie + /// does not need to be computed. + pub fn from_raw_snapshot(&mut self, raw_storage: Vec<(H::Out, Vec)>, storage_root: H::Out) { + for (k, v) in raw_storage { + self.backend.backend_storage_mut().emplace(k, hash_db::EMPTY_PREFIX, v); + } + self.backend.set_root(storage_root); + } + + /// Drains the underlying raw storage key/values and returns the root hash. + /// + /// Useful for backing up the storage in a format that can be quickly re-loaded. + /// + /// Note: This DB will be inoperable after this call. + pub fn into_raw_snapshot(mut self) -> (Vec<(H::Out, Vec)>, H::Out) { + let raw_key_values = self + .backend + .backend_storage_mut() + .drain() + .into_iter() + .map(|(k, v)| (k, v.0)) + .collect::)>>(); + + (raw_key_values, *self.backend.root()) + } + /// Return a new backend with all pending changes. /// /// In contrast to [`commit_all`](Self::commit_all) this will not panic if there are open @@ -362,6 +390,46 @@ mod tests { assert_eq!(H256::from_slice(ext.storage_root(Default::default()).as_slice()), root); } + #[test] + fn raw_storage_drain_and_restore() { + // Create a TestExternalities with some data in it. + let mut original_ext = + TestExternalities::::from((Default::default(), Default::default())); + original_ext.insert(b"doe".to_vec(), b"reindeer".to_vec()); + original_ext.insert(b"dog".to_vec(), b"puppy".to_vec()); + original_ext.insert(b"dogglesworth".to_vec(), b"cat".to_vec()); + let child_info = ChildInfo::new_default(&b"test_child"[..]); + original_ext.insert_child(child_info.clone(), b"cattytown".to_vec(), b"is_dark".to_vec()); + original_ext.insert_child(child_info.clone(), b"doggytown".to_vec(), b"is_sunny".to_vec()); + + // Drain the raw storage and root. + let root = *original_ext.backend.root(); + let (raw_storage, storage_root) = original_ext.into_raw_snapshot(); + + // Load the raw storage and root into a new TestExternalities. + let mut recovered_ext = + TestExternalities::::from((Default::default(), Default::default())); + recovered_ext.from_raw_snapshot(raw_storage, storage_root); + + // Check the storage root is the same as the original + assert_eq!(root, *recovered_ext.backend.root()); + + // Check the original storage key/values were recovered correctly + assert_eq!(recovered_ext.backend.storage(b"doe").unwrap(), Some(b"reindeer".to_vec())); + assert_eq!(recovered_ext.backend.storage(b"dog").unwrap(), Some(b"puppy".to_vec())); + assert_eq!(recovered_ext.backend.storage(b"dogglesworth").unwrap(), Some(b"cat".to_vec())); + + // Check the original child storage key/values were recovered correctly + assert_eq!( + recovered_ext.backend.child_storage(&child_info, b"cattytown").unwrap(), + Some(b"is_dark".to_vec()) + ); + assert_eq!( + recovered_ext.backend.child_storage(&child_info, b"doggytown").unwrap(), + Some(b"is_sunny".to_vec()) + ); + } + #[test] fn set_and_retrieve_code() { let mut ext = TestExternalities::::default(); diff --git a/substrate/primitives/state-machine/src/trie_backend.rs b/substrate/primitives/state-machine/src/trie_backend.rs index afbe6cbbea..abd58b3839 100644 --- a/substrate/primitives/state-machine/src/trie_backend.rs +++ b/substrate/primitives/state-machine/src/trie_backend.rs @@ -242,11 +242,21 @@ where &self.essence } + /// Get backend storage reference. + pub fn backend_storage_mut(&mut self) -> &mut S { + self.essence.backend_storage_mut() + } + /// Get backend storage reference. pub fn backend_storage(&self) -> &S { self.essence.backend_storage() } + /// Set trie root. + pub fn set_root(&mut self, root: H::Out) { + self.essence.set_root(root) + } + /// Get trie root. pub fn root(&self) -> &H::Out { self.essence.root() diff --git a/substrate/utils/frame/remote-externalities/Cargo.toml b/substrate/utils/frame/remote-externalities/Cargo.toml index d3909af344..21b652014f 100644 --- a/substrate/utils/frame/remote-externalities/Cargo.toml +++ b/substrate/utils/frame/remote-externalities/Cargo.toml @@ -24,6 +24,8 @@ tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread"] } substrate-rpc-client = { path = "../rpc/client" } futures = "0.3" async-recursion = "1.0.4" +indicatif = "0.17.3" +spinners = "4.1.0" [dev-dependencies] frame-support = { version = "4.0.0-dev", path = "../../../frame/support" } diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 283d2c280b..01c6738847 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -22,7 +22,7 @@ use async_recursion::async_recursion; use codec::{Decode, Encode}; -use futures::{channel::mpsc, stream::StreamExt}; +use indicatif::{ProgressBar, ProgressStyle}; use jsonrpsee::{ core::params::ArrayParams, http_client::{HttpClient, HttpClientBuilder}, @@ -36,17 +36,17 @@ use sp_core::{ well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX}, ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey, }, + H256, }; pub use sp_io::TestExternalities; use sp_runtime::{traits::Block as BlockT, StateVersion}; +use spinners::{Spinner, Spinners}; use std::{ - cmp::{max, min}, + cmp::max, fs, - num::NonZeroUsize, ops::{Deref, DerefMut}, path::{Path, PathBuf}, - sync::Arc, - thread, + time::{Duration, Instant}, }; use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi}; @@ -61,8 +61,8 @@ const DEFAULT_HTTP_ENDPOINT: &str = "https://rpc.polkadot.io:443"; struct Snapshot { state_version: StateVersion, block_hash: B::Hash, - top: TopKeyValues, - child: ChildKeyValues, + raw_storage: Vec<(H256, Vec)>, + storage_root: H256, } /// An externalities that acts exactly the same as [`sp_io::TestExternalities`] but has a few extra @@ -119,7 +119,7 @@ pub enum Transport { /// Use the `URI` to open a new WebSocket connection. Uri(String), /// Use HTTP connection. - RemoteClient(Arc), + RemoteClient(HttpClient), } impl Transport { @@ -130,13 +130,6 @@ impl Transport { } } - fn as_client_cloned(&self) -> Option> { - match self { - Self::RemoteClient(client) => Some(client.clone()), - _ => None, - } - } - // Build an HttpClient from a URI. async fn init(&mut self) -> Result<(), &'static str> { if let Self::Uri(uri) = self { @@ -166,7 +159,7 @@ impl Transport { "failed to build http client" })?; - *self = Self::RemoteClient(Arc::new(http_client)) + *self = Self::RemoteClient(http_client) } Ok(()) @@ -179,8 +172,8 @@ impl From for Transport { } } -impl From> for Transport { - fn from(client: Arc) -> Self { +impl From for Transport { + fn from(client: HttpClient) -> Self { Transport::RemoteClient(client) } } @@ -216,13 +209,6 @@ impl OnlineConfig { .expect("http client must have been initialized by now; qed.") } - /// Return a cloned rpc (http) client, suitable for being moved to threads. - fn rpc_client_cloned(&self) -> Arc { - self.transport - .as_client_cloned() - .expect("http client must have been initialized by now; qed.") - } - fn at_expected(&self) -> B::Hash { self.at.expect("block at must be initialized; qed") } @@ -327,31 +313,13 @@ where B::Hash: DeserializeOwned, B::Header: DeserializeOwned, { - const DEFAULT_PARALLELISM: usize = 4; + const PARALLEL_REQUESTS: usize = 4; const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10; const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50; const INITIAL_BATCH_SIZE: usize = 5000; // NOTE: increasing this value does not seem to impact speed all that much. const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000; - /// Get the number of threads to use. - /// Cap the number of threads. Performance improvement beyond a small number of threads is - /// negligible, and too many threads can create issues with the HttpClient. - fn threads() -> NonZeroUsize { - let avaliable = thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(1usize).expect("1 is non-zero; qed")) - .get(); - assert!(avaliable > 0, "avaliable parallelism must be greater than 0"); - - let requested: usize = match std::env::var("TRY_RUNTIME_MAX_THREADS") { - Ok(n) => n.parse::().expect("TRY_RUNTIME_MAX_THREADS must be a number"), - Err(_) => Self::DEFAULT_PARALLELISM, - }; - assert!(requested > 0, "TRY_RUNTIME_MAX_THREADS must be greater than 0"); - return NonZeroUsize::new(min(requested, avaliable)) - .expect("requested and avaliable are non-zero; qed") - } - async fn rpc_get_storage( &self, key: StorageKey, @@ -455,7 +423,7 @@ where /// use std::sync::Arc; /// /// async fn example() { - /// let client = Arc::new(HttpClient::new()); + /// let client = HttpClient::new(); /// let payloads = vec![ /// ("some_method".to_string(), ArrayParams::new(vec![])), /// ("another_method".to_string(), ArrayParams::new(vec![])), @@ -471,9 +439,10 @@ where /// ``` #[async_recursion] async fn get_storage_data_dynamic_batch_size( - client: &Arc, + client: &HttpClient, payloads: Vec<(String, ArrayParams)>, batch_size: usize, + bar: &ProgressBar, ) -> Result>, String> { // All payloads have been processed if payloads.is_empty() { @@ -514,6 +483,7 @@ where client, payloads, max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize), + bar, ) .await }, @@ -521,19 +491,22 @@ where // Collect the data from this batch let mut data: Vec> = vec![]; + let batch_response_len = batch_response.len(); for item in batch_response.into_iter() { match item { Ok(x) => data.push(x), Err(e) => return Err(e.message().to_string()), } } + bar.inc(batch_response_len as u64); // Return this data joined with the remaining keys - let payloads = payloads.iter().skip(batch_size).cloned().collect::>(); + let remaining_payloads = payloads.iter().skip(batch_size).cloned().collect::>(); let mut rest = Self::get_storage_data_dynamic_batch_size( client, - payloads, + remaining_payloads, max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize), + bar, ) .await?; data.append(&mut rest); @@ -550,154 +523,91 @@ where at: B::Hash, pending_ext: &mut TestExternalities, ) -> Result, &'static str> { - let keys = self.rpc_get_keys_paged(prefix.clone(), at).await?; + let start = Instant::now(); + let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into()); + let keys = self + .rpc_get_keys_paged(prefix.clone(), at) + .await? + .into_iter() + .filter(|k| !is_default_child_storage_key(&k.0)) + .collect::>(); + sp.stop_with_message(format!( + "✅ Found {} keys ({:.2}s)", + keys.len(), + start.elapsed().as_secs_f32() + )); if keys.is_empty() { return Ok(Default::default()) } - let client = self.as_online().rpc_client_cloned(); - let threads = Self::threads().get(); - let thread_chunk_size = (keys.len() + threads - 1) / threads; + let client = self.as_online().rpc_client(); + let payloads = keys + .iter() + .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at))) + .collect::>(); - log::info!( - target: LOG_TARGET, - "Querying a total of {} keys from prefix {:?}, splitting among {} threads, {} keys per thread", - keys.len(), - HexDisplay::from(&prefix), - threads, - thread_chunk_size, + let bar = ProgressBar::new(payloads.len() as u64); + bar.enable_steady_tick(Duration::from_secs(1)); + bar.set_message("Downloading key values".to_string()); + bar.set_style( + ProgressStyle::with_template( + "[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})", + ) + .unwrap() + .progress_chars("=>-"), ); + let payloads_chunked = payloads.chunks(&payloads.len() / Self::PARALLEL_REQUESTS); + let requests = payloads_chunked.map(|payload_chunk| { + Self::get_storage_data_dynamic_batch_size( + &client, + payload_chunk.to_vec(), + Self::INITIAL_BATCH_SIZE, + &bar, + ) + }); + // Execute the requests and move the Result outside. + let storage_data_result: Result, _> = + futures::future::join_all(requests).await.into_iter().collect(); + // Handle the Result. + let storage_data = match storage_data_result { + Ok(storage_data) => storage_data.into_iter().flatten().collect::>(), + Err(e) => { + log::error!(target: LOG_TARGET, "Error while getting storage data: {}", e); + return Err("Error while getting storage data") + }, + }; + bar.finish_with_message("✅ Downloaded key values"); + print!("\n"); - let mut handles = Vec::new(); - let keys_chunked: Vec> = - keys.chunks(thread_chunk_size).map(|s| s.into()).collect::>(); + // Check if we got responses for all submitted requests. + assert_eq!(keys.len(), storage_data.len()); - enum Message { - /// This thread completed the assigned work. - Terminated, - /// The thread produced the following batch response. - Batch(Vec<(Vec, Vec)>), - /// A request from the batch failed. - BatchFailed(String), - } - - let (tx, mut rx) = mpsc::unbounded::(); - - for thread_keys in keys_chunked { - let thread_sender = tx.clone(); - let thread_client = client.clone(); - let handle = std::thread::spawn(move || { - // Process the payloads in chunks so each thread can pass kvs back to the main - // thread to start inserting before all of the data has been queried from the node. - // Inserting data takes a very long time, so the earlier it can start the better. - let mut thread_key_values = vec![]; - let chunk_size = thread_keys.len() / 1; - for thread_keys_chunk in thread_keys.chunks(chunk_size) { - let mut thread_key_chunk_values = Vec::with_capacity(thread_keys_chunk.len()); - - let payloads = thread_keys_chunk - .iter() - .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at))) - .collect::>(); - - let rt = tokio::runtime::Runtime::new().unwrap(); - let storage_data = match rt.block_on(Self::get_storage_data_dynamic_batch_size( - &thread_client, - payloads, - Self::INITIAL_BATCH_SIZE, - )) { - Ok(storage_data) => storage_data, - Err(e) => { - thread_sender.unbounded_send(Message::BatchFailed(e)).unwrap(); - return Default::default() - }, - }; - - // Check if we got responses for all submitted requests. - assert_eq!(thread_keys_chunk.len(), storage_data.len()); - - let mut batch_kv = Vec::with_capacity(thread_keys_chunk.len()); - for (key, maybe_value) in thread_keys_chunk.iter().zip(storage_data) { - match maybe_value { - Some(data) => { - thread_key_chunk_values.push((key.clone(), data.clone())); - batch_kv.push((key.clone().0, data.0)); - }, - None => { - log::warn!( - target: LOG_TARGET, - "key {:?} had none corresponding value.", - &key - ); - let data = StorageData(vec![]); - thread_key_chunk_values.push((key.clone(), data.clone())); - batch_kv.push((key.clone().0, data.0)); - }, - }; - } - - // Send this chunk to the main thread to start inserting. - thread_sender.unbounded_send(Message::Batch(batch_kv)).unwrap(); - thread_key_values.extend(thread_key_chunk_values); - } - - thread_sender.unbounded_send(Message::Terminated).unwrap(); - thread_key_values - }); - - handles.push(handle); - } - - // first, wait until all threads send a `Terminated` message, in the meantime populate - // `pending_ext`. - let mut terminated = 0usize; - let mut batch_failed = false; - let mut processed = 0usize; - loop { - match rx.next().await.unwrap() { - Message::Batch(kvs) => { - let kvs = kvs - .into_iter() - .filter(|(k, _)| !is_default_child_storage_key(k)) - .collect::>(); - processed += kvs.len(); - pending_ext.batch_insert(kvs); - log::info!( - target: LOG_TARGET, - "inserting keys progress = {:.0}% [{} / {}]", - (processed as f32 / keys.len() as f32) * 100f32, - processed, - keys.len(), - ); + let key_values = keys + .iter() + .zip(storage_data) + .map(|(key, maybe_value)| match maybe_value { + Some(data) => (key.clone(), data), + None => { + log::warn!(target: LOG_TARGET, "key {:?} had none corresponding value.", &key); + let data = StorageData(vec![]); + (key.clone(), data) }, - Message::BatchFailed(error) => { - log::error!(target: LOG_TARGET, "Batch processing failed: {:?}", error); - batch_failed = true; - break - }, - Message::Terminated => { - terminated += 1; - if terminated == handles.len() { - break - } - }, - } - } + }) + .collect::>(); - // Ensure all threads finished execution before returning. - let keys_and_values = - handles.into_iter().flat_map(|h| h.join().unwrap()).collect::>(); - - if batch_failed { - return Err("Batch failed.") - } - - Ok(keys_and_values) + let mut sp = Spinner::with_timer(Spinners::Dots, "Inserting keys into DB...".into()); + let start = Instant::now(); + pending_ext.batch_insert(key_values.clone().into_iter().map(|(k, v)| (k.0, v.0))); + sp.stop_with_message(format!( + "✅ Inserted keys into DB ({:.2}s)", + start.elapsed().as_secs_f32() + )); + Ok(key_values) } /// Get the values corresponding to `child_keys` at the given `prefixed_top_key`. pub(crate) async fn rpc_child_get_storage_paged( - client: &Arc, + client: &HttpClient, prefixed_top_key: &StorageKey, child_keys: Vec, at: B::Hash, @@ -718,10 +628,12 @@ where }) .collect::>(); + let bar = ProgressBar::new(payloads.len() as u64); let storage_data = match Self::get_storage_data_dynamic_batch_size( client, payloads, Self::INITIAL_BATCH_SIZE, + &bar, ) .await { @@ -814,19 +726,15 @@ where let at = self.as_online().at_expected(); - let arc_client = self.as_online().rpc_client_cloned(); + let client = self.as_online().rpc_client(); let mut child_kv = vec![]; for prefixed_top_key in child_roots { - let child_keys = Self::rpc_child_get_keys( - arc_client.as_ref(), - &prefixed_top_key, - StorageKey(vec![]), - at, - ) - .await?; + let child_keys = + Self::rpc_child_get_keys(&client, &prefixed_top_key, StorageKey(vec![]), at) + .await?; let child_kv_inner = - Self::rpc_child_get_storage_paged(&arc_client, &prefixed_top_key, child_keys, at) + Self::rpc_child_get_storage_paged(&client, &prefixed_top_key, child_keys, at) .await?; let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0); @@ -873,9 +781,9 @@ where let elapsed = now.elapsed(); log::info!( target: LOG_TARGET, - "adding data for hashed prefix: {:?}, took {:?}s", + "adding data for hashed prefix: {:?}, took {:.2}s", HexDisplay::from(prefix), - elapsed.as_secs() + elapsed.as_secs_f32() ); keys_and_values.extend(additional_key_values); } @@ -970,18 +878,22 @@ where Default::default(), self.overwrite_state_version.unwrap_or(state_version), ); - let top_kv = self.load_top_remote(&mut pending_ext).await?; - let child_kv = self.load_child_remote(&top_kv, &mut pending_ext).await?; + // Load data from the remote into `pending_ext`. + let top_kv = self.load_top_remote(&mut pending_ext).await?; + self.load_child_remote(&top_kv, &mut pending_ext).await?; + + // If we need to save a snapshot, save the raw storage and root hash to the snapshot. if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) { + let (raw_storage, storage_root) = pending_ext.into_raw_snapshot(); let snapshot = Snapshot:: { state_version, - top: top_kv, - child: child_kv, block_hash: self .as_online() .at .expect("set to `Some` in `init_remote_client`; must be called before; qed"), + raw_storage: raw_storage.clone(), + storage_root, }; let encoded = snapshot.encode(); log::info!( @@ -991,6 +903,15 @@ where path ); std::fs::write(path, encoded).map_err(|_| "fs::write failed")?; + + // pending_ext was consumed when creating the snapshot, need to reinitailize it + let mut pending_ext = TestExternalities::new_with_code_and_state( + Default::default(), + Default::default(), + self.overwrite_state_version.unwrap_or(state_version), + ); + pending_ext.from_raw_snapshot(raw_storage, storage_root); + return Ok(pending_ext) } Ok(pending_ext) @@ -1013,7 +934,9 @@ where &mut self, config: OfflineConfig, ) -> Result, &'static str> { - let Snapshot { block_hash, top, child, state_version } = + let mut sp = Spinner::with_timer(Spinners::Dots, "Loading snapshot...".into()); + let start = Instant::now(); + let Snapshot { block_hash, state_version, raw_storage, storage_root } = self.load_snapshot(config.state_snapshot.path.clone())?; let mut inner_ext = TestExternalities::new_with_code_and_state( @@ -1021,26 +944,8 @@ where Default::default(), self.overwrite_state_version.unwrap_or(state_version), ); - - info!(target: LOG_TARGET, "injecting a total of {} top keys", top.len()); - let top = top - .into_iter() - .filter(|(k, _)| !is_default_child_storage_key(k.as_ref())) - .map(|(k, v)| (k.0, v.0)) - .collect::>(); - inner_ext.batch_insert(top); - - info!( - target: LOG_TARGET, - "injecting a total of {} child keys", - child.iter().flat_map(|(_, kv)| kv).count() - ); - - for (info, key_values) in child { - for (k, v) in key_values { - inner_ext.insert_child(info.clone(), k.0, v.0); - } - } + inner_ext.from_raw_snapshot(raw_storage, storage_root); + sp.stop_with_message(format!("✅ Loaded snapshot ({:.2}s)", start.elapsed().as_secs_f32())); Ok(RemoteExternalities { inner_ext, block_hash }) } @@ -1156,7 +1061,7 @@ mod test_prelude { mod tests { use super::test_prelude::*; - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn can_load_state_snapshot() { init_logger(); Builder::::new() @@ -1169,7 +1074,7 @@ mod tests { .execute_with(|| {}); } - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn can_exclude_from_snapshot() { init_logger(); @@ -1205,7 +1110,7 @@ mod remote_tests { use super::test_prelude::*; use std::os::unix::fs::MetadataExt; - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn state_version_is_kept_and_can_be_altered() { const CACHE: &'static str = "state_version_is_kept_and_can_be_altered"; init_logger(); @@ -1246,7 +1151,7 @@ mod remote_tests { assert_eq!(cached_ext.state_version, other); } - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn snapshot_block_hash_works() { const CACHE: &'static str = "snapshot_block_hash_works"; init_logger(); @@ -1273,7 +1178,7 @@ mod remote_tests { assert_eq!(ext.block_hash, cached_ext.block_hash); } - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn offline_else_online_works() { const CACHE: &'static str = "offline_else_online_works_data"; init_logger(); @@ -1318,7 +1223,7 @@ mod remote_tests { std::fs::remove_file(to_delete[0].path()).unwrap(); } - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn can_build_one_small_pallet() { init_logger(); Builder::::new() @@ -1333,7 +1238,7 @@ mod remote_tests { .execute_with(|| {}); } - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn can_build_few_pallet() { init_logger(); Builder::::new() @@ -1373,7 +1278,7 @@ mod remote_tests { .collect::>(); let snap: Snapshot = Builder::::new().load_snapshot(CACHE.into()).unwrap(); - assert!(matches!(snap, Snapshot { top, child, .. } if top.len() > 0 && child.len() == 0)); + assert!(matches!(snap, Snapshot { raw_storage, .. } if raw_storage.len() > 0)); assert!(to_delete.len() == 1); let to_delete = to_delete.first().unwrap(); @@ -1381,7 +1286,7 @@ mod remote_tests { std::fs::remove_file(to_delete.path()).unwrap(); } - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn can_create_child_snapshot() { const CACHE: &'static str = "can_create_child_snapshot"; init_logger(); @@ -1405,7 +1310,7 @@ mod remote_tests { .collect::>(); let snap: Snapshot = Builder::::new().load_snapshot(CACHE.into()).unwrap(); - assert!(matches!(snap, Snapshot { top, child, .. } if top.len() > 0 && child.len() > 0)); + assert!(matches!(snap, Snapshot { raw_storage, .. } if raw_storage.len() > 0)); assert!(to_delete.len() == 1); let to_delete = to_delete.first().unwrap(); @@ -1413,7 +1318,7 @@ mod remote_tests { std::fs::remove_file(to_delete.path()).unwrap(); } - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn can_build_big_pallet() { if std::option_env!("TEST_WS").is_none() { return @@ -1432,7 +1337,7 @@ mod remote_tests { .execute_with(|| {}); } - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn can_fetch_all() { if std::option_env!("TEST_WS").is_none() { return diff --git a/substrate/utils/frame/remote-externalities/test_data/proxy_test b/substrate/utils/frame/remote-externalities/test_data/proxy_test index 6673bd6765..f749531a8a 100644 Binary files a/substrate/utils/frame/remote-externalities/test_data/proxy_test and b/substrate/utils/frame/remote-externalities/test_data/proxy_test differ