rpc-v2: Enable the archive class of methods (#3017)

The
[archive](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/archive.md)
API is unstable and subject to change.

This PR enables the `archive` class of the RPC-V2 spec to substrate
based chains.

The `archive` API is enabled for archive nodes: 
- the state of the blocks is in archive mode
- the block's bodies are in archive mode

While at it, this PR extends the `BlocksPrunning` enum with an
`is_archive` helper to check if the pruning mode keeps the block's
bodies for long enough.

Defaults used for the `archive` API:
- a maximum of 5 responses are provided for descendants queries (this is
similar to chainHead)
- a maximum of 8 item queries are accepted at a time

Before stabilizing the API we should look into these defaults and adjust
after collecting some data.

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2024-01-23 18:22:56 +02:00
committed by GitHub
parent b4dfad838c
commit 01ac54db29
6 changed files with 99 additions and 23 deletions
+10
View File
@@ -320,6 +320,16 @@ pub enum BlocksPruning {
Some(u32), Some(u32),
} }
impl BlocksPruning {
/// True if this is an archive pruning mode (either KeepAll or KeepFinalized).
pub fn is_archive(&self) -> bool {
match *self {
BlocksPruning::KeepAll | BlocksPruning::KeepFinalized => true,
BlocksPruning::Some(_) => false,
}
}
}
/// Where to find the database.. /// Where to find the database..
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum DatabaseSource { pub enum DatabaseSource {
@@ -34,7 +34,7 @@ use sp_api::{CallApiAt, CallContext};
use sp_blockchain::{ use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
}; };
use sp_core::Bytes; use sp_core::{Bytes, U256};
use sp_runtime::{ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor}, traits::{Block as BlockT, Header as HeaderT, NumberFor},
SaturatedConversion, SaturatedConversion,
@@ -43,6 +43,36 @@ use std::{collections::HashSet, marker::PhantomData, sync::Arc};
use super::archive_storage::ArchiveStorage; use super::archive_storage::ArchiveStorage;
/// The configuration of [`Archive`].
pub struct ArchiveConfig {
/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
pub max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time.
pub max_queried_items: usize,
}
/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
///
/// Note: this is identical to the `chainHead` value.
const MAX_DESCENDANT_RESPONSES: usize = 5;
/// The maximum number of queried items allowed for the `archive_storage` at a time.
///
/// Note: A queried item can also be a descendant query which can return up to
/// `MAX_DESCENDANT_RESPONSES`.
const MAX_QUERIED_ITEMS: usize = 8;
impl Default for ArchiveConfig {
fn default() -> Self {
Self {
max_descendant_responses: MAX_DESCENDANT_RESPONSES,
max_queried_items: MAX_QUERIED_ITEMS,
}
}
}
/// An API for archive RPC calls. /// An API for archive RPC calls.
pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> { pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client. /// Substrate client.
@@ -51,8 +81,9 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
backend: Arc<BE>, backend: Arc<BE>,
/// The hexadecimal encoded hash of the genesis block. /// The hexadecimal encoded hash of the genesis block.
genesis_hash: String, genesis_hash: String,
/// The maximum number of reported items by the `archive_storage` at a time. /// The maximum number of items the `archive_storage` can return for a descendant query before
storage_max_reported_items: usize, /// pagination is required.
storage_max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time. /// The maximum number of queried items allowed for the `archive_storage` at a time.
storage_max_queried_items: usize, storage_max_queried_items: usize,
/// Phantom member to pin the block type. /// Phantom member to pin the block type.
@@ -65,16 +96,15 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
client: Arc<Client>, client: Arc<Client>,
backend: Arc<BE>, backend: Arc<BE>,
genesis_hash: GenesisHash, genesis_hash: GenesisHash,
storage_max_reported_items: usize, config: ArchiveConfig,
storage_max_queried_items: usize,
) -> Self { ) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref()); let genesis_hash = hex_string(&genesis_hash.as_ref());
Self { Self {
client, client,
backend, backend,
genesis_hash, genesis_hash,
storage_max_reported_items, storage_max_descendant_responses: config.max_descendant_responses,
storage_max_queried_items, storage_max_queried_items: config.max_queried_items,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@@ -97,7 +127,6 @@ impl<BE, Block, Client> ArchiveApiServer<Block::Hash> for Archive<BE, Block, Cli
where where
Block: BlockT + 'static, Block: BlockT + 'static,
Block::Header: Unpin, Block::Header: Unpin,
<<Block as BlockT>::Header as HeaderT>::Number: From<u64>,
BE: Backend<Block> + 'static, BE: Backend<Block> + 'static,
Client: BlockBackend<Block> Client: BlockBackend<Block>
+ ExecutorProvider<Block> + ExecutorProvider<Block>
@@ -136,7 +165,10 @@ where
} }
fn archive_unstable_hash_by_height(&self, height: u64) -> RpcResult<Vec<String>> { fn archive_unstable_hash_by_height(&self, height: u64) -> RpcResult<Vec<String>> {
let height: NumberFor<Block> = height.into(); let height: NumberFor<Block> = U256::from(height)
.try_into()
.map_err(|_| ArchiveError::InvalidParam(format!("Invalid block height: {}", height)))?;
let finalized_num = self.client.info().finalized_number; let finalized_num = self.client.info().finalized_number;
if finalized_num >= height { if finalized_num >= height {
@@ -240,7 +272,7 @@ where
let storage_client = ArchiveStorage::new( let storage_client = ArchiveStorage::new(
self.client.clone(), self.client.clone(),
self.storage_max_reported_items, self.storage_max_descendant_responses,
self.storage_max_queried_items, self.storage_max_queried_items,
); );
Ok(storage_client.handle_query(hash, items, child_trie)) Ok(storage_client.handle_query(hash, items, child_trie))
@@ -28,12 +28,12 @@ use crate::common::{
storage::{IterQueryType, QueryIter, Storage}, storage::{IterQueryType, QueryIter, Storage},
}; };
/// Generates the events of the `chainHead_storage` method. /// Generates the events of the `archive_storage` method.
pub struct ArchiveStorage<Client, Block, BE> { pub struct ArchiveStorage<Client, Block, BE> {
/// Storage client. /// Storage client.
client: Storage<Client, Block, BE>, client: Storage<Client, Block, BE>,
/// The maximum number of reported items by the `archive_storage` at a time. /// The maximum number of responses the API can return for a descendant query at a time.
storage_max_reported_items: usize, storage_max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time. /// The maximum number of queried items allowed for the `archive_storage` at a time.
storage_max_queried_items: usize, storage_max_queried_items: usize,
} }
@@ -42,10 +42,14 @@ impl<Client, Block, BE> ArchiveStorage<Client, Block, BE> {
/// Constructs a new [`ArchiveStorage`]. /// Constructs a new [`ArchiveStorage`].
pub fn new( pub fn new(
client: Arc<Client>, client: Arc<Client>,
storage_max_reported_items: usize, storage_max_descendant_responses: usize,
storage_max_queried_items: usize, storage_max_queried_items: usize,
) -> Self { ) -> Self {
Self { client: Storage::new(client), storage_max_reported_items, storage_max_queried_items } Self {
client: Storage::new(client),
storage_max_descendant_responses,
storage_max_queried_items,
}
} }
} }
@@ -96,7 +100,7 @@ where
}, },
hash, hash,
child_key.as_ref(), child_key.as_ref(),
self.storage_max_reported_items, self.storage_max_descendant_responses,
) { ) {
Ok((results, _)) => storage_results.extend(results), Ok((results, _)) => storage_results.extend(results),
Err(error) => return ArchiveStorageResult::err(error), Err(error) => return ArchiveStorageResult::err(error),
@@ -111,7 +115,7 @@ where
}, },
hash, hash,
child_key.as_ref(), child_key.as_ref(),
self.storage_max_reported_items, self.storage_max_descendant_responses,
) { ) {
Ok((results, _)) => storage_results.extend(results), Ok((results, _)) => storage_results.extend(results),
Err(error) => return ArchiveStorageResult::err(error), Err(error) => return ArchiveStorageResult::err(error),
@@ -32,3 +32,4 @@ pub mod archive;
pub mod error; pub mod error;
pub use api::ArchiveApiServer; pub use api::ArchiveApiServer;
pub use archive::{Archive, ArchiveConfig};
@@ -24,7 +24,10 @@ use crate::{
hex_string, MethodResult, hex_string, MethodResult,
}; };
use super::{archive::Archive, *}; use super::{
archive::{Archive, ArchiveConfig},
*,
};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use codec::{Decode, Encode}; use codec::{Decode, Encode};
@@ -60,7 +63,7 @@ type Header = substrate_test_runtime_client::runtime::Header;
type Block = substrate_test_runtime_client::runtime::Block; type Block = substrate_test_runtime_client::runtime::Block;
fn setup_api( fn setup_api(
max_returned_items: usize, max_descendant_responses: usize,
max_queried_items: usize, max_queried_items: usize,
) -> (Arc<Client<Backend>>, RpcModule<Archive<Backend, Block, Client<Backend>>>) { ) -> (Arc<Client<Backend>>, RpcModule<Archive<Backend, Block, Client<Backend>>>) {
let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY);
@@ -72,9 +75,13 @@ fn setup_api(
let backend = builder.backend(); let backend = builder.backend();
let client = Arc::new(builder.build()); let client = Arc::new(builder.build());
let api = let api = Archive::new(
Archive::new(client.clone(), backend, CHAIN_GENESIS, max_returned_items, max_queried_items) client.clone(),
.into_rpc(); backend,
CHAIN_GENESIS,
ArchiveConfig { max_descendant_responses, max_queried_items },
)
.into_rpc();
(client, api) (client, api)
} }
+23 -1
View File
@@ -63,7 +63,9 @@ use sc_rpc::{
system::SystemApiServer, system::SystemApiServer,
DenyUnsafe, SubscriptionTaskExecutor, DenyUnsafe, SubscriptionTaskExecutor,
}; };
use sc_rpc_spec_v2::{chain_head::ChainHeadApiServer, transaction::TransactionApiServer}; use sc_rpc_spec_v2::{
archive::ArchiveApiServer, chain_head::ChainHeadApiServer, transaction::TransactionApiServer,
};
use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO}; use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool}; use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
@@ -664,6 +666,26 @@ where
) )
.into_rpc(); .into_rpc();
// Part of the RPC v2 spec.
// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
// - state pruning in archive mode: The storage of blocks is kept around
// - block pruning in archive mode: The block's body is kept around
let is_archive_node = config.state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
config.blocks_pruning.is_archive();
if is_archive_node {
let genesis_hash =
client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
client.clone(),
backend.clone(),
genesis_hash,
// Defaults to sensible limits for the `Archive`.
sc_rpc_spec_v2::archive::ArchiveConfig::default(),
)
.into_rpc();
rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
}
let author = sc_rpc::author::Author::new( let author = sc_rpc::author::Author::new(
client.clone(), client.clone(),
transaction_pool, transaction_pool,