// Copyright (C) Parity Technologies (UK) Ltd. // This file is part of Pezcumulus. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // Pezcumulus is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Pezcumulus is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Pezcumulus. If not, see . use futures::channel::{ mpsc::{Receiver, Sender}, oneshot::Sender as OneshotSender, }; use jsonrpsee::{ core::{params::ArrayParams, ClientError as JsonRpseeError}, rpc_params, }; use prometheus::Registry; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value as JsonValue; use std::collections::{btree_map::BTreeMap, VecDeque}; use tokio::sync::mpsc::Sender as TokioSender; use codec::{Decode, Encode}; use pezcumulus_primitives_core::{ relay_chain::{ async_backing::{AsyncBackingParams, BackingState, Constraints}, slashing, ApprovalVotingParams, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, NodeFeatures, OccupiedCoreAssumption, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; use pezcumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; use pezsc_client_api::StorageData; use pezsc_rpc_api::{state::ReadProof, system::Health}; use pezsc_service::TaskManager; use pezsp_consensus_babe::Epoch; use pezsp_storage::StorageKey; use pezsp_version::RuntimeVersion; use crate::{metrics::RelaychainRpcMetrics, reconnecting_ws_client::ReconnectingWebsocketWorker}; pub use url::Url; const LOG_TARGET: &str = "relay-chain-rpc-client"; const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20; /// Messages for communication between [`RelayChainRpcClient`] and the RPC workers. #[derive(Debug)] pub enum RpcDispatcherMessage { /// Register new listener for the best headers stream. Contains a sender which will be used /// to send incoming headers. RegisterBestHeadListener(Sender), /// Register new listener for the import headers stream. Contains a sender which will be used /// to send incoming headers. RegisterImportListener(Sender), /// Register new listener for the finalized headers stream. Contains a sender which will be /// used to send incoming headers. RegisterFinalizationListener(Sender), /// Register new listener for the finalized headers stream. /// Contains the following: /// - [`String`] representing the RPC method to be called /// - [`ArrayParams`] for the parameters to the RPC call /// - [`OneshotSender`] for the return value of the request Request(String, ArrayParams, OneshotSender>), } /// Entry point to create [`RelayChainRpcClient`] and start a worker that communicates /// to JsonRPC servers over the network. pub async fn create_client_and_start_worker( urls: Vec, task_manager: &mut TaskManager, prometheus_registry: Option<&Registry>, ) -> RelayChainResult { let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await; task_manager .spawn_essential_handle() .spawn("relay-chain-rpc-worker", None, worker.run()); let client = RelayChainRpcClient::new(sender, prometheus_registry); Ok(client) } #[derive(Serialize)] struct PayloadToHex<'a>(#[serde(with = "pezsp_core::bytes")] &'a [u8]); /// Client that maps RPC methods and deserializes results #[derive(Clone)] pub struct RelayChainRpcClient { /// Sender to send messages to the worker. worker_channel: TokioSender, metrics: Option, } impl RelayChainRpcClient { /// Initialize new RPC Client. /// /// This client expects a channel connected to a worker that processes /// requests sent via this channel. pub(crate) fn new( worker_channel: TokioSender, prometheus_registry: Option<&Registry>, ) -> Self { RelayChainRpcClient { worker_channel, metrics: prometheus_registry .and_then(|inner| RelaychainRpcMetrics::register(inner).map_err(|err| { tracing::warn!(target: LOG_TARGET, error = %err, "Unable to instantiate the RPC client metrics, continuing w/o metrics setup."); }).ok()), } } /// Same as `call_remote_runtime_function` but work on encoded data pub async fn call_remote_runtime_function_encoded( &self, method_name: &str, hash: RelayHash, payload: &[u8], ) -> RelayChainResult { let payload = PayloadToHex(payload); let params = rpc_params! { method_name, payload, hash }; self.request_tracing::("state_call", params, |err| { tracing::trace!( target: LOG_TARGET, %method_name, %hash, error = %err, "Error during call to 'state_call'.", ); }) .await } /// Call a call to `state_call` rpc method. pub async fn call_remote_runtime_function( &self, method_name: &str, hash: RelayHash, payload: Option, ) -> RelayChainResult { let payload_bytes = payload.map_or(pezsp_core::Bytes(Vec::new()), |v| pezsp_core::Bytes(v.encode())); let res = self .call_remote_runtime_function_encoded(method_name, hash, &payload_bytes) .await?; Decode::decode(&mut &*res.0).map_err(Into::into) } /// Perform RPC request async fn request<'a, R>( &self, method: &'a str, params: ArrayParams, ) -> Result where R: DeserializeOwned + std::fmt::Debug, { self.request_tracing( method, params, |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"), ) .await } /// Perform RPC request async fn request_tracing<'a, R, OR>( &self, method: &'a str, params: ArrayParams, trace_error: OR, ) -> Result where R: DeserializeOwned + std::fmt::Debug, OR: Fn(&RelayChainError), { let _timer = self.metrics.as_ref().map(|inner| inner.start_request_timer(method)); let (tx, rx) = futures::channel::oneshot::channel(); let message = RpcDispatcherMessage::Request(method.into(), params, tx); self.worker_channel.send(message).await.map_err(|err| { RelayChainError::WorkerCommunicationError(format!( "Unable to send message to RPC worker: {}", err )) })?; let value = rx.await.map_err(|err| { RelayChainError::WorkerCommunicationError(format!( "RPC worker channel closed. This can hint and connectivity issues with the supplied RPC endpoints. Message: {}", err )) })??; serde_json::from_value(value).map_err(|_| { trace_error(&RelayChainError::GenericError("Unable to deserialize value".to_string())); RelayChainError::RpcCallError(method.to_string()) }) } /// Returns information regarding the current epoch. pub async fn babe_api_current_epoch(&self, at: RelayHash) -> Result { self.call_remote_runtime_function("BabeApi_current_epoch", at, None::<()>).await } /// Scrape dispute relevant from on-chain, backing votes and resolved disputes. pub async fn teyrchain_host_on_chain_votes( &self, at: RelayHash, ) -> Result>, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_on_chain_votes", at, None::<()>) .await } /// Returns code hashes of PVFs that require pre-checking by validators in the active set. pub async fn teyrchain_host_pvfs_require_precheck( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_pvfs_require_precheck", at, None::<()>) .await } /// Submits a PVF pre-checking statement into the transaction pool. pub async fn teyrchain_host_submit_pvf_check_statement( &self, at: RelayHash, stmt: PvfCheckStatement, signature: ValidatorSignature, ) -> Result<(), RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_submit_pvf_check_statement", at, Some((stmt, signature)), ) .await } /// Get system health information pub async fn system_health(&self) -> Result { self.request("system_health", rpc_params![]).await } /// Get read proof for `storage_keys` pub async fn state_get_read_proof( &self, storage_keys: Vec, at: Option, ) -> Result, RelayChainError> { let params = rpc_params![storage_keys, at]; self.request("state_getReadProof", params).await } /// Retrieve storage item at `storage_key` pub async fn state_get_storage( &self, storage_key: StorageKey, at: Option, ) -> Result, RelayChainError> { let params = rpc_params![storage_key, at]; self.request("state_getStorage", params).await } /// Get hash of the n-th block in the canon chain. /// /// By default returns latest block hash. pub async fn chain_get_head(&self, at: Option) -> Result { let params = rpc_params![at]; self.request("chain_getHead", params).await } /// Returns the validator groups and rotation info localized based on the hypothetical child /// of a block whose state this is invoked on. Note that `now` in the `GroupRotationInfo` /// should be the successor of the number of the block. pub async fn teyrchain_host_validator_groups( &self, at: RelayHash, ) -> Result<(Vec>, GroupRotationInfo), RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_validator_groups", at, None::<()>) .await } /// Get a vector of events concerning candidates that occurred within a block. pub async fn teyrchain_host_candidate_events( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_candidate_events", at, None::<()>) .await } /// Checks if the given validation outputs pass the acceptance criteria. pub async fn teyrchain_host_check_validation_outputs( &self, at: RelayHash, para_id: ParaId, outputs: CandidateCommitments, ) -> Result { self.call_remote_runtime_function( "TeyrchainHost_check_validation_outputs", at, Some((para_id, outputs)), ) .await } /// Returns the persisted validation data for the given `ParaId` along with the corresponding /// validation code hash. Instead of accepting assumption about the para, matches the validation /// data hash against an expected one and yields `None` if they're not equal. pub async fn teyrchain_host_assumed_validation_data( &self, at: RelayHash, para_id: ParaId, expected_hash: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_persisted_assumed_validation_data", at, Some((para_id, expected_hash)), ) .await } /// Get hash of last finalized block. pub async fn chain_get_finalized_head(&self) -> Result { self.request("chain_getFinalizedHead", rpc_params![]).await } /// Get hash of n-th block. pub async fn chain_get_block_hash( &self, block_number: Option, ) -> Result, RelayChainError> { let params = rpc_params![block_number]; self.request("chain_getBlockHash", params).await } /// Yields the persisted validation data for the given `ParaId` along with an assumption that /// should be used if the para currently occupies a core. /// /// Returns `None` if either the para is not registered or the assumption is `Freed` /// and the para already occupies a core. pub async fn teyrchain_host_persisted_validation_data( &self, at: RelayHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_persisted_validation_data", at, Some((para_id, occupied_core_assumption)), ) .await } /// Get the validation code from its hash. pub async fn teyrchain_host_validation_code_by_hash( &self, at: RelayHash, validation_code_hash: ValidationCodeHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_validation_code_by_hash", at, Some(validation_code_hash), ) .await } /// Yields information on all availability cores as relevant to the child block. /// Cores are either free or occupied. Free cores can have paras assigned to them. pub async fn teyrchain_host_availability_cores( &self, at: RelayHash, ) -> Result>, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_availability_cores", at, None::<()>) .await } /// Get runtime version pub async fn runtime_version(&self, at: RelayHash) -> Result { let params = rpc_params![at]; self.request("state_getRuntimeVersion", params).await } /// Returns all onchain disputes. pub async fn teyrchain_host_disputes( &self, at: RelayHash, ) -> Result)>, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_disputes", at, None::<()>) .await } /// Returns a list of validators that lost a past session dispute and need to be slashed. /// /// This is a staging method! Do not use on production runtimes! pub async fn teyrchain_host_unapplied_slashes( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_unapplied_slashes", at, None::<()>) .await } /// Returns a list of validators that lost a past session dispute and need to be slashed. pub async fn teyrchain_host_unapplied_slashes_v2( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_unapplied_slashes_v2", at, None::<()>) .await } /// Returns a merkle proof of a validator session key in a past session. /// /// This is a staging method! Do not use on production runtimes! pub async fn teyrchain_host_key_ownership_proof( &self, at: RelayHash, validator_id: ValidatorId, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_key_ownership_proof", at, Some(validator_id), ) .await } /// Submits an unsigned extrinsic to slash validators who lost a dispute about /// a candidate of a past session. /// /// This is a staging method! Do not use on production runtimes! pub async fn teyrchain_host_submit_report_dispute_lost( &self, at: RelayHash, dispute_proof: slashing::DisputeProof, key_ownership_proof: slashing::OpaqueKeyOwnershipProof, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_submit_report_dispute_lost", at, Some((dispute_proof, key_ownership_proof)), ) .await } pub async fn authority_discovery_authorities( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("AuthorityDiscoveryApi_authorities", at, None::<()>) .await } /// Fetch the validation code used by a para, making the given `OccupiedCoreAssumption`. /// /// Returns `None` if either the para is not registered or the assumption is `Freed` /// and the para already occupies a core. pub async fn teyrchain_host_validation_code( &self, at: RelayHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_validation_code", at, Some((para_id, occupied_core_assumption)), ) .await } /// Fetch the hash of the validation code used by a para, making the given /// `OccupiedCoreAssumption`. pub async fn teyrchain_host_validation_code_hash( &self, at: RelayHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_validation_code_hash", at, Some((para_id, occupied_core_assumption)), ) .await } /// Get the session info for the given session, if stored. pub async fn teyrchain_host_session_info( &self, at: RelayHash, index: SessionIndex, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_session_info", at, Some(index)) .await } /// Get the executor parameters for the given session, if stored pub async fn teyrchain_host_session_executor_params( &self, at: RelayHash, session_index: SessionIndex, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_session_executor_params", at, Some(session_index), ) .await } /// Get header at specified hash pub async fn chain_get_header( &self, hash: Option, ) -> Result, RelayChainError> { let params = rpc_params![hash]; self.request("chain_getHeader", params).await } /// Get the receipt of a candidate pending availability. This returns `Some` for any paras /// assigned to occupied cores in `availability_cores` and `None` otherwise. pub async fn teyrchain_host_candidate_pending_availability( &self, at: RelayHash, para_id: ParaId, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_candidate_pending_availability", at, Some(para_id), ) .await } /// Returns the session index expected at a child of the block. /// /// This can be used to instantiate a `SigningContext`. pub async fn teyrchain_host_session_index_for_child( &self, at: RelayHash, ) -> Result { self.call_remote_runtime_function("TeyrchainHost_session_index_for_child", at, None::<()>) .await } /// Get the current validators. pub async fn teyrchain_host_validators( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_validators", at, None::<()>) .await } /// Get the contents of all channels addressed to the given recipient. Channels that have no /// messages in them are also included. pub async fn teyrchain_host_inbound_hrmp_channels_contents( &self, para_id: ParaId, at: RelayHash, ) -> Result>, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_inbound_hrmp_channels_contents", at, Some(para_id), ) .await } /// Get all the pending inbound messages in the downward message queue for a para. pub async fn teyrchain_host_dmq_contents( &self, para_id: ParaId, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_dmq_contents", at, Some(para_id)) .await } /// Get the minimum number of backing votes for a candidate. pub async fn teyrchain_host_minimum_backing_votes( &self, at: RelayHash, _session_index: SessionIndex, ) -> Result { self.call_remote_runtime_function("TeyrchainHost_minimum_backing_votes", at, None::<()>) .await } pub async fn teyrchain_host_node_features( &self, at: RelayHash, ) -> Result { self.call_remote_runtime_function("TeyrchainHost_node_features", at, None::<()>) .await } pub async fn teyrchain_host_disabled_validators( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_disabled_validators", at, None::<()>) .await } #[allow(missing_docs)] pub async fn teyrchain_host_async_backing_params( &self, at: RelayHash, ) -> Result { self.call_remote_runtime_function("TeyrchainHost_async_backing_params", at, None::<()>) .await } #[allow(missing_docs)] pub async fn teyrchain_host_staging_approval_voting_params( &self, at: RelayHash, _session_index: SessionIndex, ) -> Result { self.call_remote_runtime_function( "TeyrchainHost_staging_approval_voting_params", at, None::<()>, ) .await } pub async fn teyrchain_host_para_backing_state( &self, at: RelayHash, para_id: ParaId, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_para_backing_state", at, Some(para_id)) .await } pub async fn teyrchain_host_claim_queue( &self, at: RelayHash, ) -> Result>, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_claim_queue", at, None::<()>) .await } /// Get the receipt of all candidates pending availability. pub async fn teyrchain_host_candidates_pending_availability( &self, at: RelayHash, para_id: ParaId, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_candidates_pending_availability", at, Some(para_id), ) .await } pub async fn teyrchain_host_scheduling_lookahead( &self, at: RelayHash, ) -> Result { self.call_remote_runtime_function("TeyrchainHost_scheduling_lookahead", at, None::<()>) .await } pub async fn teyrchain_host_validation_code_bomb_limit( &self, at: RelayHash, ) -> Result { self.call_remote_runtime_function( "TeyrchainHost_validation_code_bomb_limit", at, None::<()>, ) .await } pub async fn validation_code_hash( &self, at: RelayHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> Result, RelayChainError> { self.call_remote_runtime_function( "TeyrchainHost_validation_code_hash", at, Some((para_id, occupied_core_assumption)), ) .await } pub async fn teyrchain_host_backing_constraints( &self, at: RelayHash, para_id: ParaId, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_backing_constraints", at, Some(para_id)) .await } fn send_register_message_to_worker( &self, message: RpcDispatcherMessage, ) -> Result<(), RelayChainError> { self.worker_channel .try_send(message) .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string())) } /// Get a stream of all imported relay chain headers pub fn get_imported_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker(RpcDispatcherMessage::RegisterImportListener(tx))?; Ok(rx) } /// Get a stream of new best relay chain headers pub fn get_best_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker(RpcDispatcherMessage::RegisterBestHeadListener(tx))?; Ok(rx) } /// Get a stream of finalized relay chain headers pub fn get_finalized_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker(RpcDispatcherMessage::RegisterFinalizationListener( tx, ))?; Ok(rx) } pub async fn teyrchain_host_para_ids( &self, at: RelayHash, ) -> Result, RelayChainError> { self.call_remote_runtime_function("TeyrchainHost_para_ids", at, None::<()>) .await } } /// Send `header` through all channels contained in `senders`. /// If no one is listening to the sender, it is removed from the vector. pub fn distribute_header(header: RelayHeader, senders: &mut Vec>) { senders.retain_mut(|e| { match e.try_send(header.clone()) { // Receiver has been dropped, remove Sender from list. Err(error) if error.is_disconnected() => false, // Channel is full. This should not happen. // TODO: Improve error handling here // https://github.com/pezkuwichain/pezkuwi-sdk/issues/90 Err(error) => { tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications."); true }, _ => true, } }); }