diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 9019acfcbb..e4ff69448b 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -342,6 +342,19 @@ version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" +[[package]] +name = "async-tls" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f23d769dbf1838d5df5156e7b1ad404f4c463d1ac2c6aeb6cd943630f8a8400" +dependencies = [ + "futures-core", + "futures-io", + "rustls 0.19.0", + "webpki 0.21.4", + "webpki-roots", +] + [[package]] name = "async-trait" version = "0.1.48" @@ -2103,7 +2116,7 @@ checksum = "3a1387e07917c711fb4ee4f48ea0adb04a3c9739e53ef85bf43ae1edc2937a8b" dependencies = [ "futures-io", "rustls 0.19.0", - "webpki", + "webpki 0.21.4", ] [[package]] @@ -2579,7 +2592,7 @@ dependencies = [ "rustls-native-certs", "tokio 0.2.25", "tokio-rustls", - "webpki", + "webpki 0.21.4", ] [[package]] @@ -2910,25 +2923,6 @@ dependencies = [ "slab", ] -[[package]] -name = "jsonrpsee-http-client" -version = "0.2.0-alpha.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2737440f37efa10e5ef7beeec43d059d29dc92640978be21fcdcef481a2edb0d" -dependencies = [ - "async-trait", - "fnv", - "hyper 0.13.10", - "hyper-rustls", - "jsonrpsee-types", - "jsonrpsee-utils", - "log", - "serde", - "serde_json", - "thiserror", - "url 2.2.1", -] - [[package]] name = "jsonrpsee-proc-macros" version = "0.2.0-alpha.6" @@ -2959,14 +2953,25 @@ dependencies = [ ] [[package]] -name = "jsonrpsee-utils" +name = "jsonrpsee-ws-client" version = "0.2.0-alpha.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d63cf4d423614e71fd144a8691208539d2b23d8373e069e2fbe023c5eba5e922" +checksum = "d6fdb4390bd25358c62e8b778652a564a1723ba07dca0feb3da439c2253fe59f" dependencies = [ - "futures-util", - "hyper 0.13.10", + "async-std", + "async-tls", + "async-trait", + "fnv", + "futures 0.3.13", "jsonrpsee-types", + "log", + "pin-project 1.0.5", + "serde", + "serde_json", + "soketto", + "thiserror", + "url 2.2.1", + "webpki 0.22.0", ] [[package]] @@ -6660,14 +6665,14 @@ version = "0.9.0" dependencies = [ "env_logger 0.8.3", "hex-literal", - "jsonrpsee-http-client", "jsonrpsee-proc-macros", + "jsonrpsee-ws-client", "log", "parity-scale-codec", "sp-core", "sp-io", "sp-runtime", - "tokio 0.2.25", + "tokio 1.6.0", ] [[package]] @@ -6779,7 +6784,7 @@ dependencies = [ "log", "ring", "sct", - "webpki", + "webpki 0.21.4", ] [[package]] @@ -6792,7 +6797,7 @@ dependencies = [ "log", "ring", "sct", - "webpki", + "webpki 0.21.4", ] [[package]] @@ -9989,10 +9994,21 @@ dependencies = [ "pin-project-lite 0.1.12", "signal-hook-registry", "slab", - "tokio-macros", + "tokio-macros 0.2.6", "winapi 0.3.9", ] +[[package]] +name = "tokio" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd3076b5c8cc18138b8f8814895c11eb4de37114a5d127bafdc5e55798ceef37" +dependencies = [ + "autocfg", + "pin-project-lite 0.2.6", + "tokio-macros 1.2.0", +] + [[package]] name = "tokio-buf" version = "0.1.1" @@ -10068,6 +10084,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-macros" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-named-pipes" version = "0.1.0" @@ -10109,7 +10136,7 @@ dependencies = [ "futures-core", "rustls 0.18.1", "tokio 0.2.25", - "webpki", + "webpki 0.21.4", ] [[package]] @@ -11104,13 +11131,23 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki-roots" version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82015b7e0b8bad8185994674a13a93306bea76cf5a16c5a181382fd3a5ec2376" dependencies = [ - "webpki", + "webpki 0.21.4", ] [[package]] diff --git a/substrate/utils/frame/remote-externalities/Cargo.toml b/substrate/utils/frame/remote-externalities/Cargo.toml index 7d372e8648..0d6336f60d 100644 --- a/substrate/utils/frame/remote-externalities/Cargo.toml +++ b/substrate/utils/frame/remote-externalities/Cargo.toml @@ -13,7 +13,7 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -jsonrpsee-http-client = { version = "=0.2.0-alpha.6", default-features = false, features = ["tokio02"] } +jsonrpsee-ws-client = { version = "=0.2.0-alpha.6", default-features = false } jsonrpsee-proc-macros = "=0.2.0-alpha.6" hex-literal = "0.3.1" @@ -26,7 +26,7 @@ sp-core = { version = "3.0.0", path = "../../../primitives/core" } sp-runtime = { version = "3.0.0", path = "../../../primitives/runtime" } [dev-dependencies] -tokio = { version = "0.2", features = ["macros"] } +tokio = { version = "1.6.0", features = ["macros", "rt"] } [features] remote-test = [] diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 3ec16ea198..077892baab 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -113,20 +113,26 @@ use sp_core::{ storage::{StorageKey, StorageData}, }; use codec::{Encode, Decode}; -use jsonrpsee_http_client::{HttpClient, HttpClientBuilder}; - use sp_runtime::traits::Block as BlockT; +use jsonrpsee_ws_client::{WsClientBuilder, WsClient}; type KeyPair = (StorageKey, StorageData); const LOG_TARGET: &str = "remote-ext"; -const TARGET: &str = "http://localhost:9933"; +const DEFAULT_TARGET: &str = "wss://rpc.polkadot.io"; jsonrpsee_proc_macros::rpc_client_api! { RpcApi { - #[rpc(method = "state_getPairs", positional_params)] - fn storage_pairs(prefix: StorageKey, hash: Option) -> Vec<(StorageKey, StorageData)>; - #[rpc(method = "chain_getFinalizedHead")] + #[rpc(method = "state_getStorage", positional_params)] + fn get_storage(prefix: StorageKey, hash: Option) -> StorageData; + #[rpc(method = "state_getKeysPaged", positional_params)] + fn get_keys_paged( + prefix: Option, + count: u32, + start_key: Option, + hash: Option, + ) -> Vec; + #[rpc(method = "chain_getFinalizedHead", positional_params)] fn finalized_head() -> B::Hash; } } @@ -140,6 +146,12 @@ pub enum Mode { Offline(OfflineConfig), } +impl Default for Mode { + fn default() -> Self { + Mode::Online(OnlineConfig::default()) + } +} + /// configuration of the online execution. /// /// A state snapshot config must be present. @@ -149,32 +161,55 @@ pub struct OfflineConfig { pub state_snapshot: SnapshotConfig, } +/// Description of the transport protocol. +#[derive(Debug)] +pub struct Transport { + uri: String, + client: Option, +} + +impl Clone for Transport { + fn clone(&self) -> Self { + Self { uri: self.uri.clone(), client: None } + } +} + +impl From for Transport { + fn from(t: String) -> Self { + Self { uri: t, client: None } + } +} + /// Configuration of the online execution. /// /// A state snapshot config may be present and will be written to in that case. #[derive(Clone)] pub struct OnlineConfig { - /// The HTTP uri to use. - pub uri: String, /// The block number at which to connect. Will be latest finalized head if not provided. pub at: Option, /// An optional state snapshot file to WRITE to, not for reading. Not written if set to `None`. pub state_snapshot: Option, /// The modules to scrape. If empty, entire chain state will be scraped. pub modules: Vec, + /// Transport config. + pub transport: Transport, } impl Default for OnlineConfig { fn default() -> Self { - Self { uri: TARGET.to_owned(), at: None, state_snapshot: None, modules: Default::default() } + Self { + transport: Transport { uri: DEFAULT_TARGET.to_string(), client: None }, + at: None, + state_snapshot: None, + modules: vec![], + } } } impl OnlineConfig { - /// Return a new http rpc client. - fn rpc(&self) -> HttpClient { - HttpClientBuilder::default().max_request_body_size(u32::MAX).build(&self.uri) - .expect("valid HTTP url; qed") + /// Return rpc (ws) client. + fn rpc_client(&self) -> &WsClient { + self.transport.client.as_ref().expect("ws client must have been initialized by now; qed.") } } @@ -199,16 +234,17 @@ impl Default for SnapshotConfig { /// Builder for remote-externalities. pub struct Builder { + /// Pallets to inject their prefix into the externalities. inject: Vec, + /// connectivity mode, online or offline. mode: Mode, } +// NOTE: ideally we would use `DefaultNoBound` here, but not worth bringing in frame-support for +// that. impl Default for Builder { fn default() -> Self { - Self { - inject: Default::default(), - mode: Mode::Online(OnlineConfig::default()), - } + Self { inject: Default::default(), mode: Default::default() } } } @@ -233,25 +269,92 @@ impl Builder { impl Builder { async fn rpc_get_head(&self) -> Result { trace!(target: LOG_TARGET, "rpc: finalized_head"); - RpcApi::::finalized_head(&self.as_online().rpc()).await.map_err(|e| { + RpcApi::::finalized_head(self.as_online().rpc_client()).await.map_err(|e| { error!("Error = {:?}", e); "rpc finalized_head failed." }) } - /// Relay the request to `state_getPairs` rpc endpoint. + /// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods. + async fn get_keys_paged( + &self, + prefix: StorageKey, + hash: B::Hash, + ) -> Result, &'static str> { + const PAGE: u32 = 512; + let mut last_key: Option = None; + let mut all_keys: Vec = vec![]; + let keys = loop { + let page = RpcApi::::get_keys_paged( + self.as_online().rpc_client(), + Some(prefix.clone()), + PAGE, + last_key.clone(), + Some(hash), + ) + .await + .map_err(|e| { + error!(target: LOG_TARGET, "Error = {:?}", e); + "rpc get_keys failed" + })?; + let page_len = page.len(); + all_keys.extend(page); + + if page_len < PAGE as usize { + debug!(target: LOG_TARGET, "last page received: {}", page_len); + break all_keys; + } else { + let new_last_key = + all_keys.last().expect("all_keys is populated; has .last(); qed"); + debug!( + target: LOG_TARGET, + "new total = {}, full page received: {:?}", + all_keys.len(), + HexDisplay::from(new_last_key) + ); + last_key = Some(new_last_key.clone()); + } + }; + + Ok(keys) + } + + /// Synonym of `rpc_get_pairs_unsafe` that uses paged queries to first get the keys, and then + /// map them to values one by one. /// - /// Note that this is an unsafe RPC. - async fn rpc_get_pairs( + /// This can work with public nodes. But, expect it to be darn slow. + pub(crate) async fn rpc_get_pairs_paged( &self, prefix: StorageKey, at: B::Hash, ) -> Result, &'static str> { - trace!(target: LOG_TARGET, "rpc: storage_pairs: {:?} / {:?}", prefix, at); - RpcApi::::storage_pairs(&self.as_online().rpc(), prefix, Some(at)).await.map_err(|e| { - error!("Error = {:?}", e); - "rpc storage_pairs failed" - }) + let keys = self.get_keys_paged(prefix, at).await?; + let keys_count = keys.len(); + info!(target: LOG_TARGET, "Querying a total of {} keys", keys.len()); + + let mut key_values: Vec = vec![]; + for key in keys { + let value = + RpcApi::::get_storage(self.as_online().rpc_client(), key.clone(), Some(at)) + .await + .map_err(|e| { + error!(target: LOG_TARGET, "Error = {:?}", e); + "rpc get_storage failed" + })?; + key_values.push((key, value)); + if key_values.len() % 1000 == 0 { + let ratio: f64 = key_values.len() as f64 / keys_count as f64; + debug!( + target: LOG_TARGET, + "progress = {:.2} [{} / {}]", + ratio, + key_values.len(), + keys_count, + ); + } + } + + Ok(key_values) } } @@ -279,13 +382,13 @@ impl Builder { .at .expect("online config must be initialized by this point; qed.") .clone(); - info!(target: LOG_TARGET, "scraping keypairs from remote node {} @ {:?}", config.uri, at); + info!(target: LOG_TARGET, "scraping keypairs from remote @ {:?}", at); let keys_and_values = if config.modules.len() > 0 { let mut filtered_kv = vec![]; for f in config.modules.iter() { let hashed_prefix = StorageKey(twox_128(f.as_bytes()).to_vec()); - let module_kv = self.rpc_get_pairs(hashed_prefix.clone(), at).await?; + let module_kv = self.rpc_get_pairs_paged(hashed_prefix.clone(), at).await?; info!( target: LOG_TARGET, "downloaded data for module {} (count: {} / prefix: {:?}).", @@ -298,22 +401,34 @@ impl Builder { filtered_kv } else { info!(target: LOG_TARGET, "downloading data for all modules."); - self.rpc_get_pairs(StorageKey(vec![]), at).await?.into_iter().collect::>() + self.rpc_get_pairs_paged(StorageKey(vec![]), at).await? }; Ok(keys_and_values) } - async fn init_remote_client(&mut self) -> Result<(), &'static str> { - info!(target: LOG_TARGET, "initializing remote client to {:?}", self.as_online().uri); + pub(crate) async fn init_remote_client(&mut self) -> Result<(), &'static str> { + let mut online = self.as_online_mut(); + info!(target: LOG_TARGET, "initializing remote client to {:?}", online.transport.uri); + + // First, initialize the ws client. + let ws_client = WsClientBuilder::default() + .max_request_body_size(u32::MAX) + .build(&online.transport.uri) + .await + .map_err(|_| "failed to build ws client")?; + online.transport.client = Some(ws_client); + + // Then, if `at` is not set, set it. if self.as_online().at.is_none() { let at = self.rpc_get_head().await?; self.as_online_mut().at = Some(at); } + Ok(()) } - async fn pre_build(mut self) -> Result, &'static str> { + pub(crate) async fn pre_build(mut self) -> Result, &'static str> { let mut base_kv = match self.mode.clone() { Mode::Offline(config) => self.load_state_snapshot(&config.state_snapshot.path)?, Mode::Online(config) => { @@ -380,8 +495,9 @@ mod test_prelude { pub(crate) fn init_logger() { let _ = env_logger::Builder::from_default_env() - .format_module_path(false) + .format_module_path(true) .format_level(true) + .filter_module(LOG_TARGET, log::LevelFilter::Debug) .try_init(); } } @@ -395,7 +511,7 @@ mod tests { init_logger(); Builder::::new() .mode(Mode::Offline(OfflineConfig { - state_snapshot: SnapshotConfig { path: "test_data/proxy_test".into() }, + state_snapshot: SnapshotConfig::new("test_data/proxy_test"), })) .build() .await @@ -413,7 +529,7 @@ mod remote_tests { init_logger(); Builder::::new() .mode(Mode::Online(OnlineConfig { - modules: vec!["Proxy".into()], + modules: vec!["Proxy".to_owned()], ..Default::default() })) .build() @@ -427,19 +543,16 @@ mod remote_tests { init_logger(); Builder::::new() .mode(Mode::Online(OnlineConfig { - state_snapshot: Some(SnapshotConfig { - name: "test_snapshot_to_remove.bin".into(), - ..Default::default() - }), + state_snapshot: Some(SnapshotConfig::new("test_snapshot_to_remove.bin")), + modules: vec!["Balances".to_owned()], ..Default::default() })) .build() .await .expect("Can't reach the remote node. Is it running?") - .unwrap() .execute_with(|| {}); - let to_delete = std::fs::read_dir(SnapshotConfig::default().directory) + let to_delete = std::fs::read_dir(SnapshotConfig::default().path) .unwrap() .into_iter() .map(|d| d.unwrap()) @@ -454,7 +567,7 @@ mod remote_tests { } #[tokio::test] - async fn can_build_all() { + async fn can_fetch_all() { init_logger(); Builder::::new() .build() diff --git a/substrate/utils/frame/try-runtime/cli/src/lib.rs b/substrate/utils/frame/try-runtime/cli/src/lib.rs index 9e41a3fd87..c4adab3ce8 100644 --- a/substrate/utils/frame/try-runtime/cli/src/lib.rs +++ b/substrate/utils/frame/try-runtime/cli/src/lib.rs @@ -86,7 +86,7 @@ pub enum State { modules: Option>, /// The url to connect to. - #[structopt(default_value = "http://localhost:9933", parse(try_from_str = parse_url))] + #[structopt(default_value = "ws://localhost:9944", parse(try_from_str = parse_url))] url: String, }, } @@ -109,11 +109,11 @@ fn parse_hash(block_number: &str) -> Result { } fn parse_url(s: &str) -> Result { - if s.starts_with("http://") { + if s.starts_with("ws://") || s.starts_with("wss://") { // could use Url crate as well, but lets keep it simple for now. Ok(s.to_string()) } else { - Err("not a valid HTTP url: must start with 'http://'") + Err("not a valid WS(S) url: must start with 'ws://' or 'wss://'") } } @@ -166,9 +166,9 @@ impl TryRuntimeCmd { block_at, modules } => Builder::::new().mode(Mode::Online(OnlineConfig { - uri: url.into(), + transport: url.to_owned().into(), state_snapshot: snapshot_path.as_ref().map(SnapshotConfig::new), - modules: modules.clone().unwrap_or_default(), + modules: modules.to_owned().unwrap_or_default(), at: block_at.as_ref() .map(|b| b.parse().map_err(|e| format!("Could not parse hash: {:?}", e))).transpose()?, ..Default::default()