diff --git a/substrate/bin/node/cli/tests/common.rs b/substrate/bin/node/cli/tests/common.rs index 9c739c2cf2..3b83f43396 100644 --- a/substrate/bin/node/cli/tests/common.rs +++ b/substrate/bin/node/cli/tests/common.rs @@ -24,7 +24,7 @@ use nix::{ unistd::Pid, }; use node_primitives::Block; -use remote_externalities::rpc_api; +use remote_externalities::rpc_api::RpcService; use std::{ io::{BufRead, BufReader, Read}, ops::{Deref, DerefMut}, @@ -71,9 +71,10 @@ pub async fn wait_n_finalized_blocks( pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) { let mut built_blocks = std::collections::HashSet::new(); let mut interval = tokio::time::interval(Duration::from_secs(2)); + let rpc_service = RpcService::new(url, false).await.unwrap(); loop { - if let Ok(block) = rpc_api::get_finalized_head::(url.to_string()).await { + if let Ok(block) = rpc_service.get_finalized_head::().await { built_blocks.insert(block); if built_blocks.len() > n { break diff --git a/substrate/utils/frame/remote-externalities/src/rpc_api.rs b/substrate/utils/frame/remote-externalities/src/rpc_api.rs index 37555de480..3ea30a3022 100644 --- a/substrate/utils/frame/remote-externalities/src/rpc_api.rs +++ b/substrate/utils/frame/remote-externalities/src/rpc_api.rs @@ -19,82 +19,131 @@ // TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988 use jsonrpsee::{ - core::client::ClientT, + core::client::{Client, ClientT}, rpc_params, + types::ParamsSer, ws_client::{WsClient, WsClientBuilder}, }; -use sp_runtime::{ - generic::SignedBlock, - traits::{Block as BlockT, Header as HeaderT}, -}; +use serde::de::DeserializeOwned; +use sp_runtime::{generic::SignedBlock, traits::Block as BlockT}; +use std::sync::Arc; -/// Get the header of the block identified by `at` -pub async fn get_header(from: S, at: Block::Hash) -> Result -where - Block: BlockT, - Block::Header: serde::de::DeserializeOwned, - S: AsRef, -{ - let client = build_client(from).await?; +enum RpcCall { + GetHeader, + GetFinalizedHead, + GetBlock, + GetRuntimeVersion, +} +impl RpcCall { + fn as_str(&self) -> &'static str { + match self { + RpcCall::GetHeader => "chain_getHeader", + RpcCall::GetFinalizedHead => "chain_getFinalizedHead", + RpcCall::GetBlock => "chain_getBlock", + RpcCall::GetRuntimeVersion => "state_getRuntimeVersion", + } + } +} + +/// General purpose method for making RPC calls. +async fn make_request<'a, T: DeserializeOwned>( + client: &Arc, + call: RpcCall, + params: Option>, +) -> Result { client - .request::("chain_getHeader", rpc_params!(at)) + .request::(call.as_str(), params) .await - .map_err(|e| format!("chain_getHeader request failed: {:?}", e)) + .map_err(|e| format!("{} request failed: {:?}", call.as_str(), e)) } -/// Get the finalized head -pub async fn get_finalized_head(from: S) -> Result -where - Block: BlockT, - S: AsRef, -{ - let client = build_client(from).await?; - - client - .request::("chain_getFinalizedHead", None) - .await - .map_err(|e| format!("chain_getFinalizedHead request failed: {:?}", e)) +enum ConnectionPolicy { + Reuse(Arc), + Reconnect, } -/// Get the signed block identified by `at`. -pub async fn get_block(from: S, at: Block::Hash) -> Result -where - S: AsRef, - Block: BlockT + serde::de::DeserializeOwned, - Block::Header: HeaderT, -{ - let client = build_client(from).await?; - let signed_block = client - .request::>("chain_getBlock", rpc_params!(at)) - .await - .map_err(|e| format!("chain_getBlock request failed: {:?}", e))?; - - Ok(signed_block.block) +/// Simple RPC service that is capable of keeping the connection. +/// +/// Service will connect to `uri` for the first time already during initialization. +/// +/// Be careful with reusing the connection in a multithreaded environment. +pub struct RpcService { + uri: String, + policy: ConnectionPolicy, } -/// Build a websocket client that connects to `from`. -async fn build_client>(from: S) -> Result { - WsClientBuilder::default() - .max_request_body_size(u32::MAX) - .build(from.as_ref()) - .await - .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e)) -} +impl RpcService { + /// Creates a new RPC service. If `keep_connection`, then connects to `uri` right away. + pub async fn new>(uri: S, keep_connection: bool) -> Result { + let policy = if keep_connection { + ConnectionPolicy::Reuse(Arc::new(Self::build_client(uri.as_ref()).await?)) + } else { + ConnectionPolicy::Reconnect + }; + Ok(Self { uri: uri.as_ref().to_string(), policy }) + } -/// Get the runtime version of a given chain. -pub async fn get_runtime_version( - from: S, - at: Option, -) -> Result -where - S: AsRef, - Block: BlockT + serde::de::DeserializeOwned, - Block::Header: HeaderT, -{ - let client = build_client(from).await?; - client - .request::("state_getRuntimeVersion", rpc_params!(at)) - .await - .map_err(|e| format!("state_getRuntimeVersion request failed: {:?}", e)) + /// Returns the address at which requests are sent. + pub fn uri(&self) -> String { + self.uri.clone() + } + + /// Build a websocket client that connects to `self.uri`. + async fn build_client>(uri: S) -> Result { + WsClientBuilder::default() + .max_request_body_size(u32::MAX) + .build(uri) + .await + .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e)) + } + + /// Generic method for making RPC requests. + async fn make_request<'a, T: DeserializeOwned>( + &self, + call: RpcCall, + params: Option>, + ) -> Result { + match self.policy { + // `self.keep_connection` must have been `true`. + ConnectionPolicy::Reuse(ref client) => make_request(client, call, params).await, + ConnectionPolicy::Reconnect => { + let client = Arc::new(Self::build_client(&self.uri).await?); + make_request(&client, call, params).await + }, + } + } + + /// Get the header of the block identified by `at`. + pub async fn get_header(&self, at: Block::Hash) -> Result + where + Block: BlockT, + Block::Header: DeserializeOwned, + { + self.make_request(RpcCall::GetHeader, rpc_params!(at)).await + } + + /// Get the finalized head. + pub async fn get_finalized_head(&self) -> Result { + self.make_request(RpcCall::GetFinalizedHead, None).await + } + + /// Get the signed block identified by `at`. + pub async fn get_block( + &self, + at: Block::Hash, + ) -> Result { + Ok(self + .make_request::>(RpcCall::GetBlock, rpc_params!(at)) + .await? + .block) + } + + /// Get the runtime version of a given chain. + pub async fn get_runtime_version( + &self, + at: Option, + ) -> Result { + self.make_request(RpcCall::GetRuntimeVersion, rpc_params!(at)).await + } } diff --git a/substrate/utils/frame/try-runtime/cli/src/commands/execute_block.rs b/substrate/utils/frame/try-runtime/cli/src/commands/execute_block.rs index 6a3ef24ff3..70ba3615f8 100644 --- a/substrate/utils/frame/try-runtime/cli/src/commands/execute_block.rs +++ b/substrate/utils/frame/try-runtime/cli/src/commands/execute_block.rs @@ -89,6 +89,8 @@ impl ExecuteBlockCmd { Block::Hash: FromStr, ::Err: Debug, { + let rpc_service = rpc_api::RpcService::new(ws_uri, false).await?; + match (&self.block_at, &self.state) { (Some(block_at), State::Snap { .. }) => hash_of::(block_at), (Some(block_at), State::Live { .. }) => { @@ -100,9 +102,7 @@ impl ExecuteBlockCmd { target: LOG_TARGET, "No --block-at or --at provided, using the latest finalized block instead" ); - remote_externalities::rpc_api::get_finalized_head::(ws_uri) - .await - .map_err(Into::into) + rpc_service.get_finalized_head::().await.map_err(Into::into) }, (None, State::Live { at: Some(at), .. }) => hash_of::(at), _ => { @@ -148,7 +148,8 @@ where let block_ws_uri = command.block_ws_uri::(); let block_at = command.block_at::(block_ws_uri.clone()).await?; - let block: Block = rpc_api::get_block::(block_ws_uri.clone(), block_at).await?; + let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false).await?; + let block: Block = rpc_service.get_block::(block_at).await?; let parent_hash = block.header().parent_hash(); log::info!( target: LOG_TARGET, diff --git a/substrate/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/substrate/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 9f598694d0..f493d5c10c 100644 --- a/substrate/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/substrate/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -27,13 +27,13 @@ use jsonrpsee::{ ws_client::WsClientBuilder, }; use parity_scale_codec::{Decode, Encode}; -use remote_externalities::{rpc_api, Builder, Mode, OnlineConfig}; +use remote_externalities::{rpc_api::RpcService, Builder, Mode, OnlineConfig}; use sc_executor::NativeExecutionDispatch; use sc_service::Configuration; use serde::de::DeserializeOwned; use sp_core::H256; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, str::FromStr}; +use std::{collections::VecDeque, fmt::Debug, str::FromStr}; const SUB: &str = "chain_subscribeFinalizedHeads"; const UN_SUB: &str = "chain_unsubscribeFinalizedHeads"; @@ -60,6 +60,10 @@ pub struct FollowChainCmd { /// round-robin fashion. #[clap(long, default_value = "none")] try_state: frame_try_runtime::TryStateSelect, + + /// If present, a single connection to a node will be kept and reused for fetching blocks. + #[clap(long)] + keep_connection: bool, } /// Start listening for with `SUB` at `url`. @@ -93,21 +97,16 @@ where Block::Header: HeaderT, { /// Awaits for the header of the block with hash `hash`. - async fn get_header(&mut self, hash: Block::Hash) -> Block::Header; -} - -struct RpcHeaderProvider { - uri: String, - _phantom: PhantomData, + async fn get_header(&self, hash: Block::Hash) -> Block::Header; } #[async_trait] -impl HeaderProvider for RpcHeaderProvider +impl HeaderProvider for RpcService where Block::Header: DeserializeOwned, { - async fn get_header(&mut self, hash: Block::Hash) -> Block::Header { - rpc_api::get_header::(&self.uri, hash).await.unwrap() + async fn get_header(&self, hash: Block::Hash) -> Block::Header { + self.get_header::(hash).await.unwrap() } } @@ -148,19 +147,20 @@ where /// /// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of /// them lack justification). -struct FinalizedHeaders, HS: HeaderSubscription> { - header_provider: HP, +struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> +{ + header_provider: &'a HP, subscription: HS, fetched_headers: VecDeque, last_returned: Option<::Hash>, } -impl, HS: HeaderSubscription> - FinalizedHeaders +impl<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> + FinalizedHeaders<'a, Block, HP, HS> where ::Header: DeserializeOwned, { - pub fn new(header_provider: HP, subscription: HS) -> Self { + pub fn new(header_provider: &'a HP, subscription: HS) -> Self { Self { header_provider, subscription, @@ -229,19 +229,16 @@ where let executor = build_executor::(&shared, &config); let execution = shared.execution; - let header_provider: RpcHeaderProvider = - RpcHeaderProvider { uri: command.uri.clone(), _phantom: PhantomData {} }; - let mut finalized_headers: FinalizedHeaders< - Block, - RpcHeaderProvider, - Subscription, - > = FinalizedHeaders::new(header_provider, subscription); + let rpc_service = RpcService::new(&command.uri, command.keep_connection).await?; + + let mut finalized_headers: FinalizedHeaders> = + FinalizedHeaders::new(&rpc_service, subscription); while let Some(header) = finalized_headers.next().await { let hash = header.hash(); let number = header.number(); - let block = rpc_api::get_block::(&command.uri, hash).await.unwrap(); + let block = rpc_service.get_block::(hash).await.unwrap(); log::debug!( target: LOG_TARGET, @@ -333,12 +330,14 @@ where mod tests { use super::*; use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header}; + use std::sync::Arc; + use tokio::sync::Mutex; type Block = TBlock>; type BlockNumber = u64; type Hash = H256; - struct MockHeaderProvider(pub VecDeque); + struct MockHeaderProvider(pub Arc>>); fn headers() -> Vec
{ let mut headers = vec![Header::new_from_number(0)]; @@ -353,8 +352,8 @@ mod tests { #[async_trait] impl HeaderProvider for MockHeaderProvider { - async fn get_header(&mut self, _hash: Hash) -> Header { - let height = self.0.pop_front().unwrap(); + async fn get_header(&self, _hash: Hash) -> Header { + let height = self.0.lock().await.pop_front().unwrap(); headers()[height as usize].clone() } } @@ -372,9 +371,9 @@ mod tests { async fn finalized_headers_works_when_every_block_comes_from_subscription() { let heights = vec![4, 5, 6, 7]; - let provider = MockHeaderProvider(vec![].into()); + let provider = MockHeaderProvider(Default::default()); let subscription = MockHeaderSubscription(heights.clone().into()); - let mut headers = FinalizedHeaders::new(provider, subscription); + let mut headers = FinalizedHeaders::new(&provider, subscription); for h in heights { assert_eq!(h, headers.next().await.unwrap().number); @@ -389,9 +388,9 @@ mod tests { // Consecutive headers will be requested in the reversed order. let heights_not_in_subscription = vec![5, 9, 8, 7]; - let provider = MockHeaderProvider(heights_not_in_subscription.into()); + let provider = MockHeaderProvider(Arc::new(Mutex::new(heights_not_in_subscription.into()))); let subscription = MockHeaderSubscription(heights_in_subscription.into()); - let mut headers = FinalizedHeaders::new(provider, subscription); + let mut headers = FinalizedHeaders::new(&provider, subscription); for h in all_heights { assert_eq!(h, headers.next().await.unwrap().number); diff --git a/substrate/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs b/substrate/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs index 11ceb0a81c..a579692abd 100644 --- a/substrate/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs +++ b/substrate/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs @@ -119,7 +119,8 @@ where let header_at = command.header_at::()?; let header_ws_uri = command.header_ws_uri::(); - let header = rpc_api::get_header::(header_ws_uri.clone(), header_at).await?; + let rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false).await?; + let header = rpc_service.get_header::(header_at).await?; log::info!( target: LOG_TARGET, "fetched header from {:?}, block number: {:?}", diff --git a/substrate/utils/frame/try-runtime/cli/src/lib.rs b/substrate/utils/frame/try-runtime/cli/src/lib.rs index 76679c43f7..c71496e0b8 100644 --- a/substrate/utils/frame/try-runtime/cli/src/lib.rs +++ b/substrate/utils/frame/try-runtime/cli/src/lib.rs @@ -267,7 +267,8 @@ use parity_scale_codec::Decode; use remote_externalities::{ - Builder, Mode, OfflineConfig, OnlineConfig, SnapshotConfig, TestExternalities, + rpc_api::RpcService, Builder, Mode, OfflineConfig, OnlineConfig, SnapshotConfig, + TestExternalities, }; use sc_chain_spec::ChainSpec; use sc_cli::{ @@ -541,8 +542,8 @@ impl State { impl TryRuntimeCmd { pub async fn run(&self, config: Configuration) -> sc_cli::Result<()> where - Block: BlockT + serde::de::DeserializeOwned, - Block::Header: serde::de::DeserializeOwned, + Block: BlockT + DeserializeOwned, + Block::Header: DeserializeOwned, Block::Hash: FromStr, ::Err: Debug, NumberFor: FromStr, @@ -626,13 +627,15 @@ where /// /// If the spec names don't match, if `relaxed`, then it emits a warning, else it panics. /// If the spec versions don't match, it only ever emits a warning. -pub(crate) async fn ensure_matching_spec( +pub(crate) async fn ensure_matching_spec( uri: String, expected_spec_name: String, expected_spec_version: u32, relaxed: bool, ) { - match remote_externalities::rpc_api::get_runtime_version::(uri.clone(), None) + let rpc_service = RpcService::new(uri.clone(), false).await.unwrap(); + match rpc_service + .get_runtime_version::(None) .await .map(|version| (String::from(version.spec_name.clone()), version.spec_version)) .map(|(spec_name, spec_version)| (spec_name.to_lowercase(), spec_version))