Files
pezkuwi-subxt/substrate/client/rpc/src/author/mod.rs
T
Niklas Adolfsson e16ef0861f rpc: backpressured RPC server (bump jsonrpsee 0.20) (#1313)
This is a rather big change in jsonrpsee, the major things in this bump
are:
- Server backpressure (the subscription impls are modified to deal with
that)
- Allow custom error types / return types (remove jsonrpsee::core::Error
and jsonrpee::core::CallError)
- Bug fixes (graceful shutdown in particular not used by substrate
anyway)
   - Less dependencies for the clients in particular
   - Return type requires Clone in method call responses
   - Moved to tokio channels
   - Async subscription API (not used in this PR)

Major changes in this PR:
- The subscriptions are now bounded and if subscription can't keep up
with the server it is dropped
- CLI: add parameter to configure the jsonrpc server bounded message
buffer (default is 64)
- Add our own subscription helper to deal with the unbounded streams in
substrate

The most important things in this PR to review is the added helpers
functions in `substrate/client/rpc/src/utils.rs` and the rest is pretty
much chore.

Regarding the "bounded buffer limit" it may cause the server to handle
the JSON-RPC calls
slower than before.

The message size limit is bounded by "--rpc-response-size" thus "by
default 10MB * 64 = 640MB"
but the subscription message size is not covered by this limit and could
be capped as well.

Hopefully the last release prior to 1.0, sorry in advance for a big PR

Previous attempt: https://github.com/paritytech/substrate/pull/13992

Resolves https://github.com/paritytech/polkadot-sdk/issues/748, resolves
https://github.com/paritytech/polkadot-sdk/issues/627
2024-01-23 08:55:13 +00:00

211 lines
6.1 KiB
Rust

// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Substrate block-author/full-node API.
#[cfg(test)]
mod tests;
use std::sync::Arc;
use crate::{
utils::{pipe_from_stream, spawn_subscription_task},
SubscriptionTaskExecutor,
};
use codec::{Decode, Encode};
use futures::TryFutureExt;
use jsonrpsee::{core::async_trait, types::ErrorObject, PendingSubscriptionSink};
use sc_rpc_api::DenyUnsafe;
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, InPoolTransaction, TransactionFor, TransactionPool,
TransactionSource, TxHash,
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_core::Bytes;
use sp_keystore::{KeystoreExt, KeystorePtr};
use sp_runtime::traits::Block as BlockT;
use sp_session::SessionKeys;
use self::error::{Error, Result};
/// Re-export the API for backward compatibility.
pub use sc_rpc_api::author::*;
/// Authoring API
pub struct Author<P, Client> {
/// Substrate client
client: Arc<Client>,
/// Transactions pool
pool: Arc<P>,
/// The key store.
keystore: KeystorePtr,
/// Whether to deny unsafe calls
deny_unsafe: DenyUnsafe,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
}
impl<P, Client> Author<P, Client> {
/// Create new instance of Authoring API.
pub fn new(
client: Arc<Client>,
pool: Arc<P>,
keystore: KeystorePtr,
deny_unsafe: DenyUnsafe,
executor: SubscriptionTaskExecutor,
) -> Self {
Author { client, pool, keystore, deny_unsafe, executor }
}
}
/// Currently we treat all RPC transactions as externals.
///
/// Possibly in the future we could allow opt-in for special treatment
/// of such transactions, so that the block authors can inject
/// some unique transactions via RPC and have them included in the pool.
const TX_SOURCE: TransactionSource = TransactionSource::External;
#[async_trait]
impl<P, Client> AuthorApiServer<TxHash<P>, BlockHash<P>> for Author<P, Client>
where
P: TransactionPool + Sync + Send + 'static,
Client: HeaderBackend<P::Block> + ProvideRuntimeApi<P::Block> + Send + Sync + 'static,
Client::Api: SessionKeys<P::Block>,
P::Hash: Unpin,
<P::Block as BlockT>::Hash: Unpin,
{
async fn submit_extrinsic(&self, ext: Bytes) -> Result<TxHash<P>> {
let xt = match Decode::decode(&mut &ext[..]) {
Ok(xt) => xt,
Err(err) => return Err(Error::Client(Box::new(err)).into()),
};
let best_block_hash = self.client.info().best_hash;
self.pool.submit_one(best_block_hash, TX_SOURCE, xt).await.map_err(|e| {
e.into_pool_error()
.map(|e| Error::Pool(e))
.unwrap_or_else(|e| Error::Verification(Box::new(e)))
.into()
})
}
fn insert_key(&self, key_type: String, suri: String, public: Bytes) -> Result<()> {
self.deny_unsafe.check_if_safe()?;
let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
self.keystore
.insert(key_type, &suri, &public[..])
.map_err(|_| Error::KeystoreUnavailable)?;
Ok(())
}
fn rotate_keys(&self) -> Result<Bytes> {
self.deny_unsafe.check_if_safe()?;
let best_block_hash = self.client.info().best_hash;
let mut runtime_api = self.client.runtime_api();
runtime_api.register_extension(KeystoreExt::from(self.keystore.clone()));
runtime_api
.generate_session_keys(best_block_hash, None)
.map(Into::into)
.map_err(|api_err| Error::Client(Box::new(api_err)).into())
}
fn has_session_keys(&self, session_keys: Bytes) -> Result<bool> {
self.deny_unsafe.check_if_safe()?;
let best_block_hash = self.client.info().best_hash;
let keys = self
.client
.runtime_api()
.decode_session_keys(best_block_hash, session_keys.to_vec())
.map_err(|e| Error::Client(Box::new(e)))?
.ok_or(Error::InvalidSessionKeys)?;
Ok(self.keystore.has_keys(&keys))
}
fn has_key(&self, public_key: Bytes, key_type: String) -> Result<bool> {
self.deny_unsafe.check_if_safe()?;
let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
Ok(self.keystore.has_keys(&[(public_key.to_vec(), key_type)]))
}
fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
Ok(self.pool.ready().map(|tx| tx.data().encode().into()).collect())
}
fn remove_extrinsic(
&self,
bytes_or_hash: Vec<hash::ExtrinsicOrHash<TxHash<P>>>,
) -> Result<Vec<TxHash<P>>> {
self.deny_unsafe.check_if_safe()?;
let hashes = bytes_or_hash
.into_iter()
.map(|x| match x {
hash::ExtrinsicOrHash::Hash(h) => Ok(h),
hash::ExtrinsicOrHash::Extrinsic(bytes) => {
let xt = Decode::decode(&mut &bytes[..])?;
Ok(self.pool.hash_of(&xt))
},
})
.collect::<Result<Vec<_>>>()?;
Ok(self
.pool
.remove_invalid(&hashes)
.into_iter()
.map(|tx| tx.hash().clone())
.collect())
}
fn watch_extrinsic(&self, pending: PendingSubscriptionSink, xt: Bytes) {
let best_block_hash = self.client.info().best_hash;
let dxt = match TransactionFor::<P>::decode(&mut &xt[..]).map_err(|e| Error::from(e)) {
Ok(dxt) => dxt,
Err(e) => {
spawn_subscription_task(&self.executor, pending.reject(e));
return
},
};
let submit = self.pool.submit_and_watch(best_block_hash, TX_SOURCE, dxt).map_err(|e| {
e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)))
});
let fut = async move {
let stream = match submit.await {
Ok(stream) => stream,
Err(err) => {
let _ = pending.reject(ErrorObject::from(err)).await;
return
},
};
pipe_from_stream(pending, stream).await;
};
spawn_subscription_task(&self.executor, fut);
}
}