frame/utils: introduce substrate-rpc-client crate for RPC utils (#12212)

* hack together a PoC

* Update utils/frame/rpc-utils/Cargo.toml

Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>

* Update utils/frame/rpc-utils/src/lib.rs

Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>

* rpc_utils -> substrate_rpc_client

* try runtime: remove keep connection

* make CI happy

* cargo fmt

* fix ci

* update lock file

* fix

* fix

Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>
Co-authored-by: kianenigma <kian@parity.io>
This commit is contained in:
Niklas Adolfsson
2022-10-18 18:39:19 +02:00
committed by GitHub
parent 096553cb59
commit e3b269ab0f
19 changed files with 437 additions and 497 deletions
+17 -3
View File
@@ -4503,7 +4503,6 @@ dependencies = [
"platforms",
"rand 0.8.5",
"regex",
"remote-externalities",
"sc-authority-discovery",
"sc-basic-authorship",
"sc-block-builder",
@@ -4551,6 +4550,7 @@ dependencies = [
"sp-trie",
"substrate-build-script-utils",
"substrate-frame-cli",
"substrate-rpc-client",
"tempfile",
"tokio",
"try-runtime-cli",
@@ -7278,7 +7278,6 @@ version = "0.10.0-dev"
dependencies = [
"env_logger",
"frame-support",
"jsonrpsee",
"log",
"pallet-elections-phragmen",
"parity-scale-codec",
@@ -7288,6 +7287,7 @@ dependencies = [
"sp-io",
"sp-runtime",
"sp-version",
"substrate-rpc-client",
"tokio",
]
@@ -10349,6 +10349,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "substrate-rpc-client"
version = "0.10.0-dev"
dependencies = [
"async-trait",
"jsonrpsee",
"log",
"sc-rpc-api",
"serde",
"sp-core",
"sp-runtime",
"tokio",
]
[[package]]
name = "substrate-state-trie-migration-rpc"
version = "4.0.0-dev"
@@ -11028,7 +11042,6 @@ version = "0.10.0-dev"
dependencies = [
"clap 4.0.11",
"frame-try-runtime",
"jsonrpsee",
"log",
"parity-scale-codec",
"remote-externalities",
@@ -11045,6 +11058,7 @@ dependencies = [
"sp-state-machine",
"sp-version",
"sp-weights",
"substrate-rpc-client",
"tokio",
"zstd",
]
+1
View File
@@ -232,6 +232,7 @@ members = [
"utils/frame/rpc/system",
"utils/frame/generate-bags",
"utils/frame/generate-bags/node-runtime",
"utils/frame/rpc/client",
"utils/prometheus",
"utils/wasm-builder",
]
+1 -1
View File
@@ -132,7 +132,7 @@ soketto = "0.7.1"
criterion = { version = "0.3.5", features = ["async_tokio"] }
tokio = { version = "1.17.0", features = ["macros", "time", "parking_lot"] }
wait-timeout = "0.2"
remote-externalities = { path = "../../../utils/frame/remote-externalities" }
substrate-rpc-client = { path = "../../../utils/frame/rpc/client" }
pallet-timestamp = { version = "4.0.0-dev", path = "../../../frame/timestamp" }
[build-dependencies]
+5 -4
View File
@@ -23,8 +23,7 @@ use nix::{
sys::signal::{kill, Signal::SIGINT},
unistd::Pid,
};
use node_primitives::Block;
use remote_externalities::rpc_api::RpcService;
use node_primitives::{Hash, Header};
use std::{
io::{BufRead, BufReader, Read},
ops::{Deref, DerefMut},
@@ -69,12 +68,14 @@ pub async fn wait_n_finalized_blocks(
/// Wait for at least n blocks to be finalized from a specified node
pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) {
use substrate_rpc_client::{ws_client, ChainApi};
let mut built_blocks = std::collections::HashSet::new();
let mut interval = tokio::time::interval(Duration::from_secs(2));
let rpc_service = RpcService::new(url, false).await.unwrap();
let rpc = ws_client(url).await.unwrap();
loop {
if let Ok(block) = rpc_service.get_finalized_head::<Block>().await {
if let Ok(block) = ChainApi::<(), Hash, Header, ()>::finalized_head(&rpc).await {
built_blocks.insert(block);
if built_blocks.len() > n {
break
@@ -24,14 +24,15 @@ use sp_runtime::{traits::Block as BlockT, DeserializeOwned};
/// Test voter bags migration. `currency_unit` is the number of planks per the the runtimes `UNITS`
/// (i.e. number of decimal places per DOT, KSM etc)
pub async fn execute<
Runtime: RuntimeT<pallet_bags_list::Instance1>,
Block: BlockT + DeserializeOwned,
>(
pub async fn execute<Runtime, Block>(
currency_unit: u64,
currency_name: &'static str,
ws_url: String,
) {
) where
Runtime: RuntimeT<pallet_bags_list::Instance1>,
Block: BlockT,
Block::Header: DeserializeOwned,
{
let mut ext = Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
transport: ws_url.to_string().into(),
@@ -22,14 +22,12 @@ use remote_externalities::{Builder, Mode, OnlineConfig};
use sp_runtime::{traits::Block as BlockT, DeserializeOwned};
/// Execute create a snapshot from pallet-staking.
pub async fn execute<
pub async fn execute<Runtime, Block>(voter_limit: Option<usize>, currency_unit: u64, ws_url: String)
where
Runtime: crate::RuntimeT<pallet_bags_list::Instance1>,
Block: BlockT + DeserializeOwned,
>(
voter_limit: Option<usize>,
currency_unit: u64,
ws_url: String,
) {
Block: BlockT,
Block::Header: DeserializeOwned,
{
use frame_support::storage::generator::StorageMap;
let mut ext = Builder::<Block>::new()
@@ -25,14 +25,15 @@ use remote_externalities::{Builder, Mode, OnlineConfig};
use sp_runtime::{traits::Block as BlockT, DeserializeOwned};
/// Execute the sanity check of the bags-list.
pub async fn execute<
Runtime: crate::RuntimeT<pallet_bags_list::Instance1>,
Block: BlockT + DeserializeOwned,
>(
pub async fn execute<Runtime, Block>(
currency_unit: u64,
currency_name: &'static str,
ws_url: String,
) {
) where
Runtime: crate::RuntimeT<pallet_bags_list::Instance1>,
Block: BlockT,
Block::Header: DeserializeOwned,
{
let mut ext = Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
transport: ws_url.to_string().into(),
@@ -1640,13 +1640,12 @@ pub(crate) mod remote_tests {
///
/// This will print some very useful statistics, make sure [`crate::LOG_TARGET`] is enabled.
#[allow(dead_code)]
pub(crate) async fn run_with_limits<
pub(crate) async fn run_with_limits<Runtime, Block>(limits: MigrationLimits, mode: Mode<Block>)
where
Runtime: crate::Config<Hash = H256>,
Block: BlockT<Hash = H256> + serde::de::DeserializeOwned,
>(
limits: MigrationLimits,
mode: Mode<Block>,
) {
Block: BlockT<Hash = H256>,
Block::Header: serde::de::DeserializeOwned,
{
let mut ext = remote_externalities::Builder::<Block>::new()
.mode(mode)
.state_version(sp_core::storage::StateVersion::V0)
@@ -6,7 +6,7 @@ edition = "2021"
license = "Apache-2.0"
homepage = "https://substrate.io"
repository = "https://github.com/paritytech/substrate/"
description = "An externalities provided environemnt that can load itself from remote nodes or cache files"
description = "An externalities provided environment that can load itself from remote nodes or cached files"
readme = "README.md"
[package.metadata.docs.rs]
@@ -15,7 +15,6 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0" }
env_logger = "0.9"
jsonrpsee = { version = "0.15.1", features = ["ws-client", "macros"] }
log = "0.4.17"
serde = "1.0.136"
serde_json = "1.0"
@@ -24,6 +23,7 @@ sp-core = { version = "6.0.0", path = "../../../primitives/core" }
sp-io = { version = "6.0.0", path = "../../../primitives/io" }
sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" }
sp-version = { version = "5.0.0", path = "../../../primitives/version" }
substrate-rpc-client = { path = "../rpc/client" }
[dev-dependencies]
tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread"] }
@@ -22,13 +22,6 @@
use codec::{Decode, Encode};
use jsonrpsee::{
core::{client::ClientT, Error as RpcError},
proc_macros::rpc,
rpc_params,
ws_client::{WsClient, WsClientBuilder},
};
use log::*;
use serde::de::DeserializeOwned;
use sp_core::{
@@ -46,8 +39,7 @@ use std::{
path::{Path, PathBuf},
sync::Arc,
};
pub mod rpc_api;
use substrate_rpc_client::{rpc_params, ws_client, ChainApi, ClientT, StateApi, WsClient};
type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
@@ -58,40 +50,6 @@ const DEFAULT_TARGET: &str = "wss://rpc.polkadot.io:443";
const BATCH_SIZE: usize = 1000;
const PAGE: u32 = 1000;
#[rpc(client)]
pub trait RpcApi<Hash> {
#[method(name = "childstate_getKeys")]
fn child_get_keys(
&self,
child_key: PrefixedStorageKey,
prefix: StorageKey,
hash: Option<Hash>,
) -> Result<Vec<StorageKey>, RpcError>;
#[method(name = "childstate_getStorage")]
fn child_get_storage(
&self,
child_key: PrefixedStorageKey,
prefix: StorageKey,
hash: Option<Hash>,
) -> Result<StorageData, RpcError>;
#[method(name = "state_getStorage")]
fn get_storage(&self, prefix: StorageKey, hash: Option<Hash>) -> Result<StorageData, RpcError>;
#[method(name = "state_getKeysPaged")]
fn get_keys_paged(
&self,
prefix: Option<StorageKey>,
count: u32,
start_key: Option<StorageKey>,
hash: Option<Hash>,
) -> Result<Vec<StorageKey>, RpcError>;
#[method(name = "chain_getFinalizedHead")]
fn finalized_head(&self) -> Result<Hash, RpcError>;
}
/// The execution mode.
#[derive(Clone)]
pub enum Mode<B: BlockT> {
@@ -140,14 +98,10 @@ impl Transport {
if let Self::Uri(uri) = self {
log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri);
let ws_client = WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.build(&uri)
.await
.map_err(|e| {
log::error!(target: LOG_TARGET, "error: {:?}", e);
"failed to build ws client"
})?;
let ws_client = ws_client(uri).await.map_err(|e| {
log::error!(target: LOG_TARGET, "error: {:?}", e);
"failed to build ws client"
})?;
*self = Self::RemoteClient(Arc::new(ws_client))
}
@@ -258,7 +212,7 @@ pub struct Builder<B: BlockT> {
// NOTE: ideally we would use `DefaultNoBound` here, but not worth bringing in frame-support for
// that.
impl<B: BlockT + DeserializeOwned> Default for Builder<B> {
impl<B: BlockT> Default for Builder<B> {
fn default() -> Self {
Self {
mode: Default::default(),
@@ -272,7 +226,7 @@ impl<B: BlockT + DeserializeOwned> Default for Builder<B> {
}
// Mode methods
impl<B: BlockT + DeserializeOwned> Builder<B> {
impl<B: BlockT> Builder<B> {
fn as_online(&self) -> &OnlineConfig<B> {
match &self.mode {
Mode::Online(config) => config,
@@ -291,26 +245,38 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
}
// RPC methods
impl<B: BlockT + DeserializeOwned> Builder<B> {
impl<B: BlockT> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
async fn rpc_get_storage(
&self,
key: StorageKey,
maybe_at: Option<B::Hash>,
) -> Result<StorageData, &'static str> {
trace!(target: LOG_TARGET, "rpc: get_storage");
self.as_online().rpc_client().get_storage(key, maybe_at).await.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_storage failed."
})
match self.as_online().rpc_client().storage(key, maybe_at).await {
Ok(Some(res)) => Ok(res),
Ok(None) => Err("get_storage not found"),
Err(e) => {
error!(target: LOG_TARGET, "Error = {:?}", e);
Err("rpc get_storage failed.")
},
}
}
/// Get the latest finalized head.
async fn rpc_get_head(&self) -> Result<B::Hash, &'static str> {
trace!(target: LOG_TARGET, "rpc: finalized_head");
self.as_online().rpc_client().finalized_head().await.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc finalized_head failed."
})
// sadly this pretty much unreadable...
ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc finalized_head failed."
})
}
/// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
@@ -325,7 +291,7 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
let page = self
.as_online()
.rpc_client()
.get_keys_paged(Some(prefix.clone()), PAGE, last_key.clone(), Some(at))
.storage_keys_paged(Some(prefix.clone()), PAGE, last_key.clone(), Some(at))
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
@@ -471,19 +437,19 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
child_prefix: StorageKey,
at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
let child_keys = self
.as_online()
.rpc_client()
.child_get_keys(
PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
child_prefix,
Some(at),
)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc child_get_keys failed."
})?;
// This is deprecated and will generate a warning which causes the CI to fail.
#[allow(warnings)]
let child_keys = substrate_rpc_client::ChildStateApi::storage_keys(
self.as_online().rpc_client(),
PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
child_prefix,
Some(at),
)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc child_get_keys failed."
})?;
debug!(
target: LOG_TARGET,
@@ -497,7 +463,11 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
}
// Internal methods
impl<B: BlockT + DeserializeOwned> Builder<B> {
impl<B: BlockT> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
/// Save the given data to the top keys snapshot.
fn save_top_snapshot(&self, data: &[KeyValue], path: &PathBuf) -> Result<(), &'static str> {
let mut path = path.clone();
@@ -726,12 +696,13 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
let child_kv = match self.mode.clone() {
Mode::Online(_) => self.load_child_remote_and_maybe_save(&top_kv).await?,
Mode::OfflineOrElseOnline(offline_config, _) =>
Mode::OfflineOrElseOnline(offline_config, _) => {
if let Ok(kv) = self.load_child_snapshot(&offline_config.state_snapshot.path) {
kv
} else {
self.load_child_remote_and_maybe_save(&top_kv).await?
},
}
},
Mode::Offline(ref config) => self
.load_child_snapshot(&config.state_snapshot.path)
.map_err(|why| {
@@ -749,7 +720,7 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
}
// Public methods
impl<B: BlockT + DeserializeOwned> Builder<B> {
impl<B: BlockT> Builder<B> {
/// Create a new builder.
pub fn new() -> Self {
Default::default()
@@ -824,7 +795,13 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
}
self
}
}
// Public methods
impl<B: BlockT> Builder<B>
where
B::Header: DeserializeOwned,
{
/// Build the test externalities.
pub async fn build(self) -> Result<TestExternalities, &'static str> {
let state_version = self.state_version;
@@ -1,149 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! WS RPC API for one off RPC calls to a substrate node.
// TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988
use jsonrpsee::{
core::client::{Client, ClientT},
rpc_params,
types::ParamsSer,
ws_client::{WsClient, WsClientBuilder},
};
use serde::de::DeserializeOwned;
use sp_runtime::{generic::SignedBlock, traits::Block as BlockT};
use std::sync::Arc;
enum RpcCall {
GetHeader,
GetFinalizedHead,
GetBlock,
GetRuntimeVersion,
}
impl RpcCall {
fn as_str(&self) -> &'static str {
match self {
RpcCall::GetHeader => "chain_getHeader",
RpcCall::GetFinalizedHead => "chain_getFinalizedHead",
RpcCall::GetBlock => "chain_getBlock",
RpcCall::GetRuntimeVersion => "state_getRuntimeVersion",
}
}
}
/// General purpose method for making RPC calls.
async fn make_request<'a, T: DeserializeOwned>(
client: &Arc<Client>,
call: RpcCall,
params: Option<ParamsSer<'a>>,
) -> Result<T, String> {
client
.request::<T>(call.as_str(), params)
.await
.map_err(|e| format!("{} request failed: {:?}", call.as_str(), e))
}
enum ConnectionPolicy {
Reuse(Arc<Client>),
Reconnect,
}
/// Simple RPC service that is capable of keeping the connection.
///
/// Service will connect to `uri` for the first time already during initialization.
///
/// Be careful with reusing the connection in a multithreaded environment.
pub struct RpcService {
uri: String,
policy: ConnectionPolicy,
}
impl RpcService {
/// Creates a new RPC service. If `keep_connection`, then connects to `uri` right away.
pub async fn new<S: AsRef<str>>(uri: S, keep_connection: bool) -> Result<Self, String> {
let policy = if keep_connection {
ConnectionPolicy::Reuse(Arc::new(Self::build_client(uri.as_ref()).await?))
} else {
ConnectionPolicy::Reconnect
};
Ok(Self { uri: uri.as_ref().to_string(), policy })
}
/// Returns the address at which requests are sent.
pub fn uri(&self) -> String {
self.uri.clone()
}
/// Build a websocket client that connects to `self.uri`.
async fn build_client<S: AsRef<str>>(uri: S) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.build(uri)
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
}
/// Generic method for making RPC requests.
async fn make_request<'a, T: DeserializeOwned>(
&self,
call: RpcCall,
params: Option<ParamsSer<'a>>,
) -> Result<T, String> {
match self.policy {
// `self.keep_connection` must have been `true`.
ConnectionPolicy::Reuse(ref client) => make_request(client, call, params).await,
ConnectionPolicy::Reconnect => {
let client = Arc::new(Self::build_client(&self.uri).await?);
make_request(&client, call, params).await
},
}
}
/// Get the header of the block identified by `at`.
pub async fn get_header<Block>(&self, at: Block::Hash) -> Result<Block::Header, String>
where
Block: BlockT,
Block::Header: DeserializeOwned,
{
self.make_request(RpcCall::GetHeader, rpc_params!(at)).await
}
/// Get the finalized head.
pub async fn get_finalized_head<Block: BlockT>(&self) -> Result<Block::Hash, String> {
self.make_request(RpcCall::GetFinalizedHead, None).await
}
/// Get the signed block identified by `at`.
pub async fn get_block<Block: BlockT + DeserializeOwned>(
&self,
at: Block::Hash,
) -> Result<Block, String> {
Ok(self
.make_request::<SignedBlock<Block>>(RpcCall::GetBlock, rpc_params!(at))
.await?
.block)
}
/// Get the runtime version of a given chain.
pub async fn get_runtime_version<Block: BlockT + DeserializeOwned>(
&self,
at: Option<Block::Hash>,
) -> Result<sp_version::RuntimeVersion, String> {
self.make_request(RpcCall::GetRuntimeVersion, rpc_params!(at)).await
}
}
@@ -0,0 +1,25 @@
[package]
name = "substrate-rpc-client"
version = "0.10.0-dev"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2021"
license = "Apache-2.0"
homepage = "https://substrate.io"
repository = "https://github.com/paritytech/substrate/"
description = "Shared JSON-RPC client"
readme = "README.md"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
jsonrpsee = { version = "0.15.1", features = ["ws-client"] }
sc-rpc-api = { version = "0.10.0-dev", path = "../../../../client/rpc-api" }
async-trait = "0.1.57"
serde = "1"
sp-runtime = { version = "6.0.0", path = "../../../../primitives/runtime" }
log = "0.4"
[dev-dependencies]
tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread", "sync"] }
sp-core = { path = "../../../../primitives/core" }
+265
View File
@@ -0,0 +1,265 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Shared JSON-RPC client related code and abstractions.
//!
//! It exposes a `WebSocket JSON-RPC` client that implements the RPC interface in [`sc-rpc-api`]
//! along with some abstractions.
//!
//! ## Usage
//!
//! ```no_run
//! # use substrate_rpc_client::{ws_client, StateApi};
//! # use sp_core::{H256, storage::StorageKey};
//!
//! #[tokio::main]
//! async fn main() {
//!
//! let client = ws_client("ws://127.0.0.1:9944").await.unwrap();
//! client.storage(StorageKey(vec![]), Some(H256::zero())).await.unwrap();
//!
//! // if all type params are not known you need to provide type params
//! StateApi::<H256>::storage(&client, StorageKey(vec![]), None).await.unwrap();
//! }
//! ```
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::collections::VecDeque;
pub use jsonrpsee::{
core::client::{ClientT, Subscription, SubscriptionClientT},
rpc_params,
ws_client::{WsClient, WsClientBuilder},
};
pub use sc_rpc_api::{
author::AuthorApiClient as AuthorApi, chain::ChainApiClient as ChainApi,
child_state::ChildStateApiClient as ChildStateApi, dev::DevApiClient as DevApi,
offchain::OffchainApiClient as OffchainApi, state::StateApiClient as StateApi,
system::SystemApiClient as SystemApi,
};
/// Create a new `WebSocket` connection with shared settings.
pub async fn ws_client(uri: impl AsRef<str>) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.request_timeout(std::time::Duration::from_secs(60 * 10))
.connection_timeout(std::time::Duration::from_secs(60))
.max_notifs_per_subscription(1024)
.build(uri)
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
}
/// Abstraction over RPC calling for headers.
#[async_trait]
pub trait HeaderProvider<Block: BlockT>
where
Block::Header: HeaderT,
{
/// Awaits for the header of the block with hash `hash`.
async fn get_header(&self, hash: Block::Hash) -> Block::Header;
}
#[async_trait]
impl<Block: BlockT> HeaderProvider<Block> for WsClient
where
Block::Header: DeserializeOwned,
{
async fn get_header(&self, hash: Block::Hash) -> Block::Header {
ChainApi::<(), Block::Hash, Block::Header, ()>::header(self, Some(hash))
.await
.unwrap()
.unwrap()
}
}
/// Abstraction over RPC subscription for finalized headers.
#[async_trait]
pub trait HeaderSubscription<Block: BlockT>
where
Block::Header: HeaderT,
{
/// Await for the next finalized header from the subscription.
///
/// Returns `None` if either the subscription has been closed or there was an error when reading
/// an object from the client.
async fn next_header(&mut self) -> Option<Block::Header>;
}
#[async_trait]
impl<Block: BlockT> HeaderSubscription<Block> for Subscription<Block::Header>
where
Block::Header: DeserializeOwned,
{
async fn next_header(&mut self) -> Option<Block::Header> {
match self.next().await {
Some(Ok(header)) => Some(header),
None => {
log::warn!("subscription closed");
None
},
Some(Err(why)) => {
log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why);
None
},
}
}
}
/// Stream of all finalized headers.
///
/// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of
/// them lack justification).
pub struct FinalizedHeaders<
'a,
Block: BlockT,
HP: HeaderProvider<Block>,
HS: HeaderSubscription<Block>,
> {
header_provider: &'a HP,
subscription: HS,
fetched_headers: VecDeque<Block::Header>,
last_returned: Option<<Block::Header as HeaderT>::Hash>,
}
impl<'a, Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>>
FinalizedHeaders<'a, Block, HP, HS>
where
<Block as BlockT>::Header: DeserializeOwned,
{
pub fn new(header_provider: &'a HP, subscription: HS) -> Self {
Self {
header_provider,
subscription,
fetched_headers: VecDeque::new(),
last_returned: None,
}
}
/// Reads next finalized header from the subscription. If some headers (without justification)
/// have been skipped, fetches them as well. Returns number of headers that have been fetched.
///
/// All fetched headers are stored in `self.fetched_headers`.
async fn fetch(&mut self) -> usize {
let last_finalized = match self.subscription.next_header().await {
Some(header) => header,
None => return 0,
};
self.fetched_headers.push_front(last_finalized.clone());
let mut last_finalized_parent = *last_finalized.parent_hash();
let last_returned = self.last_returned.unwrap_or(last_finalized_parent);
while last_finalized_parent != last_returned {
let parent_header = self.header_provider.get_header(last_finalized_parent).await;
self.fetched_headers.push_front(parent_header.clone());
last_finalized_parent = *parent_header.parent_hash();
}
self.fetched_headers.len()
}
/// Get the next finalized header.
pub async fn next(&mut self) -> Option<Block::Header> {
if self.fetched_headers.is_empty() {
self.fetch().await;
}
if let Some(header) = self.fetched_headers.pop_front() {
self.last_returned = Some(header.hash());
Some(header)
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header, H256};
use std::sync::Arc;
use tokio::sync::Mutex;
type Block = TBlock<ExtrinsicWrapper<()>>;
type BlockNumber = u64;
type Hash = H256;
struct MockHeaderProvider(pub Arc<Mutex<VecDeque<BlockNumber>>>);
fn headers() -> Vec<Header> {
let mut headers = vec![Header::new_from_number(0)];
for n in 1..11 {
headers.push(Header {
parent_hash: headers.last().unwrap().hash(),
..Header::new_from_number(n)
})
}
headers
}
#[async_trait]
impl HeaderProvider<Block> for MockHeaderProvider {
async fn get_header(&self, _hash: Hash) -> Header {
let height = self.0.lock().await.pop_front().unwrap();
headers()[height as usize].clone()
}
}
struct MockHeaderSubscription(pub VecDeque<BlockNumber>);
#[async_trait]
impl HeaderSubscription<Block> for MockHeaderSubscription {
async fn next_header(&mut self) -> Option<Header> {
self.0.pop_front().map(|h| headers()[h as usize].clone())
}
}
#[tokio::test]
async fn finalized_headers_works_when_every_block_comes_from_subscription() {
let heights = vec![4, 5, 6, 7];
let provider = MockHeaderProvider(Default::default());
let subscription = MockHeaderSubscription(heights.clone().into());
let mut headers = FinalizedHeaders::new(&provider, subscription);
for h in heights {
assert_eq!(h, headers.next().await.unwrap().number);
}
assert_eq!(None, headers.next().await);
}
#[tokio::test]
async fn finalized_headers_come_from_subscription_and_provider_if_in_need() {
let all_heights = 3..11;
let heights_in_subscription = vec![3, 4, 6, 10];
// Consecutive headers will be requested in the reversed order.
let heights_not_in_subscription = vec![5, 9, 8, 7];
let provider = MockHeaderProvider(Arc::new(Mutex::new(heights_not_in_subscription.into())));
let subscription = MockHeaderSubscription(heights_in_subscription.into());
let mut headers = FinalizedHeaders::new(&provider, subscription);
for h in all_heights {
assert_eq!(h, headers.next().await.unwrap().number);
}
assert_eq!(None, headers.next().await);
}
}
@@ -19,7 +19,6 @@ parity-scale-codec = "3.0.0"
serde = "1.0.136"
zstd = { version = "0.11.2", default-features = false }
remote-externalities = { version = "0.10.0-dev", path = "../../remote-externalities" }
jsonrpsee = { version = "0.15.1", default-features = false, features = ["ws-client"] }
sc-chain-spec = { version = "4.0.0-dev", path = "../../../../client/chain-spec" }
sc-cli = { version = "0.10.0-dev", path = "../../../../client/cli" }
sc-executor = { version = "0.10.0-dev", path = "../../../../client/executor" }
@@ -33,6 +32,7 @@ sp-state-machine = { version = "0.12.0", path = "../../../../primitives/state-ma
sp-version = { version = "5.0.0", path = "../../../../primitives/version" }
sp-weights = { version = "4.0.0", path = "../../../../primitives/weights" }
frame-try-runtime = { path = "../../../../frame/try-runtime" }
substrate-rpc-client = { path = "../../rpc/client" }
[dev-dependencies]
tokio = "1.17.0"
@@ -20,11 +20,11 @@ use crate::{
state_machine_call_with_proof, SharedParams, State, LOG_TARGET,
};
use parity_scale_codec::Encode;
use remote_externalities::rpc_api;
use sc_service::{Configuration, NativeExecutionDispatch};
use sp_core::storage::well_known_keys;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{fmt::Debug, str::FromStr};
use substrate_rpc_client::{ws_client, ChainApi};
/// Configurations of the [`Command::ExecuteBlock`].
///
@@ -84,10 +84,11 @@ pub struct ExecuteBlockCmd {
impl ExecuteBlockCmd {
async fn block_at<Block: BlockT>(&self, ws_uri: String) -> sc_cli::Result<Block::Hash>
where
Block::Hash: FromStr,
Block::Hash: FromStr + serde::de::DeserializeOwned,
<Block::Hash as FromStr>::Err: Debug,
Block::Header: serde::de::DeserializeOwned,
{
let rpc_service = rpc_api::RpcService::new(ws_uri, false).await?;
let rpc = ws_client(&ws_uri).await?;
match (&self.block_at, &self.state) {
(Some(block_at), State::Snap { .. }) => hash_of::<Block>(block_at),
@@ -100,7 +101,9 @@ impl ExecuteBlockCmd {
target: LOG_TARGET,
"No --block-at or --at provided, using the latest finalized block instead"
);
rpc_service.get_finalized_head::<Block>().await.map_err(Into::into)
ChainApi::<(), Block::Hash, Block::Header, ()>::finalized_head(&rpc)
.await
.map_err(|e| e.to_string().into())
},
(None, State::Live { at: Some(at), .. }) => hash_of::<Block>(at),
_ => {
@@ -137,6 +140,8 @@ where
Block: BlockT + serde::de::DeserializeOwned,
Block::Hash: FromStr,
<Block::Hash as FromStr>::Err: Debug,
Block::Hash: serde::de::DeserializeOwned,
Block::Header: serde::de::DeserializeOwned,
NumberFor<Block>: FromStr,
<NumberFor<Block> as FromStr>::Err: Debug,
ExecDispatch: NativeExecutionDispatch + 'static,
@@ -146,8 +151,11 @@ where
let block_ws_uri = command.block_ws_uri::<Block>();
let block_at = command.block_at::<Block>(block_ws_uri.clone()).await?;
let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false).await?;
let block: Block = rpc_service.get_block::<Block>(block_at).await?;
let rpc = ws_client(&block_ws_uri).await?;
let block: Block = ChainApi::<(), Block::Hash, Block::Header, _>::block(&rpc, Some(block_at))
.await
.unwrap()
.unwrap();
let parent_hash = block.header().parent_hash();
log::info!(
target: LOG_TARGET,
@@ -19,22 +19,15 @@ use crate::{
build_executor, ensure_matching_spec, extract_code, full_extensions, local_spec, parse,
state_machine_call_with_proof, SharedParams, LOG_TARGET,
};
use jsonrpsee::{
core::{
async_trait,
client::{Client, Subscription, SubscriptionClientT},
},
ws_client::WsClientBuilder,
};
use parity_scale_codec::{Decode, Encode};
use remote_externalities::{rpc_api::RpcService, Builder, Mode, OnlineConfig};
use remote_externalities::{Builder, Mode, OnlineConfig};
use sc_executor::NativeExecutionDispatch;
use sc_service::Configuration;
use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Serialize};
use sp_core::H256;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use sp_weights::Weight;
use std::{collections::VecDeque, fmt::Debug, str::FromStr};
use std::{fmt::Debug, str::FromStr};
use substrate_rpc_client::{ws_client, ChainApi, FinalizedHeaders, Subscription, WsClient};
const SUB: &str = "chain_subscribeFinalizedHeads";
const UN_SUB: &str = "chain_unsubscribeFinalizedHeads";
@@ -71,144 +64,19 @@ pub struct FollowChainCmd {
///
/// Returns a pair `(client, subscription)` - `subscription` alone will be useless, because it
/// relies on the related alive `client`.
async fn start_subscribing<Header: DeserializeOwned>(
async fn start_subscribing<Header: DeserializeOwned + Serialize + Send + Sync + 'static>(
url: &str,
) -> sc_cli::Result<(Client, Subscription<Header>)> {
let client = WsClientBuilder::default()
.connection_timeout(std::time::Duration::new(20, 0))
.max_notifs_per_subscription(1024)
.max_request_body_size(u32::MAX)
.build(url)
.await
.map_err(|e| sc_cli::Error::Application(e.into()))?;
) -> sc_cli::Result<(WsClient, Subscription<Header>)> {
let client = ws_client(url).await.map_err(|e| sc_cli::Error::Application(e.into()))?;
log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", SUB, UN_SUB);
let sub = client
.subscribe(SUB, None, UN_SUB)
let sub = ChainApi::<(), (), Header, ()>::subscribe_finalized_heads(&client)
.await
.map_err(|e| sc_cli::Error::Application(e.into()))?;
Ok((client, sub))
}
/// Abstraction over RPC calling for headers.
#[async_trait]
trait HeaderProvider<Block: BlockT>
where
Block::Header: HeaderT,
{
/// Awaits for the header of the block with hash `hash`.
async fn get_header(&self, hash: Block::Hash) -> Block::Header;
}
#[async_trait]
impl<Block: BlockT> HeaderProvider<Block> for RpcService
where
Block::Header: DeserializeOwned,
{
async fn get_header(&self, hash: Block::Hash) -> Block::Header {
self.get_header::<Block>(hash).await.unwrap()
}
}
/// Abstraction over RPC subscription for finalized headers.
#[async_trait]
trait HeaderSubscription<Block: BlockT>
where
Block::Header: HeaderT,
{
/// Await for the next finalized header from the subscription.
///
/// Returns `None` if either the subscription has been closed or there was an error when reading
/// an object from the client.
async fn next_header(&mut self) -> Option<Block::Header>;
}
#[async_trait]
impl<Block: BlockT> HeaderSubscription<Block> for Subscription<Block::Header>
where
Block::Header: DeserializeOwned,
{
async fn next_header(&mut self) -> Option<Block::Header> {
match self.next().await {
Some(Ok(header)) => Some(header),
None => {
log::warn!("subscription closed");
None
},
Some(Err(why)) => {
log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why);
None
},
}
}
}
/// Stream of all finalized headers.
///
/// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of
/// them lack justification).
struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>>
{
header_provider: &'a HP,
subscription: HS,
fetched_headers: VecDeque<Block::Header>,
last_returned: Option<<Block::Header as HeaderT>::Hash>,
}
impl<'a, Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>>
FinalizedHeaders<'a, Block, HP, HS>
where
<Block as BlockT>::Header: DeserializeOwned,
{
pub fn new(header_provider: &'a HP, subscription: HS) -> Self {
Self {
header_provider,
subscription,
fetched_headers: VecDeque::new(),
last_returned: None,
}
}
/// Reads next finalized header from the subscription. If some headers (without justification)
/// have been skipped, fetches them as well. Returns number of headers that have been fetched.
///
/// All fetched headers are stored in `self.fetched_headers`.
async fn fetch(&mut self) -> usize {
let last_finalized = match self.subscription.next_header().await {
Some(header) => header,
None => return 0,
};
self.fetched_headers.push_front(last_finalized.clone());
let mut last_finalized_parent = *last_finalized.parent_hash();
let last_returned = self.last_returned.unwrap_or(last_finalized_parent);
while last_finalized_parent != last_returned {
let parent_header = self.header_provider.get_header(last_finalized_parent).await;
self.fetched_headers.push_front(parent_header.clone());
last_finalized_parent = *parent_header.parent_hash();
}
self.fetched_headers.len()
}
/// Get the next finalized header.
pub async fn next(&mut self) -> Option<Block::Header> {
if self.fetched_headers.is_empty() {
self.fetch().await;
}
if let Some(header) = self.fetched_headers.pop_front() {
self.last_returned = Some(header.hash());
Some(header)
} else {
None
}
}
}
pub(crate) async fn follow_chain<Block, ExecDispatch>(
shared: SharedParams,
command: FollowChainCmd,
@@ -224,22 +92,23 @@ where
ExecDispatch: NativeExecutionDispatch + 'static,
{
let mut maybe_state_ext = None;
let (_client, subscription) = start_subscribing::<Block::Header>(&command.uri).await?;
let (rpc, subscription) = start_subscribing::<Block::Header>(&command.uri).await?;
let (code_key, code) = extract_code(&config.chain_spec)?;
let executor = build_executor::<ExecDispatch>(&shared, &config);
let execution = shared.execution;
let rpc_service = RpcService::new(&command.uri, command.keep_connection).await?;
let mut finalized_headers: FinalizedHeaders<Block, RpcService, Subscription<Block::Header>> =
FinalizedHeaders::new(&rpc_service, subscription);
let mut finalized_headers: FinalizedHeaders<Block, _, _> =
FinalizedHeaders::new(&rpc, subscription);
while let Some(header) = finalized_headers.next().await {
let hash = header.hash();
let number = header.number();
let block = rpc_service.get_block::<Block>(hash).await.unwrap();
let block: Block = ChainApi::<(), Block::Hash, Block::Header, _>::block(&rpc, Some(hash))
.await
.unwrap()
.unwrap();
log::debug!(
target: LOG_TARGET,
@@ -295,7 +164,7 @@ where
full_extensions(),
)?;
let consumed_weight = <Weight as Decode>::decode(&mut &*encoded_result)
let consumed_weight = <sp_weights::Weight as Decode>::decode(&mut &*encoded_result)
.map_err(|e| format!("failed to decode weight: {:?}", e))?;
let storage_changes = changes
@@ -326,76 +195,3 @@ where
log::error!(target: LOG_TARGET, "ws subscription must have terminated.");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header};
use std::sync::Arc;
use tokio::sync::Mutex;
type Block = TBlock<ExtrinsicWrapper<()>>;
type BlockNumber = u64;
type Hash = H256;
struct MockHeaderProvider(pub Arc<Mutex<VecDeque<BlockNumber>>>);
fn headers() -> Vec<Header> {
let mut headers = vec![Header::new_from_number(0)];
for n in 1..11 {
headers.push(Header {
parent_hash: headers.last().unwrap().hash(),
..Header::new_from_number(n)
})
}
headers
}
#[async_trait]
impl HeaderProvider<Block> for MockHeaderProvider {
async fn get_header(&self, _hash: Hash) -> Header {
let height = self.0.lock().await.pop_front().unwrap();
headers()[height as usize].clone()
}
}
struct MockHeaderSubscription(pub VecDeque<BlockNumber>);
#[async_trait]
impl HeaderSubscription<Block> for MockHeaderSubscription {
async fn next_header(&mut self) -> Option<Header> {
self.0.pop_front().map(|h| headers()[h as usize].clone())
}
}
#[tokio::test]
async fn finalized_headers_works_when_every_block_comes_from_subscription() {
let heights = vec![4, 5, 6, 7];
let provider = MockHeaderProvider(Default::default());
let subscription = MockHeaderSubscription(heights.clone().into());
let mut headers = FinalizedHeaders::new(&provider, subscription);
for h in heights {
assert_eq!(h, headers.next().await.unwrap().number);
}
assert_eq!(None, headers.next().await);
}
#[tokio::test]
async fn finalized_headers_come_from_subscription_and_provider_if_in_need() {
let all_heights = 3..11;
let heights_in_subscription = vec![3, 4, 6, 10];
// Consecutive headers will be requested in the reversed order.
let heights_not_in_subscription = vec![5, 9, 8, 7];
let provider = MockHeaderProvider(Arc::new(Mutex::new(heights_not_in_subscription.into())));
let subscription = MockHeaderSubscription(heights_in_subscription.into());
let mut headers = FinalizedHeaders::new(&provider, subscription);
for h in all_heights {
assert_eq!(h, headers.next().await.unwrap().number);
}
assert_eq!(None, headers.next().await);
}
}
@@ -20,12 +20,12 @@ use crate::{
parse, state_machine_call, SharedParams, State, LOG_TARGET,
};
use parity_scale_codec::Encode;
use remote_externalities::rpc_api;
use sc_executor::NativeExecutionDispatch;
use sc_service::Configuration;
use sp_core::storage::well_known_keys;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use std::{fmt::Debug, str::FromStr};
use substrate_rpc_client::{ws_client, ChainApi};
/// Configurations of the [`Command::OffchainWorker`].
#[derive(Debug, Clone, clap::Parser)]
@@ -117,8 +117,11 @@ where
let header_at = command.header_at::<Block>()?;
let header_ws_uri = command.header_ws_uri::<Block>();
let rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false).await?;
let header = rpc_service.get_header::<Block>(header_at).await?;
let rpc = ws_client(&header_ws_uri).await?;
let header = ChainApi::<(), Block::Hash, Block::Header, ()>::header(&rpc, Some(header_at))
.await
.unwrap()
.unwrap();
log::info!(
target: LOG_TARGET,
"fetched header from {:?}, block number: {:?}",
@@ -45,6 +45,7 @@ where
Block: BlockT + serde::de::DeserializeOwned,
Block::Hash: FromStr,
<Block::Hash as FromStr>::Err: Debug,
Block::Header: serde::de::DeserializeOwned,
NumberFor<Block>: FromStr,
<NumberFor<Block> as FromStr>::Err: Debug,
ExecDispatch: NativeExecutionDispatch + 'static,
@@ -267,8 +267,7 @@
use parity_scale_codec::Decode;
use remote_externalities::{
rpc_api::RpcService, Builder, Mode, OfflineConfig, OnlineConfig, SnapshotConfig,
TestExternalities,
Builder, Mode, OfflineConfig, OnlineConfig, SnapshotConfig, TestExternalities,
};
use sc_chain_spec::ChainSpec;
use sc_cli::{
@@ -297,6 +296,7 @@ use sp_runtime::{
use sp_state_machine::{OverlayedChanges, StateMachine, TrieBackendBuilder};
use sp_version::StateVersion;
use std::{fmt::Debug, path::PathBuf, str::FromStr};
use substrate_rpc_client::{ws_client, StateApi};
mod commands;
pub(crate) mod parse;
@@ -633,9 +633,8 @@ pub(crate) async fn ensure_matching_spec<Block: BlockT + DeserializeOwned>(
expected_spec_version: u32,
relaxed: bool,
) {
let rpc_service = RpcService::new(uri.clone(), false).await.unwrap();
match rpc_service
.get_runtime_version::<Block>(None)
let rpc = ws_client(&uri).await.unwrap();
match StateApi::<Block::Hash>::runtime_version(&rpc, None)
.await
.map(|version| (String::from(version.spec_name.clone()), version.spec_version))
.map(|(spec_name, spec_version)| (spec_name.to_lowercase(), spec_version))