diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index c62d71348f..046e5991ba 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2503,7 +2503,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util", + "tokio-util 0.7.4", "tracing", ] @@ -2928,7 +2928,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", - "tokio-util", + "tokio-util 0.7.4", "tracing", "webpki-roots", ] @@ -2991,7 +2991,7 @@ dependencies = [ "soketto", "tokio", "tokio-stream", - "tokio-util", + "tokio-util 0.7.4", "tower", "tracing", ] @@ -4279,7 +4279,7 @@ dependencies = [ "substrate-rpc-client", "tempfile", "tokio", - "tokio-util", + "tokio-util 0.7.4", "try-runtime-cli", "wait-timeout", ] @@ -7774,7 +7774,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tokio-util", + "tokio-util 0.7.4", "unsigned-varint", "zeroize", ] @@ -8085,20 +8085,33 @@ dependencies = [ name = "sc-rpc-spec-v2" version = "0.10.0-dev" dependencies = [ + "array-bytes", + "assert_matches", "futures", + "futures-util", "hex", "jsonrpsee", + "log", "parity-scale-codec", + "parking_lot 0.12.1", + "sc-block-builder", "sc-chain-spec", + "sc-client-api", "sc-transaction-pool-api", "serde", "serde_json", "sp-api", "sp-blockchain", + "sp-consensus", "sp-core", + "sp-maybe-compressed-blob", "sp-runtime", + "sp-version", + "substrate-test-runtime", + "substrate-test-runtime-client", "thiserror", "tokio", + "tokio-stream", ] [[package]] @@ -10264,6 +10277,7 @@ dependencies = [ "futures-core", "pin-project-lite 0.2.9", "tokio", + "tokio-util 0.6.10", ] [[package]] @@ -10279,6 +10293,20 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "tokio-util" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite 0.2.9", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.4" diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index 930aeb4bd8..f9598b3c1a 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -22,12 +22,25 @@ sp-core = { version = "7.0.0", path = "../../primitives/core" } sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } +sp-version = { version = "5.0.0", path = "../../primitives/version" } +sc-client-api = { version = "4.0.0-dev", path = "../api" } codec = { package = "parity-scale-codec", version = "3.0.0" } thiserror = "1.0" serde = "1.0" hex = "0.4" futures = "0.3.21" +parking_lot = "0.12.1" +tokio-stream = { version = "0.1", features = ["sync"] } +array-bytes = "4.1" +log = "0.4.17" +futures-util = { version = "0.3.19", default-features = false } [dev-dependencies] serde_json = "1.0" tokio = { version = "1.22.0", features = ["macros"] } +substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" } +substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime" } +sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } +sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" } +sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } +assert_matches = "1.3.0" diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs new file mode 100644 index 0000000000..7e72d6d774 --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs @@ -0,0 +1,137 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +#![allow(non_snake_case)] + +//! API trait of the chain head. +use crate::chain_head::event::{ChainHeadEvent, FollowEvent, NetworkConfig}; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; + +#[rpc(client, server)] +pub trait ChainHeadApi { + /// Track the state of the head of the chain: the finalized, non-finalized, and best blocks. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[subscription( + name = "chainHead_unstable_follow", + unsubscribe = "chainHead_unstable_unfollow", + item = FollowEvent, + )] + fn chain_head_unstable_follow(&self, runtime_updates: bool); + + /// Retrieves the body (list of transactions) of a pinned block. + /// + /// This method should be seen as a complement to `chainHead_unstable_follow`, + /// allowing the JSON-RPC client to retrieve more information about a block + /// that has been reported. + /// + /// Use `archive_unstable_body` if instead you want to retrieve the body of an arbitrary block. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[subscription( + name = "chainHead_unstable_body", + unsubscribe = "chainHead_unstable_stopBody", + item = ChainHeadEvent, + )] + fn chain_head_unstable_body( + &self, + follow_subscription: String, + hash: Hash, + network_config: Option, + ); + + /// Retrieves the header of a pinned block. + /// + /// This method should be seen as a complement to `chainHead_unstable_follow`, + /// allowing the JSON-RPC client to retrieve more information about a block + /// that has been reported. + /// + /// Use `archive_unstable_header` if instead you want to retrieve the header of an arbitrary + /// block. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "chainHead_unstable_header", blocking)] + fn chain_head_unstable_header( + &self, + follow_subscription: String, + hash: Hash, + ) -> RpcResult>; + + /// Get the chain's genesis hash. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "chainHead_unstable_genesisHash", blocking)] + fn chain_head_unstable_genesis_hash(&self) -> RpcResult; + + /// Return a storage entry at a specific block's state. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[subscription( + name = "chainHead_unstable_storage", + unsubscribe = "chainHead_unstable_stopStorage", + item = ChainHeadEvent, + )] + fn chain_head_unstable_storage( + &self, + follow_subscription: String, + hash: Hash, + key: String, + child_key: Option, + network_config: Option, + ); + + /// Call into the Runtime API at a specified block's state. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[subscription( + name = "chainHead_unstable_call", + unsubscribe = "chainHead_unstable_stopCall", + item = ChainHeadEvent, + )] + fn chain_head_unstable_call( + &self, + follow_subscription: String, + hash: Hash, + function: String, + call_parameters: String, + network_config: Option, + ); + + /// Unpin a block reported by the `follow` method. + /// + /// Ongoing operations that require the provided block + /// will continue normally. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "chainHead_unstable_unpin", blocking)] + fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>; +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs new file mode 100644 index 0000000000..c55625e99c --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -0,0 +1,783 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! API implementation for `chainHead`. + +use crate::{ + chain_head::{ + api::ChainHeadApiServer, + error::Error as ChainHeadRpcError, + event::{ + BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent, + Initialized, NetworkConfig, NewBlock, RuntimeEvent, RuntimeVersionEvent, + }, + subscription::{SubscriptionHandle, SubscriptionManagement, SubscriptionManagementError}, + }, + SubscriptionTaskExecutor, +}; +use codec::Encode; +use futures::{ + channel::oneshot, + future::FutureExt, + stream::{self, Stream, StreamExt}, +}; +use futures_util::future::Either; +use jsonrpsee::{ + core::{async_trait, RpcResult}, + types::{SubscriptionEmptyError, SubscriptionId, SubscriptionResult}, + SubscriptionSink, +}; +use log::{debug, error}; +use sc_client_api::{ + Backend, BlockBackend, BlockImportNotification, BlockchainEvents, CallExecutor, ChildInfo, + ExecutorProvider, FinalityNotification, StorageKey, StorageProvider, +}; +use serde::Serialize; +use sp_api::CallApiAt; +use sp_blockchain::{ + Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, +}; +use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, Bytes}; +use sp_runtime::{ + generic::BlockId, + traits::{Block as BlockT, Header}, +}; +use std::{marker::PhantomData, sync::Arc}; + +/// An API for chain head RPC calls. +pub struct ChainHead { + /// Substrate client. + client: Arc, + /// Backend of the chain. + backend: Arc, + /// Executor to spawn subscriptions. + executor: SubscriptionTaskExecutor, + /// Keep track of the pinned blocks for each subscription. + subscriptions: Arc>, + /// The hexadecimal encoded hash of the genesis block. + genesis_hash: String, + /// The maximum number of pinned blocks allowed per connection. + max_pinned_blocks: usize, + /// Phantom member to pin the block type. + _phantom: PhantomData, +} + +impl ChainHead { + /// Create a new [`ChainHead`]. + pub fn new>( + client: Arc, + backend: Arc, + executor: SubscriptionTaskExecutor, + genesis_hash: GenesisHash, + max_pinned_blocks: usize, + ) -> Self { + let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref())); + + Self { + client, + backend, + executor, + subscriptions: Arc::new(SubscriptionManagement::new()), + genesis_hash, + max_pinned_blocks, + _phantom: PhantomData, + } + } + + /// Accept the subscription and return the subscription ID on success. + fn accept_subscription( + &self, + sink: &mut SubscriptionSink, + ) -> Result { + // The subscription must be accepted before it can provide a valid subscription ID. + sink.accept()?; + + let Some(sub_id) = sink.subscription_id() else { + // This can only happen if the subscription was not accepted. + return Err(SubscriptionEmptyError) + }; + + // Get the string representation for the subscription. + let sub_id = match sub_id { + SubscriptionId::Num(num) => num.to_string(), + SubscriptionId::Str(id) => id.into_owned().into(), + }; + + Ok(sub_id) + } +} + +/// Generate the initial events reported by the RPC `follow` method. +/// +/// This includes the "Initialized" event followed by the in-memory +/// blocks via "NewBlock" and the "BestBlockChanged". +fn generate_initial_events( + client: &Arc, + backend: &Arc, + handle: &SubscriptionHandle, + runtime_updates: bool, +) -> Result>, SubscriptionManagementError> +where + Block: BlockT + 'static, + Block::Header: Unpin, + BE: Backend + 'static, + Client: HeaderBackend + CallApiAt + 'static, +{ + // The initialized event is the first one sent. + let finalized_block_hash = client.info().finalized_hash; + handle.pin_block(finalized_block_hash)?; + + let finalized_block_runtime = generate_runtime_event( + &client, + runtime_updates, + &BlockId::Hash(finalized_block_hash), + None, + ); + + let initialized_event = FollowEvent::Initialized(Initialized { + finalized_block_hash, + finalized_block_runtime, + runtime_updates, + }); + + let initial_blocks = get_initial_blocks(&backend, finalized_block_hash); + let mut in_memory_blocks = Vec::with_capacity(initial_blocks.len() + 1); + + in_memory_blocks.push(initialized_event); + for (child, parent) in initial_blocks.into_iter() { + handle.pin_block(child)?; + + let new_runtime = generate_runtime_event( + &client, + runtime_updates, + &BlockId::Hash(child), + Some(&BlockId::Hash(parent)), + ); + + let event = FollowEvent::NewBlock(NewBlock { + block_hash: child, + parent_block_hash: parent, + new_runtime, + runtime_updates, + }); + + in_memory_blocks.push(event); + } + + // Generate a new best block event. + let best_block_hash = client.info().best_hash; + if best_block_hash != finalized_block_hash { + let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }); + in_memory_blocks.push(best_block); + }; + + Ok(in_memory_blocks) +} + +/// Parse hex-encoded string parameter as raw bytes. +/// +/// If the parsing fails, the subscription is rejected. +fn parse_hex_param( + sink: &mut SubscriptionSink, + param: String, +) -> Result, SubscriptionEmptyError> { + // Methods can accept empty parameters. + if param.is_empty() { + return Ok(Default::default()) + } + + match array_bytes::hex2bytes(¶m) { + Ok(bytes) => Ok(bytes), + Err(_) => { + let _ = sink.reject(ChainHeadRpcError::InvalidParam(param)); + Err(SubscriptionEmptyError) + }, + } +} + +/// Conditionally generate the runtime event of the given block. +fn generate_runtime_event( + client: &Arc, + runtime_updates: bool, + block: &BlockId, + parent: Option<&BlockId>, +) -> Option +where + Block: BlockT + 'static, + Client: CallApiAt + 'static, +{ + // No runtime versions should be reported. + if !runtime_updates { + return None + } + + let block_rt = match client.runtime_version_at(block) { + Ok(rt) => rt, + Err(err) => return Some(err.into()), + }; + + let parent = match parent { + Some(parent) => parent, + // Nothing to compare against, always report. + None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt })), + }; + + let parent_rt = match client.runtime_version_at(parent) { + Ok(rt) => rt, + Err(err) => return Some(err.into()), + }; + + // Report the runtime version change. + if block_rt != parent_rt { + Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt })) + } else { + None + } +} + +/// Get the in-memory blocks of the client, starting from the provided finalized hash. +/// +/// Returns a tuple of block hash with parent hash. +fn get_initial_blocks( + backend: &Arc, + parent_hash: Block::Hash, +) -> Vec<(Block::Hash, Block::Hash)> +where + Block: BlockT + 'static, + BE: Backend + 'static, +{ + let mut result = Vec::new(); + let mut next_hash = Vec::new(); + next_hash.push(parent_hash); + + while let Some(parent_hash) = next_hash.pop() { + let Ok(blocks) = backend.blockchain().children(parent_hash) else { + continue + }; + + for child_hash in blocks { + result.push((child_hash, parent_hash)); + next_hash.push(child_hash); + } + } + + result +} + +/// Submit the events from the provided stream to the RPC client +/// for as long as the `rx_stop` event was not called. +async fn submit_events( + sink: &mut SubscriptionSink, + mut stream: EventStream, + rx_stop: oneshot::Receiver<()>, +) where + EventStream: Stream + Unpin, + T: Serialize, +{ + let mut stream_item = stream.next(); + let mut stop_event = rx_stop; + + while let Either::Left((Some(event), next_stop_event)) = + futures_util::future::select(stream_item, stop_event).await + { + match sink.send(&event) { + Ok(true) => { + stream_item = stream.next(); + stop_event = next_stop_event; + }, + // Client disconnected. + Ok(false) => return, + Err(_) => { + // Failed to submit event. + break + }, + } + } + + let _ = sink.send(&FollowEvent::::Stop); +} + +/// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for +/// every notification. +fn handle_import_blocks( + client: &Arc, + handle: &SubscriptionHandle, + runtime_updates: bool, + notification: BlockImportNotification, +) -> Result<(FollowEvent, Option>), SubscriptionManagementError> +where + Block: BlockT + 'static, + Client: CallApiAt + 'static, +{ + handle.pin_block(notification.hash)?; + + let new_runtime = generate_runtime_event( + &client, + runtime_updates, + &BlockId::Hash(notification.hash), + Some(&BlockId::Hash(*notification.header.parent_hash())), + ); + + // Note: `Block::Hash` will serialize to hexadecimal encoded string. + let new_block = FollowEvent::NewBlock(NewBlock { + block_hash: notification.hash, + parent_block_hash: *notification.header.parent_hash(), + new_runtime, + runtime_updates, + }); + + if !notification.is_new_best { + return Ok((new_block, None)) + } + + // If this is the new best block, then we need to generate two events. + let best_block_event = + FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: notification.hash }); + + let mut best_block_cache = handle.best_block_write(); + match *best_block_cache { + Some(block_cache) => { + // The RPC layer has not reported this block as best before. + // Note: This handles the race with the finalized branch. + if block_cache != notification.hash { + *best_block_cache = Some(notification.hash); + Ok((new_block, Some(best_block_event))) + } else { + Ok((new_block, None)) + } + }, + None => { + *best_block_cache = Some(notification.hash); + Ok((new_block, Some(best_block_event))) + }, + } +} + +/// Generate the "Finalized" event and potentially the "BestBlockChanged" for +/// every notification. +fn handle_finalized_blocks( + client: &Arc, + handle: &SubscriptionHandle, + notification: FinalityNotification, +) -> Result<(FollowEvent, Option>), SubscriptionManagementError> +where + Block: BlockT + 'static, + Client: HeaderBackend + HeaderMetadata + 'static, +{ + let last_finalized = notification.hash; + // We might not receive all new blocks reports, also pin the block here. + handle.pin_block(last_finalized)?; + + // The tree route contains the exclusive path from the last finalized block + // to the block reported by the notification. Ensure the finalized block is + // properly reported to that path. + let mut finalized_block_hashes = notification.tree_route.iter().cloned().collect::>(); + finalized_block_hashes.push(last_finalized); + + let pruned_block_hashes: Vec<_> = notification.stale_heads.iter().cloned().collect(); + + let finalized_event = FollowEvent::Finalized(Finalized { + finalized_block_hashes, + pruned_block_hashes: pruned_block_hashes.clone(), + }); + + let mut best_block_cache = handle.best_block_write(); + match *best_block_cache { + Some(block_cache) => { + // Check if the current best block is also reported as pruned. + let reported_pruned = pruned_block_hashes.iter().find(|&&hash| hash == block_cache); + if reported_pruned.is_none() { + return Ok((finalized_event, None)) + } + + // The best block is reported as pruned. Therefore, we need to signal a new + // best block event before submitting the finalized event. + let best_block_hash = client.info().best_hash; + if best_block_hash == block_cache { + // The client doest not have any new information about the best block. + // The information from `.info()` is updated from the DB as the last + // step of the finalization and it should be up to date. Also, the + // displaced nodes (list of nodes reported) should be reported with + // an offset of 32 blocks for substrate. + // If the info is outdated, there is nothing the RPC can do for now. + error!(target: "rpc-spec-v2", "Client does not contain different best block"); + Ok((finalized_event, None)) + } else { + let ancestor = sp_blockchain::lowest_common_ancestor( + &**client, + last_finalized, + best_block_hash, + ) + .map_err(|_| { + SubscriptionManagementError::Custom("Could not find common ancestor".into()) + })?; + + // The client's best block must be a descendent of the last finalized block. + // In other words, the lowest common ancestor must be the last finalized block. + if ancestor.hash != last_finalized { + return Err(SubscriptionManagementError::Custom( + "The finalized block is not an ancestor of the best block".into(), + )) + } + + // The RPC needs to also submit a new best block changed before the + // finalized event. + *best_block_cache = Some(best_block_hash); + let best_block_event = + FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }); + Ok((finalized_event, Some(best_block_event))) + } + }, + None => Ok((finalized_event, None)), + } +} + +#[async_trait] +impl ChainHeadApiServer for ChainHead +where + Block: BlockT + 'static, + Block::Header: Unpin, + BE: Backend + 'static, + Client: BlockBackend + + ExecutorProvider + + HeaderBackend + + HeaderMetadata + + BlockchainEvents + + CallApiAt + + StorageProvider + + 'static, +{ + fn chain_head_unstable_follow( + &self, + mut sink: SubscriptionSink, + runtime_updates: bool, + ) -> SubscriptionResult { + let sub_id = match self.accept_subscription(&mut sink) { + Ok(sub_id) => sub_id, + Err(err) => { + sink.close(ChainHeadRpcError::InvalidSubscriptionID); + return Err(err) + }, + }; + + // Keep track of the subscription. + let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else { + // Inserting the subscription can only fail if the JsonRPSee + // generated a duplicate subscription ID. + debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription already accepted", sub_id); + let _ = sink.send(&FollowEvent::::Stop); + return Ok(()) + }; + debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription accepted", sub_id); + + let client = self.client.clone(); + let handle = sub_handle.clone(); + let subscription_id = sub_id.clone(); + + let stream_import = self + .client + .import_notification_stream() + .map(move |notification| { + match handle_import_blocks(&client, &handle, runtime_updates, notification) { + Ok((new_block, None)) => stream::iter(vec![new_block]), + Ok((new_block, Some(best_block))) => stream::iter(vec![new_block, best_block]), + Err(_) => { + debug!(target: "rpc-spec-v2", "[follow][id={:?}] Failed to handle block import notification.", subscription_id); + handle.stop(); + stream::iter(vec![]) + }, + } + }) + .flatten(); + + let client = self.client.clone(); + let handle = sub_handle.clone(); + let subscription_id = sub_id.clone(); + + let stream_finalized = self + .client + .finality_notification_stream() + .map(move |notification| { + match handle_finalized_blocks(&client, &handle, notification) { + Ok((finalized_event, None)) => stream::iter(vec![finalized_event]), + Ok((finalized_event, Some(best_block))) => + stream::iter(vec![best_block, finalized_event]), + Err(_) => { + debug!(target: "rpc-spec-v2", "[follow][id={:?}] Failed to import finalized blocks", subscription_id); + handle.stop(); + stream::iter(vec![]) + }, + } + }) + .flatten(); + + let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized); + let subscriptions = self.subscriptions.clone(); + let client = self.client.clone(); + let backend = self.backend.clone(); + let fut = async move { + let Ok(initial_events) = generate_initial_events(&client, &backend, &sub_handle, runtime_updates) else { + // Stop the subscription if we exceeded the maximum number of blocks pinned. + debug!(target: "rpc-spec-v2", "[follow][id={:?}] Exceeded max pinned blocks from initial events", sub_id); + let _ = sink.send(&FollowEvent::::Stop); + return + }; + + let stream = stream::iter(initial_events).chain(merged); + + submit_events(&mut sink, stream.boxed(), rx_stop).await; + // The client disconnected or called the unsubscribe method. + subscriptions.remove_subscription(&sub_id); + debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription removed", sub_id); + }; + + self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(()) + } + + fn chain_head_unstable_body( + &self, + mut sink: SubscriptionSink, + follow_subscription: String, + hash: Block::Hash, + _network_config: Option, + ) -> SubscriptionResult { + let client = self.client.clone(); + let subscriptions = self.subscriptions.clone(); + + let fut = async move { + let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return + }; + + // Block is not part of the subscription. + if !handle.contains_block(&hash) { + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return + } + + let event = match client.block(&BlockId::Hash(hash)) { + Ok(Some(signed_block)) => { + let extrinsics = signed_block.block.extrinsics(); + let result = format!("0x{:?}", HexDisplay::from(&extrinsics.encode())); + ChainHeadEvent::Done(ChainHeadResult { result }) + }, + Ok(None) => { + // The block's body was pruned. This subscription ID has become invalid. + debug!(target: "rpc-spec-v2", "[body][id={:?}] Stopping subscription because hash={:?} was pruned", follow_subscription, hash); + handle.stop(); + ChainHeadEvent::::Disjoint + }, + Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }), + }; + let _ = sink.send(&event); + }; + + self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(()) + } + + fn chain_head_unstable_header( + &self, + follow_subscription: String, + hash: Block::Hash, + ) -> RpcResult> { + let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else { + // Invalid invalid subscription ID. + return Ok(None) + }; + + // Block is not part of the subscription. + if !handle.contains_block(&hash) { + return Err(ChainHeadRpcError::InvalidBlock.into()) + } + + self.client + .header(BlockId::Hash(hash)) + .map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode())))) + .map_err(ChainHeadRpcError::FetchBlockHeader) + .map_err(Into::into) + } + + fn chain_head_unstable_genesis_hash(&self) -> RpcResult { + Ok(self.genesis_hash.clone()) + } + + fn chain_head_unstable_storage( + &self, + mut sink: SubscriptionSink, + follow_subscription: String, + hash: Block::Hash, + key: String, + child_key: Option, + _network_config: Option, + ) -> SubscriptionResult { + let key = StorageKey(parse_hex_param(&mut sink, key)?); + + let child_key = child_key + .map(|child_key| parse_hex_param(&mut sink, child_key)) + .transpose()? + .map(ChildInfo::new_default_from_vec); + + let client = self.client.clone(); + let subscriptions = self.subscriptions.clone(); + + let fut = async move { + let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return + }; + + // Block is not part of the subscription. + if !handle.contains_block(&hash) { + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return + } + + // The child key is provided, use the key to query the child trie. + if let Some(child_key) = child_key { + // The child key must not be prefixed with ":child_storage:" nor + // ":child_storage:default:". + if well_known_keys::is_default_child_storage_key(child_key.storage_key()) || + well_known_keys::is_child_storage_key(child_key.storage_key()) + { + let _ = sink + .send(&ChainHeadEvent::Done(ChainHeadResult { result: None:: })); + return + } + + let res = client + .child_storage(hash, &child_key, &key) + .map(|result| { + let result = + result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0))); + ChainHeadEvent::Done(ChainHeadResult { result }) + }) + .unwrap_or_else(|error| { + ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }) + }); + let _ = sink.send(&res); + return + } + + // The main key must not be prefixed with b":child_storage:" nor + // b":child_storage:default:". + if well_known_keys::is_default_child_storage_key(&key.0) || + well_known_keys::is_child_storage_key(&key.0) + { + let _ = + sink.send(&ChainHeadEvent::Done(ChainHeadResult { result: None:: })); + return + } + + // Main root trie storage query. + let res = client + .storage(hash, &key) + .map(|result| { + let result = + result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0))); + ChainHeadEvent::Done(ChainHeadResult { result }) + }) + .unwrap_or_else(|error| { + ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }) + }); + let _ = sink.send(&res); + }; + + self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(()) + } + + fn chain_head_unstable_call( + &self, + mut sink: SubscriptionSink, + follow_subscription: String, + hash: Block::Hash, + function: String, + call_parameters: String, + _network_config: Option, + ) -> SubscriptionResult { + let call_parameters = Bytes::from(parse_hex_param(&mut sink, call_parameters)?); + + let client = self.client.clone(); + let subscriptions = self.subscriptions.clone(); + + let fut = async move { + let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return + }; + + // Block is not part of the subscription. + if !handle.contains_block(&hash) { + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return + } + + // Reject subscription if runtime_updates is false. + if !handle.has_runtime_updates() { + let _ = sink.reject(ChainHeadRpcError::InvalidParam( + "The runtime updates flag must be set".into(), + )); + return + } + + let res = client + .executor() + .call( + &BlockId::Hash(hash), + &function, + &call_parameters, + client.execution_extensions().strategies().other, + ) + .map(|result| { + let result = format!("0x{:?}", HexDisplay::from(&result)); + ChainHeadEvent::Done(ChainHeadResult { result }) + }) + .unwrap_or_else(|error| { + ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }) + }); + + let _ = sink.send(&res); + }; + + self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(()) + } + + fn chain_head_unstable_unpin( + &self, + follow_subscription: String, + hash: Block::Hash, + ) -> RpcResult<()> { + let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else { + // Invalid invalid subscription ID. + return Ok(()) + }; + + if !handle.unpin_block(&hash) { + return Err(ChainHeadRpcError::InvalidBlock.into()) + } + + Ok(()) + } +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/error.rs new file mode 100644 index 0000000000..92f336ed4f --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/chain_head/error.rs @@ -0,0 +1,74 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Error helpers for `chainHead` RPC module. + +use jsonrpsee::{ + core::Error as RpcError, + types::error::{CallError, ErrorObject}, +}; +use sp_blockchain::Error as BlockchainError; + +/// ChainHead RPC errors. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// The provided block hash is invalid. + #[error("Invalid block hash")] + InvalidBlock, + /// Fetch block header error. + #[error("Could not fetch block header: {0}")] + FetchBlockHeader(BlockchainError), + /// Invalid parameter provided to the RPC method. + #[error("Invalid parameter: {0}")] + InvalidParam(String), + /// Invalid subscription ID provided by the RPC server. + #[error("Invalid subscription ID")] + InvalidSubscriptionID, +} + +// Base code for all `chainHead` errors. +const BASE_ERROR: i32 = 2000; +/// The provided block hash is invalid. +const INVALID_BLOCK_ERROR: i32 = BASE_ERROR + 1; +/// Fetch block header error. +const FETCH_BLOCK_HEADER_ERROR: i32 = BASE_ERROR + 2; +/// Invalid parameter error. +const INVALID_PARAM_ERROR: i32 = BASE_ERROR + 3; +/// Invalid subscription ID. +const INVALID_SUB_ID: i32 = BASE_ERROR + 4; + +impl From for ErrorObject<'static> { + fn from(e: Error) -> Self { + let msg = e.to_string(); + + match e { + Error::InvalidBlock => ErrorObject::owned(INVALID_BLOCK_ERROR, msg, None::<()>), + Error::FetchBlockHeader(_) => + ErrorObject::owned(FETCH_BLOCK_HEADER_ERROR, msg, None::<()>), + Error::InvalidParam(_) => ErrorObject::owned(INVALID_PARAM_ERROR, msg, None::<()>), + Error::InvalidSubscriptionID => ErrorObject::owned(INVALID_SUB_ID, msg, None::<()>), + } + .into() + } +} + +impl From for RpcError { + fn from(e: Error) -> Self { + CallError::Custom(e.into()).into() + } +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/event.rs b/substrate/client/rpc-spec-v2/src/chain_head/event.rs new file mode 100644 index 0000000000..25930cb6f9 --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/chain_head/event.rs @@ -0,0 +1,480 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! The chain head's event returned as json compatible object. + +use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; +use sp_api::ApiError; +use sp_version::RuntimeVersion; +use std::num::NonZeroUsize; + +/// The network config parameter is used when a function +/// needs to request the information from its peers. +/// +/// These values can be tweaked depending on the urgency of the JSON-RPC function call. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NetworkConfig { + /// The total number of peers from which the information is requested. + total_attempts: u64, + /// The maximum number of requests to perform in parallel. + /// + /// # Note + /// + /// A zero value is illegal. + max_parallel: NonZeroUsize, + /// The time, in milliseconds, after which a single requests towards one peer + /// is considered unsuccessful. + timeout_ms: u64, +} + +/// The operation could not be processed due to an error. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ErrorEvent { + /// Reason of the error. + pub error: String, +} + +/// The runtime specification of the current block. +/// +/// This event is generated for: +/// - the first announced block by the follow subscription +/// - blocks that suffered a change in runtime compared with their parents +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RuntimeVersionEvent { + /// The runtime version. + pub spec: RuntimeVersion, +} + +/// The runtime event generated if the `follow` subscription +/// has set the `runtime_updates` flag. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub enum RuntimeEvent { + /// The runtime version of this block. + Valid(RuntimeVersionEvent), + /// The runtime could not be obtained due to an error. + Invalid(ErrorEvent), +} + +impl From for RuntimeEvent { + fn from(err: ApiError) -> Self { + RuntimeEvent::Invalid(ErrorEvent { error: format!("Api error: {}", err) }) + } +} + +/// Contain information about the latest finalized block. +/// +/// # Note +/// +/// This is the first event generated by the `follow` subscription +/// and is submitted only once. +/// +/// If the `runtime_updates` flag is set, then this event contains +/// the `RuntimeEvent`, otherwise the `RuntimeEvent` is not present. +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Initialized { + /// The hash of the latest finalized block. + pub finalized_block_hash: Hash, + /// The runtime version of the finalized block. + /// + /// # Note + /// + /// This is present only if the `runtime_updates` flag is set for + /// the `follow` subscription. + pub finalized_block_runtime: Option, + /// Privately keep track if the `finalized_block_runtime` should be + /// serialized. + #[serde(default)] + pub(crate) runtime_updates: bool, +} + +impl Serialize for Initialized { + /// Custom serialize implementation to include the `RuntimeEvent` depending + /// on the internal `runtime_updates` flag. + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + if self.runtime_updates { + let mut state = serializer.serialize_struct("Initialized", 2)?; + state.serialize_field("finalizedBlockHash", &self.finalized_block_hash)?; + state.serialize_field("finalizedBlockRuntime", &self.finalized_block_runtime)?; + state.end() + } else { + let mut state = serializer.serialize_struct("Initialized", 1)?; + state.serialize_field("finalizedBlockHash", &self.finalized_block_hash)?; + state.end() + } + } +} + +/// Indicate a new non-finalized block. +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NewBlock { + /// The hash of the new block. + pub block_hash: Hash, + /// The parent hash of the new block. + pub parent_block_hash: Hash, + /// The runtime version of the new block. + /// + /// # Note + /// + /// This is present only if the `runtime_updates` flag is set for + /// the `follow` subscription. + pub new_runtime: Option, + /// Privately keep track if the `finalized_block_runtime` should be + /// serialized. + #[serde(default)] + pub(crate) runtime_updates: bool, +} + +impl Serialize for NewBlock { + /// Custom serialize implementation to include the `RuntimeEvent` depending + /// on the internal `runtime_updates` flag. + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + if self.runtime_updates { + let mut state = serializer.serialize_struct("NewBlock", 3)?; + state.serialize_field("blockHash", &self.block_hash)?; + state.serialize_field("parentBlockHash", &self.parent_block_hash)?; + state.serialize_field("newRuntime", &self.new_runtime)?; + state.end() + } else { + let mut state = serializer.serialize_struct("NewBlock", 2)?; + state.serialize_field("blockHash", &self.block_hash)?; + state.serialize_field("parentBlockHash", &self.parent_block_hash)?; + state.end() + } + } +} + +/// Indicate the block hash of the new best block. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BestBlockChanged { + /// The block hash of the new best block. + pub best_block_hash: Hash, +} + +/// Indicate the finalized and pruned block hashes. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Finalized { + /// Block hashes that are finalized. + pub finalized_block_hashes: Vec, + /// Block hashes that are pruned (removed). + pub pruned_block_hashes: Vec, +} + +/// The event generated by the `follow` method. +/// +/// The events are generated in the following order: +/// 1. Initialized - generated only once to signal the +/// latest finalized block +/// 2. NewBlock - a new block was added. +/// 3. BestBlockChanged - indicate that the best block +/// is now the one from this event. The block was +/// announced priorly with the `NewBlock` event. +/// 4. Finalized - State the finalized and pruned blocks. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "event")] +pub enum FollowEvent { + /// The latest finalized block. + /// + /// This event is generated only once. + Initialized(Initialized), + /// A new non-finalized block was added. + NewBlock(NewBlock), + /// The best block of the chain. + BestBlockChanged(BestBlockChanged), + /// A list of finalized and pruned blocks. + Finalized(Finalized), + /// The subscription is dropped and no further events + /// will be generated. + Stop, +} + +/// The result of a chain head method. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ChainHeadResult { + /// Result of the method. + pub result: T, +} + +/// The event generated by the body / call / storage methods. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "event")] +pub enum ChainHeadEvent { + /// The request completed successfully. + Done(ChainHeadResult), + /// The resources requested are inaccessible. + /// + /// Resubmitting the request later might succeed. + Inaccessible(ErrorEvent), + /// An error occurred. This is definitive. + Error(ErrorEvent), + /// The provided subscription ID is stale or invalid. + Disjoint, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn follow_initialized_event_no_updates() { + // Runtime flag is false. + let event: FollowEvent = FollowEvent::Initialized(Initialized { + finalized_block_hash: "0x1".into(), + finalized_block_runtime: None, + runtime_updates: false, + }); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = r#"{"event":"initialized","finalizedBlockHash":"0x1"}"#; + assert_eq!(ser, exp); + + let event_dec: FollowEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn follow_initialized_event_with_updates() { + // Runtime flag is true, block runtime must always be reported for this event. + let runtime = RuntimeVersion { + spec_name: "ABC".into(), + impl_name: "Impl".into(), + spec_version: 1, + ..Default::default() + }; + + let runtime_event = RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime }); + let mut initialized = Initialized { + finalized_block_hash: "0x1".into(), + finalized_block_runtime: Some(runtime_event), + runtime_updates: true, + }; + let event: FollowEvent = FollowEvent::Initialized(initialized.clone()); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = concat!( + r#"{"event":"initialized","finalizedBlockHash":"0x1","#, + r#""finalizedBlockRuntime":{"type":"valid","spec":{"specName":"ABC","implName":"Impl","authoringVersion":0,"#, + r#""specVersion":1,"implVersion":0,"apis":[],"transactionVersion":0,"stateVersion":0}}}"#, + ); + assert_eq!(ser, exp); + + let event_dec: FollowEvent = serde_json::from_str(exp).unwrap(); + // The `runtime_updates` field is used for serialization purposes. + initialized.runtime_updates = false; + assert!(matches!( + event_dec, FollowEvent::Initialized(ref dec) if dec == &initialized + )); + } + + #[test] + fn follow_new_block_event_no_updates() { + // Runtime flag is false. + let event: FollowEvent = FollowEvent::NewBlock(NewBlock { + block_hash: "0x1".into(), + parent_block_hash: "0x2".into(), + new_runtime: None, + runtime_updates: false, + }); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = r#"{"event":"newBlock","blockHash":"0x1","parentBlockHash":"0x2"}"#; + assert_eq!(ser, exp); + + let event_dec: FollowEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn follow_new_block_event_with_updates() { + // Runtime flag is true, block runtime must always be reported for this event. + let runtime = RuntimeVersion { + spec_name: "ABC".into(), + impl_name: "Impl".into(), + spec_version: 1, + ..Default::default() + }; + + let runtime_event = RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime }); + let mut new_block = NewBlock { + block_hash: "0x1".into(), + parent_block_hash: "0x2".into(), + new_runtime: Some(runtime_event), + runtime_updates: true, + }; + + let event: FollowEvent = FollowEvent::NewBlock(new_block.clone()); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = concat!( + r#"{"event":"newBlock","blockHash":"0x1","parentBlockHash":"0x2","#, + r#""newRuntime":{"type":"valid","spec":{"specName":"ABC","implName":"Impl","authoringVersion":0,"#, + r#""specVersion":1,"implVersion":0,"apis":[],"transactionVersion":0,"stateVersion":0}}}"#, + ); + assert_eq!(ser, exp); + + let event_dec: FollowEvent = serde_json::from_str(exp).unwrap(); + // The `runtime_updates` field is used for serialization purposes. + new_block.runtime_updates = false; + assert!(matches!( + event_dec, FollowEvent::NewBlock(ref dec) if dec == &new_block + )); + + // Runtime flag is true, runtime didn't change compared to parent. + let mut new_block = NewBlock { + block_hash: "0x1".into(), + parent_block_hash: "0x2".into(), + new_runtime: None, + runtime_updates: true, + }; + let event: FollowEvent = FollowEvent::NewBlock(new_block.clone()); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = + r#"{"event":"newBlock","blockHash":"0x1","parentBlockHash":"0x2","newRuntime":null}"#; + assert_eq!(ser, exp); + new_block.runtime_updates = false; + let event_dec: FollowEvent = serde_json::from_str(exp).unwrap(); + assert!(matches!( + event_dec, FollowEvent::NewBlock(ref dec) if dec == &new_block + )); + } + + #[test] + fn follow_best_block_changed_event() { + let event: FollowEvent = + FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: "0x1".into() }); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = r#"{"event":"bestBlockChanged","bestBlockHash":"0x1"}"#; + assert_eq!(ser, exp); + + let event_dec: FollowEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn follow_finalized_event() { + let event: FollowEvent = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec!["0x1".into()], + pruned_block_hashes: vec!["0x2".into()], + }); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = + r#"{"event":"finalized","finalizedBlockHashes":["0x1"],"prunedBlockHashes":["0x2"]}"#; + assert_eq!(ser, exp); + + let event_dec: FollowEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn follow_stop_event() { + let event: FollowEvent = FollowEvent::Stop; + + let ser = serde_json::to_string(&event).unwrap(); + let exp = r#"{"event":"stop"}"#; + assert_eq!(ser, exp); + + let event_dec: FollowEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn chain_head_done_event() { + let event: ChainHeadEvent = + ChainHeadEvent::Done(ChainHeadResult { result: "A".into() }); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = r#"{"event":"done","result":"A"}"#; + assert_eq!(ser, exp); + + let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn chain_head_inaccessible_event() { + let event: ChainHeadEvent = + ChainHeadEvent::Inaccessible(ErrorEvent { error: "A".into() }); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = r#"{"event":"inaccessible","error":"A"}"#; + assert_eq!(ser, exp); + + let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn chain_head_error_event() { + let event: ChainHeadEvent = ChainHeadEvent::Error(ErrorEvent { error: "A".into() }); + + let ser = serde_json::to_string(&event).unwrap(); + let exp = r#"{"event":"error","error":"A"}"#; + assert_eq!(ser, exp); + + let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn chain_head_disjoint_event() { + let event: ChainHeadEvent = ChainHeadEvent::Disjoint; + + let ser = serde_json::to_string(&event).unwrap(); + let exp = r#"{"event":"disjoint"}"#; + assert_eq!(ser, exp); + + let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn chain_head_network_config() { + let conf = NetworkConfig { + total_attempts: 1, + max_parallel: NonZeroUsize::new(2).expect("Non zero number; qed"), + timeout_ms: 3, + }; + + let ser = serde_json::to_string(&conf).unwrap(); + let exp = r#"{"totalAttempts":1,"maxParallel":2,"timeoutMs":3}"#; + assert_eq!(ser, exp); + + let conf_dec: NetworkConfig = serde_json::from_str(exp).unwrap(); + assert_eq!(conf_dec, conf); + } +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs new file mode 100644 index 0000000000..a25933b40f --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -0,0 +1,40 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Substrate chain head API. +//! +//! # Note +//! +//! Methods are prefixed by `chainHead`. + +#[cfg(test)] +mod tests; + +pub mod api; +pub mod chain_head; +pub mod error; +pub mod event; + +mod subscription; + +pub use api::ChainHeadApiServer; +pub use chain_head::ChainHead; +pub use event::{ + BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent, + Initialized, NetworkConfig, NewBlock, RuntimeEvent, RuntimeVersionEvent, +}; diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription.rs new file mode 100644 index 0000000000..033db45ca7 --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -0,0 +1,284 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Subscription management for tracking subscription IDs to pinned blocks. + +use futures::channel::oneshot; +use parking_lot::{RwLock, RwLockWriteGuard}; +use sp_runtime::traits::Block as BlockT; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + sync::Arc, +}; + +/// Subscription management error. +#[derive(Debug)] +pub enum SubscriptionManagementError { + /// The block cannot be pinned into memory because + /// the subscription has exceeded the maximum number + /// of blocks pinned. + ExceededLimits, + /// Custom error. + Custom(String), +} + +/// Inner subscription data structure. +struct SubscriptionInner { + /// The `runtime_updates` parameter flag of the subscription. + runtime_updates: bool, + /// Signals the "Stop" event. + tx_stop: Option>, + /// The blocks pinned. + blocks: HashSet, + /// The maximum number of pinned blocks allowed per subscription. + max_pinned_blocks: usize, +} + +/// Manage the blocks of a specific subscription ID. +#[derive(Clone)] +pub struct SubscriptionHandle { + inner: Arc>>, + /// The best reported block by this subscription. + /// Have this as a separate variable to easily share + /// the write guard with the RPC layer. + best_block: Arc>>, +} + +impl SubscriptionHandle { + /// Construct a new [`SubscriptionHandle`]. + fn new(runtime_updates: bool, tx_stop: oneshot::Sender<()>, max_pinned_blocks: usize) -> Self { + SubscriptionHandle { + inner: Arc::new(RwLock::new(SubscriptionInner { + runtime_updates, + tx_stop: Some(tx_stop), + blocks: HashSet::new(), + max_pinned_blocks, + })), + best_block: Arc::new(RwLock::new(None)), + } + } + + /// Trigger the stop event for the current subscription. + /// + /// This can happen on internal failure (ie, the pruning deleted the block from memory) + /// or if the user exceeded the amount of available pinned blocks. + pub fn stop(&self) { + let mut inner = self.inner.write(); + + if let Some(tx_stop) = inner.tx_stop.take() { + let _ = tx_stop.send(()); + } + } + + /// Pin a new block for the current subscription ID. + /// + /// Returns whether the value was newly inserted if the block can be pinned. + /// Otherwise, returns an error if the maximum number of blocks has been exceeded. + pub fn pin_block(&self, hash: Block::Hash) -> Result { + let mut inner = self.inner.write(); + + if inner.blocks.len() == inner.max_pinned_blocks { + // We have reached the limit. However, the block can be already inserted. + if inner.blocks.contains(&hash) { + return Ok(false) + } else { + return Err(SubscriptionManagementError::ExceededLimits) + } + } + + Ok(inner.blocks.insert(hash)) + } + + /// Unpin a new block for the current subscription ID. + /// + /// Returns whether the value was present in the set. + pub fn unpin_block(&self, hash: &Block::Hash) -> bool { + let mut inner = self.inner.write(); + inner.blocks.remove(hash) + } + + /// Check if the block hash is present for the provided subscription ID. + /// + /// Returns `true` if the set contains the block. + pub fn contains_block(&self, hash: &Block::Hash) -> bool { + let inner = self.inner.read(); + inner.blocks.contains(hash) + } + + /// Get the `runtime_updates` flag of this subscription. + pub fn has_runtime_updates(&self) -> bool { + let inner = self.inner.read(); + inner.runtime_updates + } + + /// Get the write guard of the best reported block. + pub fn best_block_write(&self) -> RwLockWriteGuard<'_, Option> { + self.best_block.write() + } +} + +/// Manage block pinning / unpinning for subscription IDs. +pub struct SubscriptionManagement { + /// Manage subscription by mapping the subscription ID + /// to a set of block hashes. + inner: RwLock>>, +} + +impl SubscriptionManagement { + /// Construct a new [`SubscriptionManagement`]. + pub fn new() -> Self { + SubscriptionManagement { inner: RwLock::new(HashMap::new()) } + } + + /// Insert a new subscription ID. + /// + /// If the subscription was not previously inserted, the method returns a tuple of + /// the receiver that is triggered upon the "Stop" event and the subscription + /// handle. Otherwise, when the subscription ID was already inserted returns none. + pub fn insert_subscription( + &self, + subscription_id: String, + runtime_updates: bool, + max_pinned_blocks: usize, + ) -> Option<(oneshot::Receiver<()>, SubscriptionHandle)> { + let mut subs = self.inner.write(); + + if let Entry::Vacant(entry) = subs.entry(subscription_id) { + let (tx_stop, rx_stop) = oneshot::channel(); + let handle = + SubscriptionHandle::::new(runtime_updates, tx_stop, max_pinned_blocks); + entry.insert(handle.clone()); + Some((rx_stop, handle)) + } else { + None + } + } + + /// Remove the subscription ID with associated pinned blocks. + pub fn remove_subscription(&self, subscription_id: &String) { + let mut subs = self.inner.write(); + subs.remove(subscription_id); + } + + /// Obtain the specific subscription handle. + pub fn get_subscription(&self, subscription_id: &String) -> Option> { + let subs = self.inner.write(); + subs.get(subscription_id).and_then(|handle| Some(handle.clone())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sp_core::H256; + use substrate_test_runtime_client::runtime::Block; + + #[test] + fn subscription_check_id() { + let subs = SubscriptionManagement::::new(); + + let id = "abc".to_string(); + let hash = H256::random(); + + let handle = subs.get_subscription(&id); + assert!(handle.is_none()); + + let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); + assert!(!handle.contains_block(&hash)); + + subs.remove_subscription(&id); + + let handle = subs.get_subscription(&id); + assert!(handle.is_none()); + } + + #[test] + fn subscription_check_block() { + let subs = SubscriptionManagement::::new(); + + let id = "abc".to_string(); + let hash = H256::random(); + + // Check with subscription. + let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); + assert!(!handle.contains_block(&hash)); + assert!(!handle.unpin_block(&hash)); + + handle.pin_block(hash).unwrap(); + assert!(handle.contains_block(&hash)); + // Unpin an invalid block. + assert!(!handle.unpin_block(&H256::random())); + + // Unpin the valid block. + assert!(handle.unpin_block(&hash)); + assert!(!handle.contains_block(&hash)); + } + + #[test] + fn subscription_check_stop_event() { + let subs = SubscriptionManagement::::new(); + + let id = "abc".to_string(); + + // Check with subscription. + let (mut rx_stop, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); + + // Check the stop signal was not received. + let res = rx_stop.try_recv().unwrap(); + assert!(res.is_none()); + + // Inserting a second time returns None. + let res = subs.insert_subscription(id.clone(), false, 10); + assert!(res.is_none()); + + handle.stop(); + + // Check the signal was received. + let res = rx_stop.try_recv().unwrap(); + assert!(res.is_some()); + } + + #[test] + fn subscription_check_data() { + let subs = SubscriptionManagement::::new(); + + let id = "abc".to_string(); + let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); + assert!(!handle.has_runtime_updates()); + + let id2 = "abcd".to_string(); + let (_, handle) = subs.insert_subscription(id2.clone(), true, 10).unwrap(); + assert!(handle.has_runtime_updates()); + } + + #[test] + fn subscription_check_max_pinned() { + let subs = SubscriptionManagement::::new(); + + let id = "abc".to_string(); + let hash = H256::random(); + let hash_2 = H256::random(); + let (_, handle) = subs.insert_subscription(id.clone(), false, 1).unwrap(); + + handle.pin_block(hash).unwrap(); + // The same block can be pinned multiple times. + handle.pin_block(hash).unwrap(); + // Exceeded number of pinned blocks. + handle.pin_block(hash_2).unwrap_err(); + } +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs new file mode 100644 index 0000000000..4084075f0b --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -0,0 +1,1022 @@ +use super::*; +use assert_matches::assert_matches; +use codec::{Decode, Encode}; +use jsonrpsee::{ + core::{error::Error, server::rpc_module::Subscription as RpcSubscription}, + types::{error::CallError, EmptyServerParams as EmptyParams}, + RpcModule, +}; +use sc_block_builder::BlockBuilderProvider; +use sc_client_api::ChildInfo; +use sp_api::BlockId; +use sp_blockchain::HeaderBackend; +use sp_consensus::BlockOrigin; +use sp_core::{ + hexdisplay::HexDisplay, + storage::well_known_keys::{self, CODE}, + testing::TaskExecutor, +}; +use sp_version::RuntimeVersion; +use std::sync::Arc; +use substrate_test_runtime::Transfer; +use substrate_test_runtime_client::{ + prelude::*, runtime, Backend, BlockBuilderExt, Client, ClientBlockImportExt, +}; + +type Header = substrate_test_runtime_client::runtime::Header; +type Block = substrate_test_runtime_client::runtime::Block; +const MAX_PINNED_BLOCKS: usize = 32; +const CHAIN_GENESIS: [u8; 32] = [0; 32]; +const INVALID_HASH: [u8; 32] = [1; 32]; +const KEY: &[u8] = b":mock"; +const VALUE: &[u8] = b"hello world"; +const CHILD_STORAGE_KEY: &[u8] = b"child"; +const CHILD_VALUE: &[u8] = b"child value"; + +async fn get_next_event(sub: &mut RpcSubscription) -> T { + let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(1), sub.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + event +} + +async fn setup_api() -> ( + Arc>, + RpcModule>>, + RpcSubscription, + String, + Block, +) { + let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); + let builder = TestClientBuilder::new().add_extra_child_storage( + &child_info, + KEY.to_vec(), + CHILD_VALUE.to_vec(), + ); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + (client, api, sub, sub_id, block) +} + +#[tokio::test] +async fn follow_subscription_produces_blocks() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + let finalized_hash = client.info().finalized_hash; + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + + // Initialized must always be reported first. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Initialized(Initialized { + finalized_block_hash: format!("{:?}", finalized_hash), + finalized_block_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + + let best_hash = block.header.hash(); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", best_hash), + parent_block_hash: format!("{:?}", finalized_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", best_hash), + }); + assert_eq!(event, expected); + + client.finalize_block(best_hash, None).unwrap(); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![format!("{:?}", best_hash)], + pruned_block_hashes: vec![], + }); + assert_eq!(event, expected); +} + +#[tokio::test] +async fn follow_with_runtime() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + let finalized_hash = client.info().finalized_hash; + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + + // Initialized must always be reported first. + let event: FollowEvent = get_next_event(&mut sub).await; + + let runtime_str = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\ + \"specVersion\":2,\"implVersion\":2,\"apis\":[[\"0xdf6acb689907609b\",4],\ + [\"0x37e397fc7c91f5e4\",1],[\"0xd2bc9897eed08f15\",3],[\"0x40fe3ad401f8959a\",6],\ + [\"0xc6e9a76309f39b09\",1],[\"0xdd718d5cc53262d4\",1],[\"0xcbca25e39f142387\",2],\ + [\"0xf78b278be53f454c\",2],[\"0xab3c0572291feb8b\",1],[\"0xbc9d89904f5b923f\",1]],\ + \"transactionVersion\":1,\"stateVersion\":1}"; + let runtime: RuntimeVersion = serde_json::from_str(runtime_str).unwrap(); + + let finalized_block_runtime = + Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime.clone() })); + // Runtime must always be reported with the first event. + let expected = FollowEvent::Initialized(Initialized { + finalized_block_hash: format!("{:?}", finalized_hash), + finalized_block_runtime, + runtime_updates: false, + }); + assert_eq!(event, expected); + + // Import a new block without runtime changes. + // The runtime field must be None in this case. + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let best_hash = block.header.hash(); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", best_hash), + parent_block_hash: format!("{:?}", finalized_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", best_hash), + }); + assert_eq!(event, expected); + + client.finalize_block(best_hash, None).unwrap(); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![format!("{:?}", best_hash)], + pruned_block_hashes: vec![], + }); + assert_eq!(event, expected); + + let finalized_hash = best_hash; + // The `RuntimeVersion` is embedded into the WASM blob at the `runtime_version` + // section. Modify the `RuntimeVersion` and commit the changes to a new block. + // The RPC must notify the runtime event change. + let wasm = sp_maybe_compressed_blob::decompress( + runtime::wasm_binary_unwrap(), + sp_maybe_compressed_blob::CODE_BLOB_BOMB_LIMIT, + ) + .unwrap(); + // Update the runtime spec version. + let mut runtime = runtime; + runtime.spec_version += 1; + let embedded = sp_version::embed::embed_runtime_version(&wasm, runtime.clone()).unwrap(); + let wasm = sp_maybe_compressed_blob::compress( + &embedded, + sp_maybe_compressed_blob::CODE_BLOB_BOMB_LIMIT, + ) + .unwrap(); + + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(CODE.to_vec(), Some(wasm)).unwrap(); + let block = builder.build().unwrap().block; + let best_hash = block.header.hash(); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + let new_runtime = Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime.clone() })); + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", best_hash), + parent_block_hash: format!("{:?}", finalized_hash), + new_runtime, + runtime_updates: false, + }); + assert_eq!(event, expected); +} + +#[tokio::test] +async fn get_genesis() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + let genesis: String = + api.call("chainHead_unstable_genesisHash", EmptyParams::new()).await.unwrap(); + assert_eq!(genesis, format!("0x{}", HexDisplay::from(&CHAIN_GENESIS))); +} + +#[tokio::test] +async fn get_header() { + let (_client, api, _sub, sub_id, block) = setup_api().await; + let block_hash = format!("{:?}", block.header.hash()); + let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH)); + + // Invalid subscription ID must produce no results. + let res: Option = api + .call("chainHead_unstable_header", ["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + assert!(res.is_none()); + + // Valid subscription with invalid block hash will error. + let err = api + .call::<_, serde_json::Value>("chainHead_unstable_header", [&sub_id, &invalid_hash]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + // Obtain the valid header. + let res: String = api.call("chainHead_unstable_header", [&sub_id, &block_hash]).await.unwrap(); + let bytes = array_bytes::hex2bytes(&res).unwrap(); + let header: Header = Decode::decode(&mut &bytes[..]).unwrap(); + assert_eq!(header, block.header); +} + +#[tokio::test] +async fn get_body() { + let (mut client, api, mut block_sub, sub_id, block) = setup_api().await; + let block_hash = format!("{:?}", block.header.hash()); + let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH)); + + // Subscription ID is stale the disjoint event is emitted. + let mut sub = api + .subscribe("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + let event: ChainHeadEvent = get_next_event(&mut sub).await; + assert_eq!(event, ChainHeadEvent::::Disjoint); + + // Valid subscription ID with invalid block hash will error. + let err = api + .subscribe("chainHead_unstable_body", [&sub_id, &invalid_hash]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + // Obtain valid the body (list of extrinsics). + let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); + let event: ChainHeadEvent = get_next_event(&mut sub).await; + // Block contains no extrinsics. + assert_matches!(event, + ChainHeadEvent::Done(done) if done.result == "0x00" + ); + + // Import a block with extrinsics. + let mut builder = client.new_block(Default::default()).unwrap(); + builder + .push_transfer(runtime::Transfer { + from: AccountKeyring::Alice.into(), + to: AccountKeyring::Ferdie.into(), + amount: 42, + nonce: 0, + }) + .unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::BestBlockChanged(_) + ); + + let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); + let event: ChainHeadEvent = get_next_event(&mut sub).await; + // Hex encoded scale encoded string for the vector of extrinsics. + let expected = format!("0x{:?}", HexDisplay::from(&block.extrinsics.encode())); + assert_matches!(event, + ChainHeadEvent::Done(done) if done.result == expected + ); +} + +#[tokio::test] +async fn call_runtime() { + let (_client, api, _sub, sub_id, block) = setup_api().await; + let block_hash = format!("{:?}", block.header.hash()); + let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH)); + + // Subscription ID is stale the disjoint event is emitted. + let mut sub = api + .subscribe( + "chainHead_unstable_call", + ["invalid_sub_id", &block_hash, "BabeApi_current_epoch", "0x00"], + ) + .await + .unwrap(); + let event: ChainHeadEvent = get_next_event(&mut sub).await; + assert_eq!(event, ChainHeadEvent::::Disjoint); + + // Valid subscription ID with invalid block hash will error. + let err = api + .subscribe( + "chainHead_unstable_call", + [&sub_id, &invalid_hash, "BabeApi_current_epoch", "0x00"], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + // Pass an invalid parameters that cannot be decode. + let err = api + .subscribe( + "chainHead_unstable_call", + [&sub_id, &block_hash, "BabeApi_current_epoch", "0x0"], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2003 && err.message().contains("Invalid parameter") + ); + + let alice_id = AccountKeyring::Alice.to_account_id(); + // Hex encoded scale encoded bytes representing the call parameters. + let call_parameters = format!("0x{:?}", HexDisplay::from(&alice_id.encode())); + let mut sub = api + .subscribe( + "chainHead_unstable_call", + [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], + ) + .await + .unwrap(); + + assert_matches!( + get_next_event::>(&mut sub).await, + ChainHeadEvent::Done(done) if done.result == "0x0000000000000000" + ); + + // The `current_epoch` takes no parameters and not draining the input buffer + // will cause the execution to fail. + let mut sub = api + .subscribe( + "chainHead_unstable_call", + [&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"], + ) + .await + .unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + ChainHeadEvent::Error(event) if event.error.contains("Execution failed") + ); +} + +#[tokio::test] +async fn call_runtime_without_flag() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Valid runtime call on a subscription started with `runtime_updates` false. + let alice_id = AccountKeyring::Alice.to_account_id(); + let call_parameters = format!("0x{:?}", HexDisplay::from(&alice_id.encode())); + let err = api + .subscribe( + "chainHead_unstable_call", + [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], + ) + .await + .unwrap_err(); + + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2003 && err.message().contains("The runtime updates flag must be set") + ); +} + +#[tokio::test] +async fn get_storage() { + let (mut client, api, mut block_sub, sub_id, block) = setup_api().await; + let block_hash = format!("{:?}", block.header.hash()); + let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH)); + let key = format!("0x{:?}", HexDisplay::from(&KEY)); + + // Subscription ID is stale the disjoint event is emitted. + let mut sub = api + .subscribe("chainHead_unstable_storage", ["invalid_sub_id", &invalid_hash, &key]) + .await + .unwrap(); + let event: ChainHeadEvent = get_next_event(&mut sub).await; + assert_eq!(event, ChainHeadEvent::::Disjoint); + + // Valid subscription ID with invalid block hash will error. + let err = api + .subscribe("chainHead_unstable_storage", [&sub_id, &invalid_hash, &key]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + // Valid call without storage at the key. + let mut sub = api + .subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &key]) + .await + .unwrap(); + let event: ChainHeadEvent> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadEvent::>::Done(done) if done.result.is_none()); + + // Import a new block with storage changes. + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(KEY.to_vec(), Some(VALUE.to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Valid call with storage at the key. + let expected_value = Some(format!("0x{:?}", HexDisplay::from(&VALUE))); + let mut sub = api + .subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &key]) + .await + .unwrap(); + let event: ChainHeadEvent> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadEvent::>::Done(done) if done.result == expected_value); + + // Child value set in `setup_api`. + let child_info = format!("0x{:?}", HexDisplay::from(b"child")); + let genesis_hash = format!("{:?}", client.genesis_hash()); + let expected_value = Some(format!("0x{:?}", HexDisplay::from(&CHILD_VALUE))); + let mut sub = api + .subscribe("chainHead_unstable_storage", [&sub_id, &genesis_hash, &key, &child_info]) + .await + .unwrap(); + let event: ChainHeadEvent> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadEvent::>::Done(done) if done.result == expected_value); +} + +#[tokio::test] +async fn get_storage_wrong_key() { + let (mut _client, api, mut _block_sub, sub_id, block) = setup_api().await; + let block_hash = format!("{:?}", block.header.hash()); + let key = format!("0x{:?}", HexDisplay::from(&KEY)); + + // Key is prefixed by CHILD_STORAGE_KEY_PREFIX. + let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec(); + prefixed_key.extend_from_slice(&KEY); + let prefixed_key = format!("0x{:?}", HexDisplay::from(&prefixed_key)); + let mut sub = api + .subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &prefixed_key]) + .await + .unwrap(); + let event: ChainHeadEvent> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadEvent::>::Done(done) if done.result.is_none()); + + // Key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX. + let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec(); + prefixed_key.extend_from_slice(&KEY); + let prefixed_key = format!("0x{:?}", HexDisplay::from(&prefixed_key)); + let mut sub = api + .subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &prefixed_key]) + .await + .unwrap(); + let event: ChainHeadEvent> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadEvent::>::Done(done) if done.result.is_none()); + + // Child key is prefixed by CHILD_STORAGE_KEY_PREFIX. + let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec(); + prefixed_key.extend_from_slice(b"child"); + let prefixed_key = format!("0x{:?}", HexDisplay::from(&prefixed_key)); + let mut sub = api + .subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &key, &prefixed_key]) + .await + .unwrap(); + let event: ChainHeadEvent> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadEvent::>::Done(done) if done.result.is_none()); + + // Child key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX. + let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec(); + prefixed_key.extend_from_slice(b"child"); + let prefixed_key = format!("0x{:?}", HexDisplay::from(&prefixed_key)); + let mut sub = api + .subscribe("chainHead_unstable_storage", [&sub_id, &block_hash, &key, &prefixed_key]) + .await + .unwrap(); + let event: ChainHeadEvent> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadEvent::>::Done(done) if done.result.is_none()); +} + +#[tokio::test] +async fn follow_generates_initial_blocks() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + let finalized_hash = client.info().finalized_hash; + + // Block tree: + // finalized -> block 1 -> block 2 -> block 4 + // -> block 1 -> block 3 + let block_1 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_1_hash = block_1.header.hash(); + client.import(BlockOrigin::Own, block_1.clone()).await.unwrap(); + + let block_2 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_2_hash = block_2.header.hash(); + client.import(BlockOrigin::Own, block_2.clone()).await.unwrap(); + + let mut block_builder = client + .new_block_at(&BlockId::Hash(block_1.header.hash()), Default::default(), false) + .unwrap(); + // This push is required as otherwise block 3 has the same hash as block 2 and won't get + // imported + block_builder + .push_transfer(Transfer { + from: AccountKeyring::Alice.into(), + to: AccountKeyring::Ferdie.into(), + amount: 41, + nonce: 0, + }) + .unwrap(); + let block_3 = block_builder.build().unwrap().block; + let block_3_hash = block_3.header.hash(); + client.import(BlockOrigin::Own, block_3.clone()).await.unwrap(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + + // Initialized must always be reported first. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Initialized(Initialized { + finalized_block_hash: format!("{:?}", finalized_hash), + finalized_block_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + // Check block 1. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_1_hash), + parent_block_hash: format!("{:?}", finalized_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + // Check block 2. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_2_hash), + parent_block_hash: format!("{:?}", block_1_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + // Check block 3. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_3_hash), + parent_block_hash: format!("{:?}", block_1_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_2_hash), + }); + assert_eq!(event, expected); + + // Import block 4. + let block_4 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_4_hash = block_4.header.hash(); + client.import(BlockOrigin::Own, block_4.clone()).await.unwrap(); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_4_hash), + parent_block_hash: format!("{:?}", block_2_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_4_hash), + }); + assert_eq!(event, expected); + + // Check the finalized event: + // - blocks 1, 2, 4 from canonical chain are finalized + // - block 3 from the fork is pruned + client.finalize_block(block_4_hash, None).unwrap(); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![ + format!("{:?}", block_1_hash), + format!("{:?}", block_2_hash), + format!("{:?}", block_4_hash), + ], + pruned_block_hashes: vec![format!("{:?}", block_3_hash)], + }); + assert_eq!(event, expected); +} + +#[tokio::test] +async fn follow_exceeding_pinned_blocks() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + 2, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Block tree: + // finalized_block -> block -> block2 + // The first 2 blocks are pinned into the subscription, but the block2 will exceed the limit (2 + // blocks). + let block2 = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block2.clone()).await.unwrap(); + + assert_matches!(get_next_event::>(&mut sub).await, FollowEvent::Stop); + + // Subscription will not produce any more event for further blocks. + let block3 = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block3.clone()).await.unwrap(); + + assert!(sub.next::>().await.is_none()); +} + +#[tokio::test] +async fn follow_with_unpin() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + 2, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Unpin an invalid subscription ID must return Ok(()). + let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH)); + let _res: () = api + .call("chainHead_unstable_unpin", ["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + + // Valid subscription with invalid block hash. + let invalid_hash = format!("0x{:?}", HexDisplay::from(&INVALID_HASH)); + let err = api + .call::<_, serde_json::Value>("chainHead_unstable_unpin", [&sub_id, &invalid_hash]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + // To not exceed the number of pinned blocks, we need to unpin before the next import. + let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap(); + + // Block tree: + // finalized_block -> block -> block2 + // ^ has been unpinned + let block2 = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block2.clone()).await.unwrap(); + + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + let block3 = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block3.clone()).await.unwrap(); + + assert_matches!(get_next_event::>(&mut sub).await, FollowEvent::Stop); + assert!(sub.next::>().await.is_none()); +} + +#[tokio::test] +async fn follow_prune_best_block() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + let finalized_hash = client.info().finalized_hash; + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + + // Initialized must always be reported first. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Initialized(Initialized { + finalized_block_hash: format!("{:?}", finalized_hash), + finalized_block_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + // Block tree: + // + // finalized -> block 1 -> block 2 + // ^^^ best block reported + // + // -> block 1 -> block 3 -> block 4 + // ^^^ finalized + // + // The block 4 is needed on the longest chain because we want the + // best block 2 to be reported as pruned. Pruning is happening at + // height (N - 1), where N is the finalized block number. + + let block_1 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_1_hash = block_1.header.hash(); + client.import(BlockOrigin::Own, block_1.clone()).await.unwrap(); + + let block_3 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_3_hash = block_3.header.hash(); + client.import(BlockOrigin::Own, block_3.clone()).await.unwrap(); + + let block_4 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_4_hash = block_4.header.hash(); + client.import(BlockOrigin::Own, block_4.clone()).await.unwrap(); + + // Import block 2 as best on the fork. + let mut block_builder = client + .new_block_at(&BlockId::Hash(block_1.header.hash()), Default::default(), false) + .unwrap(); + // This push is required as otherwise block 3 has the same hash as block 2 and won't get + // imported + block_builder + .push_transfer(Transfer { + from: AccountKeyring::Alice.into(), + to: AccountKeyring::Ferdie.into(), + amount: 41, + nonce: 0, + }) + .unwrap(); + let block_2 = block_builder.build().unwrap().block; + let block_2_hash = block_2.header.hash(); + client.import_as_best(BlockOrigin::Own, block_2.clone()).await.unwrap(); + + // Check block 1. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_1_hash), + parent_block_hash: format!("{:?}", finalized_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_1_hash), + }); + assert_eq!(event, expected); + + // Check block 3. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_3_hash), + parent_block_hash: format!("{:?}", block_1_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_3_hash), + }); + assert_eq!(event, expected); + + // Check block 4. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_4_hash), + parent_block_hash: format!("{:?}", block_3_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_4_hash), + }); + assert_eq!(event, expected); + + // Check block 2, that we imported as custom best. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_2_hash), + parent_block_hash: format!("{:?}", block_1_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_2_hash), + }); + assert_eq!(event, expected); + + // Finalize the block 4 from the fork. + client.finalize_block(block_4_hash, None).unwrap(); + + // Expect to report the best block changed before the finalized event. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_4_hash), + }); + assert_eq!(event, expected); + + // Block 2 must be reported as pruned, even if it was the previous best. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![ + format!("{:?}", block_1_hash), + format!("{:?}", block_3_hash), + format!("{:?}", block_4_hash), + ], + pruned_block_hashes: vec![format!("{:?}", block_2_hash)], + }); + assert_eq!(event, expected); +} diff --git a/substrate/client/rpc-spec-v2/src/lib.rs b/substrate/client/rpc-spec-v2/src/lib.rs index f4b9d2f95b..5af7e3be26 100644 --- a/substrate/client/rpc-spec-v2/src/lib.rs +++ b/substrate/client/rpc-spec-v2/src/lib.rs @@ -23,6 +23,7 @@ #![warn(missing_docs)] #![deny(unused_crate_dependencies)] +pub mod chain_head; pub mod chain_spec; pub mod transaction; diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 1c8f4c7de7..64a5f70cc0 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -59,7 +59,7 @@ use sc_rpc::{ system::SystemApiServer, DenyUnsafe, SubscriptionTaskExecutor, }; -use sc_rpc_spec_v2::transaction::TransactionApiServer; +use sc_rpc_spec_v2::{chain_head::ChainHeadApiServer, transaction::TransactionApiServer}; use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO}; use sc_transaction_pool_api::MaintainedTransactionPool; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; @@ -549,7 +549,7 @@ where keystore.clone(), system_rpc_tx.clone(), &config, - backend.offchain_storage(), + backend.clone(), &*rpc_builder, ) }; @@ -643,7 +643,7 @@ fn gen_rpc_module( keystore: SyncCryptoStorePtr, system_rpc_tx: TracingUnboundedSender>, config: &Configuration, - offchain_storage: Option<>::OffchainStorage>, + backend: Arc, rpc_builder: &(dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result, Error>), ) -> Result, Error> where @@ -698,6 +698,19 @@ where ) .into_rpc(); + // Maximum pinned blocks per connection. + // This number is large enough to consider immediate blocks, + // but it will change to facilitate adequate limits for the pinning API. + const MAX_PINNED_BLOCKS: usize = 4096; + let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new( + client.clone(), + backend.clone(), + task_executor.clone(), + client.info().genesis_hash, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + let author = sc_rpc::author::Author::new( client.clone(), transaction_pool, @@ -709,7 +722,7 @@ where let system = sc_rpc::system::System::new(system_info, system_rpc_tx, deny_unsafe).into_rpc(); - if let Some(storage) = offchain_storage { + if let Some(storage) = backend.offchain_storage() { let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe).into_rpc(); rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?; @@ -717,6 +730,7 @@ where // Part of the RPC v2 spec. rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?; + rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?; // Part of the old RPC spec. rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;