// Copyright 2022 Parity Technologies (UK) Ltd. // This file is part of Cumulus. // Cumulus is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Cumulus is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . use futures::channel::mpsc::Receiver; use jsonrpsee::{core::params::ArrayParams, rpc_params}; use parity_scale_codec::{Decode, Encode}; use serde::de::DeserializeOwned; pub use url::Url; use sc_client_api::StorageData; use sc_rpc_api::{state::ReadProof, system::Health}; use sc_service::TaskManager; use sp_api::RuntimeVersion; use sp_consensus_babe::Epoch; use sp_core::sp_std::collections::btree_map::BTreeMap; use sp_storage::StorageKey; use cumulus_primitives_core::{ relay_chain::{ slashing, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; use crate::reconnecting_ws_client::ReconnectingWsClient; const LOG_TARGET: &str = "relay-chain-rpc-client"; /// Client that maps RPC methods and deserializes results #[derive(Clone)] pub struct RelayChainRpcClient { /// Websocket client to make calls ws_client: ReconnectingWsClient, } /// Entry point to create [`RelayChainRpcClient`] and start a worker that distributes notifications. pub async fn create_client_and_start_worker( urls: Vec, task_manager: &mut TaskManager, ) -> RelayChainResult { let ws_client = ReconnectingWsClient::new(urls, task_manager).await?; let client = RelayChainRpcClient::new(ws_client).await?; Ok(client) } impl RelayChainRpcClient { /// Initialize new RPC Client. async fn new(ws_client: ReconnectingWsClient) -> RelayChainResult { let client = RelayChainRpcClient { ws_client }; Ok(client) } /// Call a call to `state_call` rpc method. pub async fn call_remote_runtime_function( &self, method_name: &str, hash: RelayHash, payload: Option, ) -> RelayChainResult { let payload_bytes = payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode())); let params = rpc_params! { method_name, payload_bytes, hash }; let res = self .request_tracing::("state_call", params, |err| { tracing::trace!( target: LOG_TARGET, %method_name, %hash, error = %err, "Error during call to 'state_call'.", ); }) .await?; Decode::decode(&mut &*res.0).map_err(Into::into) } /// Perform RPC request async fn request<'a, R>( &self, method: &'a str, params: ArrayParams, ) -> Result where R: DeserializeOwned + std::fmt::Debug, { self.request_tracing( method, params, |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"), ) .await } /// Perform RPC request async fn request_tracing<'a, R, OR>( &self, method: &'a str, params: ArrayParams, trace_error: OR, ) -> Result where R: DeserializeOwned + std::fmt::Debug, OR: Fn(&RelayChainError), { self.ws_client.request(method, params).await.map_err(|err| { trace_error(&err); RelayChainError::RpcCallError(method.to_string()) }) } /// Returns information regarding the current epoch. pub async fn babe_api_current_epoch(&self, at: RelayHash) -> Result { self.call_remote_runtime_function("BabeApi_current_epoch", at, None::<()>).await } /// Scrape dispute relevant from on-chain, backing votes and resolved disputes. pub async fn parachain_host_on_chain_votes( &self, at: RelayHash, ) -> Result>, RelayChainError> { self.call_remote_runtime_function("ParachainHost_on_chain_votes", at, None::<()>) .await } /// Returns code hashes of PVFs that require pre-checking by validators in the active set. pub async fn parachain_host_pvfs_require_precheck( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("ParachainHost_pvfs_require_precheck", at, None::<()>) .await } /// Submits a PVF pre-checking statement into the transaction pool. pub async fn parachain_host_submit_pvf_check_statement( &self, at: RelayHash, stmt: PvfCheckStatement, signature: ValidatorSignature, ) -> Result<(), RelayChainError> { self.call_remote_runtime_function( "ParachainHost_submit_pvf_check_statement", at, Some((stmt, signature)), ) .await } /// Get system health information pub async fn system_health(&self) -> Result { self.request("system_health", rpc_params![]).await } /// Get read proof for `storage_keys` pub async fn state_get_read_proof( &self, storage_keys: Vec, at: Option, ) -> Result, RelayChainError> { let params = rpc_params![storage_keys, at]; self.request("state_getReadProof", params).await } /// Retrieve storage item at `storage_key` pub async fn state_get_storage( &self, storage_key: StorageKey, at: Option, ) -> Result, RelayChainError> { let params = rpc_params![storage_key, at]; self.request("state_getStorage", params).await } /// Get hash of the n-th block in the canon chain. /// /// By default returns latest block hash. pub async fn chain_get_head(&self, at: Option) -> Result { let params = rpc_params![at]; self.request("chain_getHead", params).await } /// Returns the validator groups and rotation info localized based on the hypothetical child /// of a block whose state this is invoked on. Note that `now` in the `GroupRotationInfo` /// should be the successor of the number of the block. pub async fn parachain_host_validator_groups( &self, at: RelayHash, ) -> Result<(Vec>, GroupRotationInfo), RelayChainError> { self.call_remote_runtime_function("ParachainHost_validator_groups", at, None::<()>) .await } /// Get a vector of events concerning candidates that occurred within a block. pub async fn parachain_host_candidate_events( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("ParachainHost_candidate_events", at, None::<()>) .await } /// Checks if the given validation outputs pass the acceptance criteria. pub async fn parachain_host_check_validation_outputs( &self, at: RelayHash, para_id: ParaId, outputs: CandidateCommitments, ) -> Result { self.call_remote_runtime_function( "ParachainHost_check_validation_outputs", at, Some((para_id, outputs)), ) .await } /// Returns the persisted validation data for the given `ParaId` along with the corresponding /// validation code hash. Instead of accepting assumption about the para, matches the validation /// data hash against an expected one and yields `None` if they're not equal. pub async fn parachain_host_assumed_validation_data( &self, at: RelayHash, para_id: ParaId, expected_hash: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_persisted_assumed_validation_data", at, Some((para_id, expected_hash)), ) .await } /// Get hash of last finalized block. pub async fn chain_get_finalized_head(&self) -> Result { self.request("chain_getFinalizedHead", rpc_params![]).await } /// Get hash of n-th block. pub async fn chain_get_block_hash( &self, block_number: Option, ) -> Result, RelayChainError> { let params = rpc_params![block_number]; self.request("chain_getBlockHash", params).await } /// Yields the persisted validation data for the given `ParaId` along with an assumption that /// should be used if the para currently occupies a core. /// /// Returns `None` if either the para is not registered or the assumption is `Freed` /// and the para already occupies a core. pub async fn parachain_host_persisted_validation_data( &self, at: RelayHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_persisted_validation_data", at, Some((para_id, occupied_core_assumption)), ) .await } /// Get the validation code from its hash. pub async fn parachain_host_validation_code_by_hash( &self, at: RelayHash, validation_code_hash: ValidationCodeHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_validation_code_by_hash", at, Some(validation_code_hash), ) .await } /// Yields information on all availability cores as relevant to the child block. /// Cores are either free or occupied. Free cores can have paras assigned to them. pub async fn parachain_host_availability_cores( &self, at: RelayHash, ) -> Result>, RelayChainError> { self.call_remote_runtime_function("ParachainHost_availability_cores", at, None::<()>) .await } /// Get runtime version pub async fn runtime_version(&self, at: RelayHash) -> Result { let params = rpc_params![at]; self.request("state_getRuntimeVersion", params).await } /// Returns all onchain disputes. pub async fn parachain_host_disputes( &self, at: RelayHash, ) -> Result)>, RelayChainError> { self.call_remote_runtime_function("ParachainHost_disputes", at, None::<()>) .await } /// Returns a list of validators that lost a past session dispute and need to be slashed. /// /// This is a staging method! Do not use on production runtimes! pub async fn parachain_host_unapplied_slashes( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("ParachainHost_unapplied_slashes", at, None::<()>) .await } /// Returns a merkle proof of a validator session key in a past session. /// /// This is a staging method! Do not use on production runtimes! pub async fn parachain_host_key_ownership_proof( &self, at: RelayHash, validator_id: ValidatorId, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_key_ownership_proof", at, Some(validator_id), ) .await } /// Submits an unsigned extrinsic to slash validators who lost a dispute about /// a candidate of a past session. /// /// This is a staging method! Do not use on production runtimes! pub async fn parachain_host_submit_report_dispute_lost( &self, at: RelayHash, dispute_proof: slashing::DisputeProof, key_ownership_proof: slashing::OpaqueKeyOwnershipProof, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_submit_report_dispute_lost", at, Some((dispute_proof, key_ownership_proof)), ) .await } pub async fn authority_discovery_authorities( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("AuthorityDiscoveryApi_authorities", at, None::<()>) .await } /// Fetch the validation code used by a para, making the given `OccupiedCoreAssumption`. /// /// Returns `None` if either the para is not registered or the assumption is `Freed` /// and the para already occupies a core. pub async fn parachain_host_validation_code( &self, at: RelayHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_validation_code", at, Some((para_id, occupied_core_assumption)), ) .await } /// Fetch the hash of the validation code used by a para, making the given /// `OccupiedCoreAssumption`. pub async fn parachain_host_validation_code_hash( &self, at: RelayHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_validation_code_hash", at, Some((para_id, occupied_core_assumption)), ) .await } /// Get the session info for the given session, if stored. pub async fn parachain_host_session_info( &self, at: RelayHash, index: SessionIndex, ) -> Result, RelayChainError> { self.call_remote_runtime_function("ParachainHost_session_info", at, Some(index)) .await } /// Get the executor parameters for the given session, if stored pub async fn parachain_host_session_executor_params( &self, at: RelayHash, session_index: SessionIndex, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_session_executor_params", at, Some(session_index), ) .await } /// Get header at specified hash pub async fn chain_get_header( &self, hash: Option, ) -> Result, RelayChainError> { let params = rpc_params![hash]; self.request("chain_getHeader", params).await } /// Get the receipt of a candidate pending availability. This returns `Some` for any paras /// assigned to occupied cores in `availability_cores` and `None` otherwise. pub async fn parachain_host_candidate_pending_availability( &self, at: RelayHash, para_id: ParaId, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_candidate_pending_availability", at, Some(para_id), ) .await } /// Returns the session index expected at a child of the block. /// /// This can be used to instantiate a `SigningContext`. pub async fn parachain_host_session_index_for_child( &self, at: RelayHash, ) -> Result { self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>) .await } /// Get the current validators. pub async fn parachain_host_validators( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>) .await } /// Get the contents of all channels addressed to the given recipient. Channels that have no /// messages in them are also included. pub async fn parachain_host_inbound_hrmp_channels_contents( &self, para_id: ParaId, at: RelayHash, ) -> Result>, RelayChainError> { self.call_remote_runtime_function( "ParachainHost_inbound_hrmp_channels_contents", at, Some(para_id), ) .await } /// Get all the pending inbound messages in the downward message queue for a para. pub async fn parachain_host_dmq_contents( &self, para_id: ParaId, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id)) .await } /// Get a stream of all imported relay chain headers pub fn get_imported_heads_stream(&self) -> Result, RelayChainError> { self.ws_client.get_imported_heads_stream() } /// Get a stream of new best relay chain headers pub fn get_best_heads_stream(&self) -> Result, RelayChainError> { self.ws_client.get_best_heads_stream() } /// Get a stream of finalized relay chain headers pub fn get_finalized_heads_stream(&self) -> Result, RelayChainError> { self.ws_client.get_finalized_heads_stream() } }