rpc: Implement chainHead RPC API (#12544)

* rpc/chain_head: Add event structure for serialization

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Add tests for events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Add API trait for `chainHead`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Add RPC errors

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Manage subscription ID tracking for pinned blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Add tests for subscription management

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Constructor for the API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Placeholders for API implementation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Accept RPC subscription sink

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Generate the runtime API event

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Implement the `follow` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Implement the `body` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Implement the `header` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Implement the `storage` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Implement the `call` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Implement the `unpin` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update `Cargo.lock`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Implement `getGenesis` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Fix clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Parse params from hex string

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Constuct API with genesis hash

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Add the finalized block to reported tree route

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Export the API and events for better ergonomics

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Add test module with helper functions

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Test block events from the `follow` pubsub

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Test `genesisHash` getter

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Test `header` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Test `body` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Test calling into the runtime API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Test runtime for the `follow` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Add runtime code changes for `follow` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Remove space from rustdoc

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Use the `child_key` for storage queries

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Test `storage` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Test child trie query for `storage` method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Event serialization typo

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Remove subscription aliases

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Add `NetworkConfig` parameter

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Named parameters as camelCase if present

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Implement From<ApiError> for RuntimeEvents

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Handle pruning of the best block in finalization window

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Generate initial block events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Verify that initial in-memory blocks are reported

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Verify the finalized event with forks and pruned blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Fix clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Separate logic for generating initial events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Handle stopping a subscription ID

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Submit events until the "Stop" event is triggered

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Separate logic for handling new and finalized blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Extend subscription logic with subId handle

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Adjust to the new subscription mngmt API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Refuse RuntimeAPI calls without the runtime flag

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Verify RuntimeAPI calls without runtime flag

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Add best block per subscription

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Check storage keys for prefixes

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Check storage queries with invalid prefixes

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Allow maximum number of pinned blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Test the maximum number of pinned blocks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Adjust to origin/master and apply clippy

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client/service: Enable the `chainHead` API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Stop subscription on client disconnect and add debug logs

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Fix sending `Stop` on subscription exit

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Check best block is descendent of latest finalized

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head/tests: Report events before pruning the best block

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Nonrecursive initial block generation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Generate initial events on subscription executor

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Reduce dev-dependencies for tokio

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Apply suggestions from code review

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* rpc/chain_head: Accept empty parameters

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Use debug of `HexDisplay` for full format

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Enable subscription ID

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Use jsonrpsee 16.2 camelCase feature for paramaters

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Use `NonZeroUsize` for `NetworkConfig` param

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/chain_head: Rename `runtime_updates` to `has_runtime_updates`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
Alexandru Vasile
2022-12-19 17:38:12 +02:00
committed by GitHub
parent b92aa3dbdc
commit 46932f2b47
11 changed files with 2885 additions and 9 deletions
@@ -0,0 +1,137 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#![allow(non_snake_case)]
//! API trait of the chain head.
use crate::chain_head::event::{ChainHeadEvent, FollowEvent, NetworkConfig};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
#[rpc(client, server)]
pub trait ChainHeadApi<Hash> {
/// Track the state of the head of the chain: the finalized, non-finalized, and best blocks.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "chainHead_unstable_follow",
unsubscribe = "chainHead_unstable_unfollow",
item = FollowEvent<Hash>,
)]
fn chain_head_unstable_follow(&self, runtime_updates: bool);
/// Retrieves the body (list of transactions) of a pinned block.
///
/// This method should be seen as a complement to `chainHead_unstable_follow`,
/// allowing the JSON-RPC client to retrieve more information about a block
/// that has been reported.
///
/// Use `archive_unstable_body` if instead you want to retrieve the body of an arbitrary block.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "chainHead_unstable_body",
unsubscribe = "chainHead_unstable_stopBody",
item = ChainHeadEvent<String>,
)]
fn chain_head_unstable_body(
&self,
follow_subscription: String,
hash: Hash,
network_config: Option<NetworkConfig>,
);
/// Retrieves the header of a pinned block.
///
/// This method should be seen as a complement to `chainHead_unstable_follow`,
/// allowing the JSON-RPC client to retrieve more information about a block
/// that has been reported.
///
/// Use `archive_unstable_header` if instead you want to retrieve the header of an arbitrary
/// block.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_header", blocking)]
fn chain_head_unstable_header(
&self,
follow_subscription: String,
hash: Hash,
) -> RpcResult<Option<String>>;
/// Get the chain's genesis hash.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_genesisHash", blocking)]
fn chain_head_unstable_genesis_hash(&self) -> RpcResult<String>;
/// Return a storage entry at a specific block's state.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "chainHead_unstable_storage",
unsubscribe = "chainHead_unstable_stopStorage",
item = ChainHeadEvent<String>,
)]
fn chain_head_unstable_storage(
&self,
follow_subscription: String,
hash: Hash,
key: String,
child_key: Option<String>,
network_config: Option<NetworkConfig>,
);
/// Call into the Runtime API at a specified block's state.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "chainHead_unstable_call",
unsubscribe = "chainHead_unstable_stopCall",
item = ChainHeadEvent<String>,
)]
fn chain_head_unstable_call(
&self,
follow_subscription: String,
hash: Hash,
function: String,
call_parameters: String,
network_config: Option<NetworkConfig>,
);
/// Unpin a block reported by the `follow` method.
///
/// Ongoing operations that require the provided block
/// will continue normally.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_unpin", blocking)]
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;
}
@@ -0,0 +1,783 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! API implementation for `chainHead`.
use crate::{
chain_head::{
api::ChainHeadApiServer,
error::Error as ChainHeadRpcError,
event::{
BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent,
Initialized, NetworkConfig, NewBlock, RuntimeEvent, RuntimeVersionEvent,
},
subscription::{SubscriptionHandle, SubscriptionManagement, SubscriptionManagementError},
},
SubscriptionTaskExecutor,
};
use codec::Encode;
use futures::{
channel::oneshot,
future::FutureExt,
stream::{self, Stream, StreamExt},
};
use futures_util::future::Either;
use jsonrpsee::{
core::{async_trait, RpcResult},
types::{SubscriptionEmptyError, SubscriptionId, SubscriptionResult},
SubscriptionSink,
};
use log::{debug, error};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, CallExecutor, ChildInfo,
ExecutorProvider, FinalityNotification, StorageKey, StorageProvider,
};
use serde::Serialize;
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
};
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, Bytes};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header},
};
use std::{marker::PhantomData, sync::Arc};
/// An API for chain head RPC calls.
pub struct ChainHead<BE, Block: BlockT, Client> {
/// Substrate client.
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// Keep track of the pinned blocks for each subscription.
subscriptions: Arc<SubscriptionManagement<Block>>,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of pinned blocks allowed per connection.
max_pinned_blocks: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
impl<BE, Block: BlockT, Client> ChainHead<BE, Block, Client> {
/// Create a new [`ChainHead`].
pub fn new<GenesisHash: AsRef<[u8]>>(
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
) -> Self {
let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref()));
Self {
client,
backend,
executor,
subscriptions: Arc::new(SubscriptionManagement::new()),
genesis_hash,
max_pinned_blocks,
_phantom: PhantomData,
}
}
/// Accept the subscription and return the subscription ID on success.
fn accept_subscription(
&self,
sink: &mut SubscriptionSink,
) -> Result<String, SubscriptionEmptyError> {
// The subscription must be accepted before it can provide a valid subscription ID.
sink.accept()?;
let Some(sub_id) = sink.subscription_id() else {
// This can only happen if the subscription was not accepted.
return Err(SubscriptionEmptyError)
};
// Get the string representation for the subscription.
let sub_id = match sub_id {
SubscriptionId::Num(num) => num.to_string(),
SubscriptionId::Str(id) => id.into_owned().into(),
};
Ok(sub_id)
}
}
/// Generate the initial events reported by the RPC `follow` method.
///
/// This includes the "Initialized" event followed by the in-memory
/// blocks via "NewBlock" and the "BestBlockChanged".
fn generate_initial_events<Block, BE, Client>(
client: &Arc<Client>,
backend: &Arc<BE>,
handle: &SubscriptionHandle<Block>,
runtime_updates: bool,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError>
where
Block: BlockT + 'static,
Block::Header: Unpin,
BE: Backend<Block> + 'static,
Client: HeaderBackend<Block> + CallApiAt<Block> + 'static,
{
// The initialized event is the first one sent.
let finalized_block_hash = client.info().finalized_hash;
handle.pin_block(finalized_block_hash)?;
let finalized_block_runtime = generate_runtime_event(
&client,
runtime_updates,
&BlockId::Hash(finalized_block_hash),
None,
);
let initialized_event = FollowEvent::Initialized(Initialized {
finalized_block_hash,
finalized_block_runtime,
runtime_updates,
});
let initial_blocks = get_initial_blocks(&backend, finalized_block_hash);
let mut in_memory_blocks = Vec::with_capacity(initial_blocks.len() + 1);
in_memory_blocks.push(initialized_event);
for (child, parent) in initial_blocks.into_iter() {
handle.pin_block(child)?;
let new_runtime = generate_runtime_event(
&client,
runtime_updates,
&BlockId::Hash(child),
Some(&BlockId::Hash(parent)),
);
let event = FollowEvent::NewBlock(NewBlock {
block_hash: child,
parent_block_hash: parent,
new_runtime,
runtime_updates,
});
in_memory_blocks.push(event);
}
// Generate a new best block event.
let best_block_hash = client.info().best_hash;
if best_block_hash != finalized_block_hash {
let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
in_memory_blocks.push(best_block);
};
Ok(in_memory_blocks)
}
/// Parse hex-encoded string parameter as raw bytes.
///
/// If the parsing fails, the subscription is rejected.
fn parse_hex_param(
sink: &mut SubscriptionSink,
param: String,
) -> Result<Vec<u8>, SubscriptionEmptyError> {
// Methods can accept empty parameters.
if param.is_empty() {
return Ok(Default::default())
}
match array_bytes::hex2bytes(&param) {
Ok(bytes) => Ok(bytes),
Err(_) => {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(param));
Err(SubscriptionEmptyError)
},
}
}
/// Conditionally generate the runtime event of the given block.
fn generate_runtime_event<Client, Block>(
client: &Arc<Client>,
runtime_updates: bool,
block: &BlockId<Block>,
parent: Option<&BlockId<Block>>,
) -> Option<RuntimeEvent>
where
Block: BlockT + 'static,
Client: CallApiAt<Block> + 'static,
{
// No runtime versions should be reported.
if !runtime_updates {
return None
}
let block_rt = match client.runtime_version_at(block) {
Ok(rt) => rt,
Err(err) => return Some(err.into()),
};
let parent = match parent {
Some(parent) => parent,
// Nothing to compare against, always report.
None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt })),
};
let parent_rt = match client.runtime_version_at(parent) {
Ok(rt) => rt,
Err(err) => return Some(err.into()),
};
// Report the runtime version change.
if block_rt != parent_rt {
Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt }))
} else {
None
}
}
/// Get the in-memory blocks of the client, starting from the provided finalized hash.
///
/// Returns a tuple of block hash with parent hash.
fn get_initial_blocks<BE, Block>(
backend: &Arc<BE>,
parent_hash: Block::Hash,
) -> Vec<(Block::Hash, Block::Hash)>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
{
let mut result = Vec::new();
let mut next_hash = Vec::new();
next_hash.push(parent_hash);
while let Some(parent_hash) = next_hash.pop() {
let Ok(blocks) = backend.blockchain().children(parent_hash) else {
continue
};
for child_hash in blocks {
result.push((child_hash, parent_hash));
next_hash.push(child_hash);
}
}
result
}
/// Submit the events from the provided stream to the RPC client
/// for as long as the `rx_stop` event was not called.
async fn submit_events<EventStream, T>(
sink: &mut SubscriptionSink,
mut stream: EventStream,
rx_stop: oneshot::Receiver<()>,
) where
EventStream: Stream<Item = T> + Unpin,
T: Serialize,
{
let mut stream_item = stream.next();
let mut stop_event = rx_stop;
while let Either::Left((Some(event), next_stop_event)) =
futures_util::future::select(stream_item, stop_event).await
{
match sink.send(&event) {
Ok(true) => {
stream_item = stream.next();
stop_event = next_stop_event;
},
// Client disconnected.
Ok(false) => return,
Err(_) => {
// Failed to submit event.
break
},
}
}
let _ = sink.send(&FollowEvent::<String>::Stop);
}
/// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for
/// every notification.
fn handle_import_blocks<Client, Block>(
client: &Arc<Client>,
handle: &SubscriptionHandle<Block>,
runtime_updates: bool,
notification: BlockImportNotification<Block>,
) -> Result<(FollowEvent<Block::Hash>, Option<FollowEvent<Block::Hash>>), SubscriptionManagementError>
where
Block: BlockT + 'static,
Client: CallApiAt<Block> + 'static,
{
handle.pin_block(notification.hash)?;
let new_runtime = generate_runtime_event(
&client,
runtime_updates,
&BlockId::Hash(notification.hash),
Some(&BlockId::Hash(*notification.header.parent_hash())),
);
// Note: `Block::Hash` will serialize to hexadecimal encoded string.
let new_block = FollowEvent::NewBlock(NewBlock {
block_hash: notification.hash,
parent_block_hash: *notification.header.parent_hash(),
new_runtime,
runtime_updates,
});
if !notification.is_new_best {
return Ok((new_block, None))
}
// If this is the new best block, then we need to generate two events.
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: notification.hash });
let mut best_block_cache = handle.best_block_write();
match *best_block_cache {
Some(block_cache) => {
// The RPC layer has not reported this block as best before.
// Note: This handles the race with the finalized branch.
if block_cache != notification.hash {
*best_block_cache = Some(notification.hash);
Ok((new_block, Some(best_block_event)))
} else {
Ok((new_block, None))
}
},
None => {
*best_block_cache = Some(notification.hash);
Ok((new_block, Some(best_block_event)))
},
}
}
/// Generate the "Finalized" event and potentially the "BestBlockChanged" for
/// every notification.
fn handle_finalized_blocks<Client, Block>(
client: &Arc<Client>,
handle: &SubscriptionHandle<Block>,
notification: FinalityNotification<Block>,
) -> Result<(FollowEvent<Block::Hash>, Option<FollowEvent<Block::Hash>>), SubscriptionManagementError>
where
Block: BlockT + 'static,
Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError> + 'static,
{
let last_finalized = notification.hash;
// We might not receive all new blocks reports, also pin the block here.
handle.pin_block(last_finalized)?;
// The tree route contains the exclusive path from the last finalized block
// to the block reported by the notification. Ensure the finalized block is
// properly reported to that path.
let mut finalized_block_hashes = notification.tree_route.iter().cloned().collect::<Vec<_>>();
finalized_block_hashes.push(last_finalized);
let pruned_block_hashes: Vec<_> = notification.stale_heads.iter().cloned().collect();
let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
});
let mut best_block_cache = handle.best_block_write();
match *best_block_cache {
Some(block_cache) => {
// Check if the current best block is also reported as pruned.
let reported_pruned = pruned_block_hashes.iter().find(|&&hash| hash == block_cache);
if reported_pruned.is_none() {
return Ok((finalized_event, None))
}
// The best block is reported as pruned. Therefore, we need to signal a new
// best block event before submitting the finalized event.
let best_block_hash = client.info().best_hash;
if best_block_hash == block_cache {
// The client doest not have any new information about the best block.
// The information from `.info()` is updated from the DB as the last
// step of the finalization and it should be up to date. Also, the
// displaced nodes (list of nodes reported) should be reported with
// an offset of 32 blocks for substrate.
// If the info is outdated, there is nothing the RPC can do for now.
error!(target: "rpc-spec-v2", "Client does not contain different best block");
Ok((finalized_event, None))
} else {
let ancestor = sp_blockchain::lowest_common_ancestor(
&**client,
last_finalized,
best_block_hash,
)
.map_err(|_| {
SubscriptionManagementError::Custom("Could not find common ancestor".into())
})?;
// The client's best block must be a descendent of the last finalized block.
// In other words, the lowest common ancestor must be the last finalized block.
if ancestor.hash != last_finalized {
return Err(SubscriptionManagementError::Custom(
"The finalized block is not an ancestor of the best block".into(),
))
}
// The RPC needs to also submit a new best block changed before the
// finalized event.
*best_block_cache = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
Ok((finalized_event, Some(best_block_event)))
}
},
None => Ok((finalized_event, None)),
}
}
#[async_trait]
impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
where
Block: BlockT + 'static,
Block::Header: Unpin,
BE: Backend<Block> + 'static,
Client: BlockBackend<Block>
+ ExecutorProvider<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = BlockChainError>
+ BlockchainEvents<Block>
+ CallApiAt<Block>
+ StorageProvider<Block, BE>
+ 'static,
{
fn chain_head_unstable_follow(
&self,
mut sink: SubscriptionSink,
runtime_updates: bool,
) -> SubscriptionResult {
let sub_id = match self.accept_subscription(&mut sink) {
Ok(sub_id) => sub_id,
Err(err) => {
sink.close(ChainHeadRpcError::InvalidSubscriptionID);
return Err(err)
},
};
// Keep track of the subscription.
let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription already accepted", sub_id);
let _ = sink.send(&FollowEvent::<Block::Hash>::Stop);
return Ok(())
};
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription accepted", sub_id);
let client = self.client.clone();
let handle = sub_handle.clone();
let subscription_id = sub_id.clone();
let stream_import = self
.client
.import_notification_stream()
.map(move |notification| {
match handle_import_blocks(&client, &handle, runtime_updates, notification) {
Ok((new_block, None)) => stream::iter(vec![new_block]),
Ok((new_block, Some(best_block))) => stream::iter(vec![new_block, best_block]),
Err(_) => {
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Failed to handle block import notification.", subscription_id);
handle.stop();
stream::iter(vec![])
},
}
})
.flatten();
let client = self.client.clone();
let handle = sub_handle.clone();
let subscription_id = sub_id.clone();
let stream_finalized = self
.client
.finality_notification_stream()
.map(move |notification| {
match handle_finalized_blocks(&client, &handle, notification) {
Ok((finalized_event, None)) => stream::iter(vec![finalized_event]),
Ok((finalized_event, Some(best_block))) =>
stream::iter(vec![best_block, finalized_event]),
Err(_) => {
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Failed to import finalized blocks", subscription_id);
handle.stop();
stream::iter(vec![])
},
}
})
.flatten();
let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
let subscriptions = self.subscriptions.clone();
let client = self.client.clone();
let backend = self.backend.clone();
let fut = async move {
let Ok(initial_events) = generate_initial_events(&client, &backend, &sub_handle, runtime_updates) else {
// Stop the subscription if we exceeded the maximum number of blocks pinned.
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Exceeded max pinned blocks from initial events", sub_id);
let _ = sink.send(&FollowEvent::<Block::Hash>::Stop);
return
};
let stream = stream::iter(initial_events).chain(merged);
submit_events(&mut sink, stream.boxed(), rx_stop).await;
// The client disconnected or called the unsubscribe method.
subscriptions.remove_subscription(&sub_id);
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription removed", sub_id);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_body(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
_network_config: Option<NetworkConfig>,
) -> SubscriptionResult {
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};
// Block is not part of the subscription.
if !handle.contains_block(&hash) {
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
let event = match client.block(&BlockId::Hash(hash)) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block.block.extrinsics();
let result = format!("0x{:?}", HexDisplay::from(&extrinsics.encode()));
ChainHeadEvent::Done(ChainHeadResult { result })
},
Ok(None) => {
// The block's body was pruned. This subscription ID has become invalid.
debug!(target: "rpc-spec-v2", "[body][id={:?}] Stopping subscription because hash={:?} was pruned", follow_subscription, hash);
handle.stop();
ChainHeadEvent::<String>::Disjoint
},
Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }),
};
let _ = sink.send(&event);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_header(
&self,
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
return Ok(None)
};
// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Err(ChainHeadRpcError::InvalidBlock.into())
}
self.client
.header(BlockId::Hash(hash))
.map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode()))))
.map_err(ChainHeadRpcError::FetchBlockHeader)
.map_err(Into::into)
}
fn chain_head_unstable_genesis_hash(&self) -> RpcResult<String> {
Ok(self.genesis_hash.clone())
}
fn chain_head_unstable_storage(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
key: String,
child_key: Option<String>,
_network_config: Option<NetworkConfig>,
) -> SubscriptionResult {
let key = StorageKey(parse_hex_param(&mut sink, key)?);
let child_key = child_key
.map(|child_key| parse_hex_param(&mut sink, child_key))
.transpose()?
.map(ChildInfo::new_default_from_vec);
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};
// Block is not part of the subscription.
if !handle.contains_block(&hash) {
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
// The child key is provided, use the key to query the child trie.
if let Some(child_key) = child_key {
// The child key must not be prefixed with ":child_storage:" nor
// ":child_storage:default:".
if well_known_keys::is_default_child_storage_key(child_key.storage_key()) ||
well_known_keys::is_child_storage_key(child_key.storage_key())
{
let _ = sink
.send(&ChainHeadEvent::Done(ChainHeadResult { result: None::<String> }));
return
}
let res = client
.child_storage(hash, &child_key, &key)
.map(|result| {
let result =
result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0)));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
return
}
// The main key must not be prefixed with b":child_storage:" nor
// b":child_storage:default:".
if well_known_keys::is_default_child_storage_key(&key.0) ||
well_known_keys::is_child_storage_key(&key.0)
{
let _ =
sink.send(&ChainHeadEvent::Done(ChainHeadResult { result: None::<String> }));
return
}
// Main root trie storage query.
let res = client
.storage(hash, &key)
.map(|result| {
let result =
result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0)));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_call(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
function: String,
call_parameters: String,
_network_config: Option<NetworkConfig>,
) -> SubscriptionResult {
let call_parameters = Bytes::from(parse_hex_param(&mut sink, call_parameters)?);
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};
// Block is not part of the subscription.
if !handle.contains_block(&hash) {
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
// Reject subscription if runtime_updates is false.
if !handle.has_runtime_updates() {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"The runtime updates flag must be set".into(),
));
return
}
let res = client
.executor()
.call(
&BlockId::Hash(hash),
&function,
&call_parameters,
client.execution_extensions().strategies().other,
)
.map(|result| {
let result = format!("0x{:?}", HexDisplay::from(&result));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<()> {
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
return Ok(())
};
if !handle.unpin_block(&hash) {
return Err(ChainHeadRpcError::InvalidBlock.into())
}
Ok(())
}
}
@@ -0,0 +1,74 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Error helpers for `chainHead` RPC module.
use jsonrpsee::{
core::Error as RpcError,
types::error::{CallError, ErrorObject},
};
use sp_blockchain::Error as BlockchainError;
/// ChainHead RPC errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The provided block hash is invalid.
#[error("Invalid block hash")]
InvalidBlock,
/// Fetch block header error.
#[error("Could not fetch block header: {0}")]
FetchBlockHeader(BlockchainError),
/// Invalid parameter provided to the RPC method.
#[error("Invalid parameter: {0}")]
InvalidParam(String),
/// Invalid subscription ID provided by the RPC server.
#[error("Invalid subscription ID")]
InvalidSubscriptionID,
}
// Base code for all `chainHead` errors.
const BASE_ERROR: i32 = 2000;
/// The provided block hash is invalid.
const INVALID_BLOCK_ERROR: i32 = BASE_ERROR + 1;
/// Fetch block header error.
const FETCH_BLOCK_HEADER_ERROR: i32 = BASE_ERROR + 2;
/// Invalid parameter error.
const INVALID_PARAM_ERROR: i32 = BASE_ERROR + 3;
/// Invalid subscription ID.
const INVALID_SUB_ID: i32 = BASE_ERROR + 4;
impl From<Error> for ErrorObject<'static> {
fn from(e: Error) -> Self {
let msg = e.to_string();
match e {
Error::InvalidBlock => ErrorObject::owned(INVALID_BLOCK_ERROR, msg, None::<()>),
Error::FetchBlockHeader(_) =>
ErrorObject::owned(FETCH_BLOCK_HEADER_ERROR, msg, None::<()>),
Error::InvalidParam(_) => ErrorObject::owned(INVALID_PARAM_ERROR, msg, None::<()>),
Error::InvalidSubscriptionID => ErrorObject::owned(INVALID_SUB_ID, msg, None::<()>),
}
.into()
}
}
impl From<Error> for RpcError {
fn from(e: Error) -> Self {
CallError::Custom(e.into()).into()
}
}
@@ -0,0 +1,480 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! The chain head's event returned as json compatible object.
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use sp_api::ApiError;
use sp_version::RuntimeVersion;
use std::num::NonZeroUsize;
/// The network config parameter is used when a function
/// needs to request the information from its peers.
///
/// These values can be tweaked depending on the urgency of the JSON-RPC function call.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkConfig {
/// The total number of peers from which the information is requested.
total_attempts: u64,
/// The maximum number of requests to perform in parallel.
///
/// # Note
///
/// A zero value is illegal.
max_parallel: NonZeroUsize,
/// The time, in milliseconds, after which a single requests towards one peer
/// is considered unsuccessful.
timeout_ms: u64,
}
/// The operation could not be processed due to an error.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ErrorEvent {
/// Reason of the error.
pub error: String,
}
/// The runtime specification of the current block.
///
/// This event is generated for:
/// - the first announced block by the follow subscription
/// - blocks that suffered a change in runtime compared with their parents
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeVersionEvent {
/// The runtime version.
pub spec: RuntimeVersion,
}
/// The runtime event generated if the `follow` subscription
/// has set the `runtime_updates` flag.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub enum RuntimeEvent {
/// The runtime version of this block.
Valid(RuntimeVersionEvent),
/// The runtime could not be obtained due to an error.
Invalid(ErrorEvent),
}
impl From<ApiError> for RuntimeEvent {
fn from(err: ApiError) -> Self {
RuntimeEvent::Invalid(ErrorEvent { error: format!("Api error: {}", err) })
}
}
/// Contain information about the latest finalized block.
///
/// # Note
///
/// This is the first event generated by the `follow` subscription
/// and is submitted only once.
///
/// If the `runtime_updates` flag is set, then this event contains
/// the `RuntimeEvent`, otherwise the `RuntimeEvent` is not present.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Initialized<Hash> {
/// The hash of the latest finalized block.
pub finalized_block_hash: Hash,
/// The runtime version of the finalized block.
///
/// # Note
///
/// This is present only if the `runtime_updates` flag is set for
/// the `follow` subscription.
pub finalized_block_runtime: Option<RuntimeEvent>,
/// Privately keep track if the `finalized_block_runtime` should be
/// serialized.
#[serde(default)]
pub(crate) runtime_updates: bool,
}
impl<Hash: Serialize> Serialize for Initialized<Hash> {
/// Custom serialize implementation to include the `RuntimeEvent` depending
/// on the internal `runtime_updates` flag.
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
if self.runtime_updates {
let mut state = serializer.serialize_struct("Initialized", 2)?;
state.serialize_field("finalizedBlockHash", &self.finalized_block_hash)?;
state.serialize_field("finalizedBlockRuntime", &self.finalized_block_runtime)?;
state.end()
} else {
let mut state = serializer.serialize_struct("Initialized", 1)?;
state.serialize_field("finalizedBlockHash", &self.finalized_block_hash)?;
state.end()
}
}
}
/// Indicate a new non-finalized block.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NewBlock<Hash> {
/// The hash of the new block.
pub block_hash: Hash,
/// The parent hash of the new block.
pub parent_block_hash: Hash,
/// The runtime version of the new block.
///
/// # Note
///
/// This is present only if the `runtime_updates` flag is set for
/// the `follow` subscription.
pub new_runtime: Option<RuntimeEvent>,
/// Privately keep track if the `finalized_block_runtime` should be
/// serialized.
#[serde(default)]
pub(crate) runtime_updates: bool,
}
impl<Hash: Serialize> Serialize for NewBlock<Hash> {
/// Custom serialize implementation to include the `RuntimeEvent` depending
/// on the internal `runtime_updates` flag.
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
if self.runtime_updates {
let mut state = serializer.serialize_struct("NewBlock", 3)?;
state.serialize_field("blockHash", &self.block_hash)?;
state.serialize_field("parentBlockHash", &self.parent_block_hash)?;
state.serialize_field("newRuntime", &self.new_runtime)?;
state.end()
} else {
let mut state = serializer.serialize_struct("NewBlock", 2)?;
state.serialize_field("blockHash", &self.block_hash)?;
state.serialize_field("parentBlockHash", &self.parent_block_hash)?;
state.end()
}
}
}
/// Indicate the block hash of the new best block.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BestBlockChanged<Hash> {
/// The block hash of the new best block.
pub best_block_hash: Hash,
}
/// Indicate the finalized and pruned block hashes.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Finalized<Hash> {
/// Block hashes that are finalized.
pub finalized_block_hashes: Vec<Hash>,
/// Block hashes that are pruned (removed).
pub pruned_block_hashes: Vec<Hash>,
}
/// The event generated by the `follow` method.
///
/// The events are generated in the following order:
/// 1. Initialized - generated only once to signal the
/// latest finalized block
/// 2. NewBlock - a new block was added.
/// 3. BestBlockChanged - indicate that the best block
/// is now the one from this event. The block was
/// announced priorly with the `NewBlock` event.
/// 4. Finalized - State the finalized and pruned blocks.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
pub enum FollowEvent<Hash> {
/// The latest finalized block.
///
/// This event is generated only once.
Initialized(Initialized<Hash>),
/// A new non-finalized block was added.
NewBlock(NewBlock<Hash>),
/// The best block of the chain.
BestBlockChanged(BestBlockChanged<Hash>),
/// A list of finalized and pruned blocks.
Finalized(Finalized<Hash>),
/// The subscription is dropped and no further events
/// will be generated.
Stop,
}
/// The result of a chain head method.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChainHeadResult<T> {
/// Result of the method.
pub result: T,
}
/// The event generated by the body / call / storage methods.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
pub enum ChainHeadEvent<T> {
/// The request completed successfully.
Done(ChainHeadResult<T>),
/// The resources requested are inaccessible.
///
/// Resubmitting the request later might succeed.
Inaccessible(ErrorEvent),
/// An error occurred. This is definitive.
Error(ErrorEvent),
/// The provided subscription ID is stale or invalid.
Disjoint,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn follow_initialized_event_no_updates() {
// Runtime flag is false.
let event: FollowEvent<String> = FollowEvent::Initialized(Initialized {
finalized_block_hash: "0x1".into(),
finalized_block_runtime: None,
runtime_updates: false,
});
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"initialized","finalizedBlockHash":"0x1"}"#;
assert_eq!(ser, exp);
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn follow_initialized_event_with_updates() {
// Runtime flag is true, block runtime must always be reported for this event.
let runtime = RuntimeVersion {
spec_name: "ABC".into(),
impl_name: "Impl".into(),
spec_version: 1,
..Default::default()
};
let runtime_event = RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime });
let mut initialized = Initialized {
finalized_block_hash: "0x1".into(),
finalized_block_runtime: Some(runtime_event),
runtime_updates: true,
};
let event: FollowEvent<String> = FollowEvent::Initialized(initialized.clone());
let ser = serde_json::to_string(&event).unwrap();
let exp = concat!(
r#"{"event":"initialized","finalizedBlockHash":"0x1","#,
r#""finalizedBlockRuntime":{"type":"valid","spec":{"specName":"ABC","implName":"Impl","authoringVersion":0,"#,
r#""specVersion":1,"implVersion":0,"apis":[],"transactionVersion":0,"stateVersion":0}}}"#,
);
assert_eq!(ser, exp);
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
// The `runtime_updates` field is used for serialization purposes.
initialized.runtime_updates = false;
assert!(matches!(
event_dec, FollowEvent::Initialized(ref dec) if dec == &initialized
));
}
#[test]
fn follow_new_block_event_no_updates() {
// Runtime flag is false.
let event: FollowEvent<String> = FollowEvent::NewBlock(NewBlock {
block_hash: "0x1".into(),
parent_block_hash: "0x2".into(),
new_runtime: None,
runtime_updates: false,
});
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"newBlock","blockHash":"0x1","parentBlockHash":"0x2"}"#;
assert_eq!(ser, exp);
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn follow_new_block_event_with_updates() {
// Runtime flag is true, block runtime must always be reported for this event.
let runtime = RuntimeVersion {
spec_name: "ABC".into(),
impl_name: "Impl".into(),
spec_version: 1,
..Default::default()
};
let runtime_event = RuntimeEvent::Valid(RuntimeVersionEvent { spec: runtime });
let mut new_block = NewBlock {
block_hash: "0x1".into(),
parent_block_hash: "0x2".into(),
new_runtime: Some(runtime_event),
runtime_updates: true,
};
let event: FollowEvent<String> = FollowEvent::NewBlock(new_block.clone());
let ser = serde_json::to_string(&event).unwrap();
let exp = concat!(
r#"{"event":"newBlock","blockHash":"0x1","parentBlockHash":"0x2","#,
r#""newRuntime":{"type":"valid","spec":{"specName":"ABC","implName":"Impl","authoringVersion":0,"#,
r#""specVersion":1,"implVersion":0,"apis":[],"transactionVersion":0,"stateVersion":0}}}"#,
);
assert_eq!(ser, exp);
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
// The `runtime_updates` field is used for serialization purposes.
new_block.runtime_updates = false;
assert!(matches!(
event_dec, FollowEvent::NewBlock(ref dec) if dec == &new_block
));
// Runtime flag is true, runtime didn't change compared to parent.
let mut new_block = NewBlock {
block_hash: "0x1".into(),
parent_block_hash: "0x2".into(),
new_runtime: None,
runtime_updates: true,
};
let event: FollowEvent<String> = FollowEvent::NewBlock(new_block.clone());
let ser = serde_json::to_string(&event).unwrap();
let exp =
r#"{"event":"newBlock","blockHash":"0x1","parentBlockHash":"0x2","newRuntime":null}"#;
assert_eq!(ser, exp);
new_block.runtime_updates = false;
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
assert!(matches!(
event_dec, FollowEvent::NewBlock(ref dec) if dec == &new_block
));
}
#[test]
fn follow_best_block_changed_event() {
let event: FollowEvent<String> =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: "0x1".into() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"bestBlockChanged","bestBlockHash":"0x1"}"#;
assert_eq!(ser, exp);
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn follow_finalized_event() {
let event: FollowEvent<String> = FollowEvent::Finalized(Finalized {
finalized_block_hashes: vec!["0x1".into()],
pruned_block_hashes: vec!["0x2".into()],
});
let ser = serde_json::to_string(&event).unwrap();
let exp =
r#"{"event":"finalized","finalizedBlockHashes":["0x1"],"prunedBlockHashes":["0x2"]}"#;
assert_eq!(ser, exp);
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn follow_stop_event() {
let event: FollowEvent<String> = FollowEvent::Stop;
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"stop"}"#;
assert_eq!(ser, exp);
let event_dec: FollowEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_done_event() {
let event: ChainHeadEvent<String> =
ChainHeadEvent::Done(ChainHeadResult { result: "A".into() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"done","result":"A"}"#;
assert_eq!(ser, exp);
let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_inaccessible_event() {
let event: ChainHeadEvent<String> =
ChainHeadEvent::Inaccessible(ErrorEvent { error: "A".into() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"inaccessible","error":"A"}"#;
assert_eq!(ser, exp);
let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_error_event() {
let event: ChainHeadEvent<String> = ChainHeadEvent::Error(ErrorEvent { error: "A".into() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"error","error":"A"}"#;
assert_eq!(ser, exp);
let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_disjoint_event() {
let event: ChainHeadEvent<String> = ChainHeadEvent::Disjoint;
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"disjoint"}"#;
assert_eq!(ser, exp);
let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn chain_head_network_config() {
let conf = NetworkConfig {
total_attempts: 1,
max_parallel: NonZeroUsize::new(2).expect("Non zero number; qed"),
timeout_ms: 3,
};
let ser = serde_json::to_string(&conf).unwrap();
let exp = r#"{"totalAttempts":1,"maxParallel":2,"timeoutMs":3}"#;
assert_eq!(ser, exp);
let conf_dec: NetworkConfig = serde_json::from_str(exp).unwrap();
assert_eq!(conf_dec, conf);
}
}
@@ -0,0 +1,40 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Substrate chain head API.
//!
//! # Note
//!
//! Methods are prefixed by `chainHead`.
#[cfg(test)]
mod tests;
pub mod api;
pub mod chain_head;
pub mod error;
pub mod event;
mod subscription;
pub use api::ChainHeadApiServer;
pub use chain_head::ChainHead;
pub use event::{
BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent,
Initialized, NetworkConfig, NewBlock, RuntimeEvent, RuntimeVersionEvent,
};
@@ -0,0 +1,284 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Subscription management for tracking subscription IDs to pinned blocks.
use futures::channel::oneshot;
use parking_lot::{RwLock, RwLockWriteGuard};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::Arc,
};
/// Subscription management error.
#[derive(Debug)]
pub enum SubscriptionManagementError {
/// The block cannot be pinned into memory because
/// the subscription has exceeded the maximum number
/// of blocks pinned.
ExceededLimits,
/// Custom error.
Custom(String),
}
/// Inner subscription data structure.
struct SubscriptionInner<Block: BlockT> {
/// The `runtime_updates` parameter flag of the subscription.
runtime_updates: bool,
/// Signals the "Stop" event.
tx_stop: Option<oneshot::Sender<()>>,
/// The blocks pinned.
blocks: HashSet<Block::Hash>,
/// The maximum number of pinned blocks allowed per subscription.
max_pinned_blocks: usize,
}
/// Manage the blocks of a specific subscription ID.
#[derive(Clone)]
pub struct SubscriptionHandle<Block: BlockT> {
inner: Arc<RwLock<SubscriptionInner<Block>>>,
/// The best reported block by this subscription.
/// Have this as a separate variable to easily share
/// the write guard with the RPC layer.
best_block: Arc<RwLock<Option<Block::Hash>>>,
}
impl<Block: BlockT> SubscriptionHandle<Block> {
/// Construct a new [`SubscriptionHandle`].
fn new(runtime_updates: bool, tx_stop: oneshot::Sender<()>, max_pinned_blocks: usize) -> Self {
SubscriptionHandle {
inner: Arc::new(RwLock::new(SubscriptionInner {
runtime_updates,
tx_stop: Some(tx_stop),
blocks: HashSet::new(),
max_pinned_blocks,
})),
best_block: Arc::new(RwLock::new(None)),
}
}
/// Trigger the stop event for the current subscription.
///
/// This can happen on internal failure (ie, the pruning deleted the block from memory)
/// or if the user exceeded the amount of available pinned blocks.
pub fn stop(&self) {
let mut inner = self.inner.write();
if let Some(tx_stop) = inner.tx_stop.take() {
let _ = tx_stop.send(());
}
}
/// Pin a new block for the current subscription ID.
///
/// Returns whether the value was newly inserted if the block can be pinned.
/// Otherwise, returns an error if the maximum number of blocks has been exceeded.
pub fn pin_block(&self, hash: Block::Hash) -> Result<bool, SubscriptionManagementError> {
let mut inner = self.inner.write();
if inner.blocks.len() == inner.max_pinned_blocks {
// We have reached the limit. However, the block can be already inserted.
if inner.blocks.contains(&hash) {
return Ok(false)
} else {
return Err(SubscriptionManagementError::ExceededLimits)
}
}
Ok(inner.blocks.insert(hash))
}
/// Unpin a new block for the current subscription ID.
///
/// Returns whether the value was present in the set.
pub fn unpin_block(&self, hash: &Block::Hash) -> bool {
let mut inner = self.inner.write();
inner.blocks.remove(hash)
}
/// Check if the block hash is present for the provided subscription ID.
///
/// Returns `true` if the set contains the block.
pub fn contains_block(&self, hash: &Block::Hash) -> bool {
let inner = self.inner.read();
inner.blocks.contains(hash)
}
/// Get the `runtime_updates` flag of this subscription.
pub fn has_runtime_updates(&self) -> bool {
let inner = self.inner.read();
inner.runtime_updates
}
/// Get the write guard of the best reported block.
pub fn best_block_write(&self) -> RwLockWriteGuard<'_, Option<Block::Hash>> {
self.best_block.write()
}
}
/// Manage block pinning / unpinning for subscription IDs.
pub struct SubscriptionManagement<Block: BlockT> {
/// Manage subscription by mapping the subscription ID
/// to a set of block hashes.
inner: RwLock<HashMap<String, SubscriptionHandle<Block>>>,
}
impl<Block: BlockT> SubscriptionManagement<Block> {
/// Construct a new [`SubscriptionManagement`].
pub fn new() -> Self {
SubscriptionManagement { inner: RwLock::new(HashMap::new()) }
}
/// Insert a new subscription ID.
///
/// If the subscription was not previously inserted, the method returns a tuple of
/// the receiver that is triggered upon the "Stop" event and the subscription
/// handle. Otherwise, when the subscription ID was already inserted returns none.
pub fn insert_subscription(
&self,
subscription_id: String,
runtime_updates: bool,
max_pinned_blocks: usize,
) -> Option<(oneshot::Receiver<()>, SubscriptionHandle<Block>)> {
let mut subs = self.inner.write();
if let Entry::Vacant(entry) = subs.entry(subscription_id) {
let (tx_stop, rx_stop) = oneshot::channel();
let handle =
SubscriptionHandle::<Block>::new(runtime_updates, tx_stop, max_pinned_blocks);
entry.insert(handle.clone());
Some((rx_stop, handle))
} else {
None
}
}
/// Remove the subscription ID with associated pinned blocks.
pub fn remove_subscription(&self, subscription_id: &String) {
let mut subs = self.inner.write();
subs.remove(subscription_id);
}
/// Obtain the specific subscription handle.
pub fn get_subscription(&self, subscription_id: &String) -> Option<SubscriptionHandle<Block>> {
let subs = self.inner.write();
subs.get(subscription_id).and_then(|handle| Some(handle.clone()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_core::H256;
use substrate_test_runtime_client::runtime::Block;
#[test]
fn subscription_check_id() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
let hash = H256::random();
let handle = subs.get_subscription(&id);
assert!(handle.is_none());
let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap();
assert!(!handle.contains_block(&hash));
subs.remove_subscription(&id);
let handle = subs.get_subscription(&id);
assert!(handle.is_none());
}
#[test]
fn subscription_check_block() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
let hash = H256::random();
// Check with subscription.
let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap();
assert!(!handle.contains_block(&hash));
assert!(!handle.unpin_block(&hash));
handle.pin_block(hash).unwrap();
assert!(handle.contains_block(&hash));
// Unpin an invalid block.
assert!(!handle.unpin_block(&H256::random()));
// Unpin the valid block.
assert!(handle.unpin_block(&hash));
assert!(!handle.contains_block(&hash));
}
#[test]
fn subscription_check_stop_event() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
// Check with subscription.
let (mut rx_stop, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap();
// Check the stop signal was not received.
let res = rx_stop.try_recv().unwrap();
assert!(res.is_none());
// Inserting a second time returns None.
let res = subs.insert_subscription(id.clone(), false, 10);
assert!(res.is_none());
handle.stop();
// Check the signal was received.
let res = rx_stop.try_recv().unwrap();
assert!(res.is_some());
}
#[test]
fn subscription_check_data() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap();
assert!(!handle.has_runtime_updates());
let id2 = "abcd".to_string();
let (_, handle) = subs.insert_subscription(id2.clone(), true, 10).unwrap();
assert!(handle.has_runtime_updates());
}
#[test]
fn subscription_check_max_pinned() {
let subs = SubscriptionManagement::<Block>::new();
let id = "abc".to_string();
let hash = H256::random();
let hash_2 = H256::random();
let (_, handle) = subs.insert_subscription(id.clone(), false, 1).unwrap();
handle.pin_block(hash).unwrap();
// The same block can be pinned multiple times.
handle.pin_block(hash).unwrap();
// Exceeded number of pinned blocks.
handle.pin_block(hash_2).unwrap_err();
}
}
File diff suppressed because it is too large Load Diff
+1
View File
@@ -23,6 +23,7 @@
#![warn(missing_docs)]
#![deny(unused_crate_dependencies)]
pub mod chain_head;
pub mod chain_spec;
pub mod transaction;