jsonrpsee integration (#8783)

* Add tokio

* No need to map CallError to CallError

* jsonrpsee proc macros (#9673)

* port error types to `JsonRpseeError`

* migrate chain module to proc macro api

* make it compile with proc macros

* update branch

* update branch

* update to jsonrpsee master

* port system rpc

* port state rpc

* port childstate & offchain

* frame system rpc

* frame transaction payment

* bring back CORS hack to work with polkadot UI

* port babe rpc

* port manual seal rpc

* port frame mmr rpc

* port frame contracts rpc

* port finality grandpa rpc

* port sync state rpc

* resolve a few TODO + no jsonrpc deps

* Update bin/node/rpc-client/src/main.rs

* Update bin/node/rpc-client/src/main.rs

* Update bin/node/rpc-client/src/main.rs

* Update bin/node/rpc-client/src/main.rs

* Port over system_ rpc tests

* Make it compile

* Use prost 0.8

* Use prost 0.8

* Make it compile

* Ignore more failing tests

* Comment out WIP tests

* fix nit in frame system api

* Update lockfile

* No more juggling tokio versions

* No more wait_for_stop ?

* Remove browser-testing

* Arguments must be arrays

* Use same argument names

* Resolve todo: no wait_for_stop for WS server
Add todo: is parse_rpc_result used?
Cleanup imports

* fmt

* log

* One test passes

* update jsonrpsee

* update jsonrpsee

* cleanup rpc-servers crate

* jsonrpsee: add host and origin filtering (#9787)

* add access control in the jsonrpsee servers

* use master

* fix nits

* rpc runtime_version safe

* fix nits

* fix grumbles

* remove unused files

* resolve some todos

* jsonrpsee more cleanup (#9803)

* more cleanup

* resolve TODOs

* fix some unwraps

* remove type hints

* update jsonrpsee

* downgrade zeroize

* pin jsonrpsee rev

* remove unwrap nit

* Comment out more tests that aren't ported

* Comment out more tests

* Fix tests after merge

* Subscription test

* Invalid nonce test

* Pending exts

* WIP removeExtrinsic test

* Test remove_extrinsic

* Make state test: should_return_storage work

* Uncomment/fix the other non-subscription related state tests

* test: author_insertKey

* test: author_rotateKeys

* Get rest of state tests passing

* asyncify a little more

* Add todo to note #msg change

* Crashing test for has_session_keys

* Fix error conversion to avoid stack overflows
Port author_hasSessionKeys test
fmt

* test author_hasKey

* Add two missing tests
Add a check on the return type
Add todos for James's concerns

* RPC tests for state, author and system (#9859)

* Fix test runner

* Impl Default for SubscriptionTaskExecutor

* Keep the minimul amount of code needed to compile tests

* Re-instate `RpcSession` (for now)

* cleanup

* Port over RPC tests

* Add tokio

* No need to map CallError to CallError

* Port over system_ rpc tests

* Make it compile

* Use prost 0.8

* Use prost 0.8

* Make it compile

* Ignore more failing tests

* Comment out WIP tests

* Update lockfile

* No more juggling tokio versions

* No more wait_for_stop ?

* Remove browser-testing

* Arguments must be arrays

* Use same argument names

* Resolve todo: no wait_for_stop for WS server
Add todo: is parse_rpc_result used?
Cleanup imports

* fmt

* log

* One test passes

* Comment out more tests that aren't ported

* Comment out more tests

* Fix tests after merge

* Subscription test

* Invalid nonce test

* Pending exts

* WIP removeExtrinsic test

* Test remove_extrinsic

* Make state test: should_return_storage work

* Uncomment/fix the other non-subscription related state tests

* test: author_insertKey

* test: author_rotateKeys

* Get rest of state tests passing

* asyncify a little more

* Add todo to note #msg change

* Crashing test for has_session_keys

* Fix error conversion to avoid stack overflows
Port author_hasSessionKeys test
fmt

* test author_hasKey

* Add two missing tests
Add a check on the return type
Add todos for James's concerns

* offchain rpc tests

* Address todos

* fmt

Co-authored-by: James Wilson <james@jsdw.me>

* fix drop in state test

* update jsonrpsee

* fix ignored system test

* fix chain tests

* remove some boiler plate

* Port BEEFY RPC (#9883)

* Merge master

* Port beefy RPC (ty @niklas!)

* trivial changes left over from merge

* Remove unused code

* Update jsonrpsee

* fix build

* make tests compile again

* beefy update jsonrpsee

* fix: respect rpc methods policy

* update cargo.lock

* update jsonrpsee

* update jsonrpsee

* downgrade error logs

* update jsonrpsee

* Fix typo

* remove unused file

* Better name

* Port Babe RPC tests

* Put docs back

* Resolve todo

* Port tests for System RPCs

* Resolve todo

* fix build

* Updated jsonrpsee to current master

* fix: port finality grandpa rpc tests

* Move .into() outside of the match

* more review grumbles

* jsonrpsee: add `rpc handlers` back (#10245)

* add back RpcHandlers

* cargo fmt

* fix docs

* fix grumble: remove needless alloc

* resolve TODO

* fmt

* Fix typo

* grumble: Use constants based on BASE_ERROR

* grumble: DRY whitelisted listening addresses
grumble: s/JSONRPC/JSON-RPC/

* cleanup

* grumbles: Making readers aware of the possibility of gaps

* review grumbles

* grumbles

* remove notes from niklasad1

* Update `jsonrpsee`

* fix: jsonrpsee features

* jsonrpsee: fallback to random port in case the specified port failed (#10304)

* jsonrpsee: fallback to random port

* better comment

* Update client/rpc-servers/src/lib.rs

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Update client/rpc-servers/src/lib.rs

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* address grumbles

* cargo fmt

* addrs already slice

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Update jsonrpsee to 092081a0a2b8904c6ebd2cd99e16c7bc13ffc3ae

* lockfile

* update jsonrpsee

* fix warning

* Don't fetch jsonrpsee from crates

* make tests compile again

* fix rpc tests

* remove unused deps

* update tokio

* fix rpc tests again

* fix: test runner

`HttpServerBuilder::builder` fails unless it's called within tokio runtime

* cargo fmt

* grumbles: fix subscription aliases

* make clippy happy

* update remaining subscriptions alias

* cleanup

* cleanup

* fix chain subscription: less boiler plate (#10285)

* fix chain subscription: less boiler plate

* fix bad merge

* cargo fmt

* Switch to jsonrpsee 0.5

* fix build

* add missing features

* fix nit: remove needless Box::pin

* Integrate jsonrpsee metrics (#10395)

* draft metrics impl

* Use latest api

* Add missing file

* Http server metrics

* cleanup

* bump jsonrpsee

* Remove `ServerMetrics` and use a single middleware for both connection counting (aka sessions) and call metrics.

* fix build

* remove needless Arc::clone

* Update to jsonrpsee 0.6

* lolz

* fix metrics

* Revert "lolz"

This reverts commit eed6c6a56e78d8e307b4950f4c52a1c3a2322ba1.

* fix: in-memory rpc support subscriptions

* commit Cargo.lock

* Update tests to 0.7

* fix TODOs

* ws server: generate subscriptionIDs as Strings

Some libraries seems to expect the subscription IDs to be Strings, let's not break
this in this PR.

* Increase timeout

* Port over tests

* cleanup

* Using error codes from the spec

* fix clippy

* cargo fmt

* update jsonrpsee

* fix nits

* fix: rpc_query

* enable custom subid gen through spawn_tasks

* remove unsed deps

* unify tokio deps

* Revert "enable custom subid gen through spawn_tasks"

This reverts commit 5c5eb70328fe39d154fdb55c56e637b4548cf470.

* fix bad merge of `test-utils`

* fix more nits

* downgrade wasm-instrument to 0.1.0

* [jsonrpsee]: enable custom RPC subscription ID generatation (#10731)

* enable custom subid gen through spawn_tasks

* fix nits

* Update client/service/src/builder.rs

Co-authored-by: David <dvdplm@gmail.com>

* add Poc; needs jsonrpsee pr

* update jsonrpsee

* add re-exports

* add docs

Co-authored-by: David <dvdplm@gmail.com>

* cargo fmt

* fmt

* port RPC-API dev

* Remove unused file

* fix nit: remove async trait

* fix doc links

* fix merge nit: remove jsonrpc deps

* kill namespace on rpc apis

* companion for jsonrpsee v0.10 (#11158)

* companion for jsonrpsee v0.10

* update versions v0.10.0

* add some fixes

* spelling

* fix spaces

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* send error before subs are closed

* fix unsubscribe method names: chain

* fix tests

* jsonrpc server: print binded local address

* grumbles: kill SubscriptionTaskExecutor

* Update client/sync-state-rpc/src/lib.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/rpc/src/chain/chain_full.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/rpc/src/chain/chain_full.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* sync-state-rpc: kill anyhow

* no more anyhow

* remove todo

* jsonrpsee:  fix bad params in subscriptions. (#11251)

* update jsonrpsee

* fix error responses

* revert error codes

* dont do weird stuff in drop impl

* rpc servers: remove needless clone

* Remove silly constants

* chore: update jsonrpsee v0.12

* commit Cargo.lock

* deps: downgrade git2

* feat: CLI flag max subscriptions per connection

* metrics: use old logging format

* fix: read WS address from substrate output (#11379)

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: James Wilson <james@jsdw.me>
Co-authored-by: Maciej Hirsz <hello@maciej.codes>
Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
David
2022-05-10 10:52:19 +02:00
committed by GitHub
parent e45d53552d
commit 29c0c6a4a8
93 changed files with 3813 additions and 5094 deletions
+64 -126
View File
@@ -25,7 +25,7 @@ use crate::{
start_rpc_servers, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
};
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
use jsonrpc_pubsub::manager::SubscriptionManager;
use jsonrpsee::RpcModule;
use log::info;
use prometheus_endpoint::Registry;
use sc_chain_spec::get_extension;
@@ -45,6 +45,14 @@ use sc_network::{
warp_request_handler::{self, RequestHandler as WarpSyncRequestHandler, WarpSyncProvider},
NetworkService,
};
use sc_rpc::{
author::AuthorApiServer,
chain::ChainApiServer,
offchain::OffchainApiServer,
state::{ChildStateApiServer, StateApiServer},
system::SystemApiServer,
DenyUnsafe, SubscriptionTaskExecutor,
};
use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_transaction_pool_api::MaintainedTransactionPool;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
@@ -62,69 +70,6 @@ use sp_runtime::{
};
use std::{str::FromStr, sync::Arc, time::SystemTime};
/// A utility trait for building an RPC extension given a `DenyUnsafe` instance.
/// This is useful since at service definition time we don't know whether the
/// specific interface where the RPC extension will be exposed is safe or not.
/// This trait allows us to lazily build the RPC extension whenever we bind the
/// service to an interface.
pub trait RpcExtensionBuilder {
/// The type of the RPC extension that will be built.
type Output: sc_rpc::RpcExtension<sc_rpc::Metadata>;
/// Returns an instance of the RPC extension for a particular `DenyUnsafe`
/// value, e.g. the RPC extension might not expose some unsafe methods.
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Result<Self::Output, Error>;
}
impl<F, R> RpcExtensionBuilder for F
where
F: Fn(sc_rpc::DenyUnsafe, sc_rpc::SubscriptionTaskExecutor) -> Result<R, Error>,
R: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
type Output = R;
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Result<Self::Output, Error> {
(*self)(deny, subscription_executor)
}
}
/// A utility struct for implementing an `RpcExtensionBuilder` given a cloneable
/// `RpcExtension`, the resulting builder will simply ignore the provided
/// `DenyUnsafe` instance and return a static `RpcExtension` instance.
pub struct NoopRpcExtensionBuilder<R>(pub R);
impl<R> RpcExtensionBuilder for NoopRpcExtensionBuilder<R>
where
R: Clone + sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
type Output = R;
fn build(
&self,
_deny: sc_rpc::DenyUnsafe,
_subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Result<Self::Output, Error> {
Ok(self.0.clone())
}
}
impl<R> From<R> for NoopRpcExtensionBuilder<R>
where
R: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
fn from(e: R) -> NoopRpcExtensionBuilder<R> {
NoopRpcExtensionBuilder(e)
}
}
/// Full client type.
pub type TFullClient<TBl, TRtApi, TExec> =
Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
@@ -389,9 +334,9 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub keystore: SyncCryptoStorePtr,
/// A shared transaction pool.
pub transaction_pool: Arc<TExPool>,
/// A RPC extension builder. Use `NoopRpcExtensionBuilder` if you just want to pass in the
/// extensions directly.
pub rpc_extensions_builder: Box<dyn RpcExtensionBuilder<Output = TRpc> + Send>,
/// Builds additional [`RpcModule`]s that should be added to the server
pub rpc_builder:
Box<dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
/// A shared network instance.
pub network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
/// A Sender for RPC requests.
@@ -463,7 +408,6 @@ where
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash>
+ parity_util_mem::MallocSizeOf
+ 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
let SpawnTasksParams {
mut config,
@@ -472,7 +416,7 @@ where
backend,
keystore,
transaction_pool,
rpc_extensions_builder,
rpc_builder,
network,
system_rpc_tx,
telemetry,
@@ -536,35 +480,25 @@ where
metrics_service.run(client.clone(), transaction_pool.clone(), network.clone()),
);
// RPC
let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware| {
gen_handler(
let rpc_id_provider = config.rpc_id_provider.take();
// jsonrpsee RPC
let gen_rpc_module = |deny_unsafe: DenyUnsafe| {
gen_rpc_module(
deny_unsafe,
rpc_middleware,
&config,
task_manager.spawn_handle(),
client.clone(),
transaction_pool.clone(),
keystore.clone(),
&*rpc_extensions_builder,
backend.offchain_storage(),
system_rpc_tx.clone(),
&config,
backend.offchain_storage(),
&*rpc_builder,
)
};
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
let server_metrics = sc_rpc_server::ServerMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone(), server_metrics)?;
// This is used internally, so don't restrict access to unsafe RPC
let known_rpc_method_names =
sc_rpc_server::method_names(|m| gen_handler(sc_rpc::DenyUnsafe::No, m))?;
let rpc_handlers = RpcHandlers(Arc::new(
gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics, known_rpc_method_names, "inbrowser"),
)?
.into(),
));
let rpc = start_rpc_servers(&config, gen_rpc_module, rpc_id_provider)?;
let rpc_handlers = RpcHandlers(Arc::new(gen_rpc_module(sc_rpc::DenyUnsafe::No)?.into()));
// Spawn informant task
spawn_handle.spawn(
@@ -578,7 +512,7 @@ where
),
);
task_manager.keep_alive((config.base_path, rpc, rpc_handlers.clone()));
task_manager.keep_alive((config.base_path, rpc));
Ok(rpc_handlers)
}
@@ -642,18 +576,17 @@ fn init_telemetry<TBl: BlockT, TCl: BlockBackend<TBl>>(
Ok(telemetry.handle())
}
fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware,
config: &Configuration,
fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
deny_unsafe: DenyUnsafe,
spawn_handle: SpawnTaskHandle,
client: Arc<TCl>,
transaction_pool: Arc<TExPool>,
keystore: SyncCryptoStorePtr,
rpc_extensions_builder: &(dyn RpcExtensionBuilder<Output = TRpc> + Send),
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
) -> Result<sc_rpc_server::RpcHandler<sc_rpc::Metadata>, Error>
config: &Configuration,
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
rpc_builder: &(dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
) -> Result<RpcModule<()>, Error>
where
TBl: BlockT,
TCl: ProvideRuntimeApi<TBl>
@@ -668,15 +601,12 @@ where
+ Send
+ Sync
+ 'static,
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TBackend: sc_client_api::backend::Backend<TBl> + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>,
<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TBl::Hash: Unpin,
TBl::Header: Unpin,
{
use sc_rpc::{author, chain, offchain, state, system};
let system_info = sc_rpc::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.clone(),
@@ -685,42 +615,50 @@ where
chain_type: config.chain_spec.chain_type(),
};
let task_executor = sc_rpc::SubscriptionTaskExecutor::new(spawn_handle);
let subscriptions = SubscriptionManager::new(Arc::new(task_executor.clone()));
let mut rpc_api = RpcModule::new(());
let task_executor = Arc::new(spawn_handle);
let (chain, state, child_state) = {
// Full nodes
let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone());
let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
let (state, child_state) = sc_rpc::state::new_full(
client.clone(),
subscriptions.clone(),
task_executor.clone(),
deny_unsafe,
config.rpc_max_payload,
);
let state = state.into_rpc();
let child_state = child_state.into_rpc();
(chain, state, child_state)
};
let author =
sc_rpc::author::Author::new(client, transaction_pool, subscriptions, keystore, deny_unsafe);
let system = system::System::new(system_info, system_rpc_tx, deny_unsafe);
let author = sc_rpc::author::Author::new(
client.clone(),
transaction_pool,
keystore,
deny_unsafe,
task_executor.clone(),
)
.into_rpc();
let maybe_offchain_rpc = offchain_storage.map(|storage| {
let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe);
offchain::OffchainApi::to_delegate(offchain)
});
let system = sc_rpc::system::System::new(system_info, system_rpc_tx, deny_unsafe).into_rpc();
Ok(sc_rpc_server::rpc_handler(
(
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, task_executor)?,
),
rpc_middleware,
))
if let Some(storage) = offchain_storage {
let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe).into_rpc();
rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
}
rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
// Additional [`RpcModule`]s defined in the node to fit the specific blockchain
let extra_rpcs = rpc_builder(deny_unsafe, task_executor.clone())?;
rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
Ok(rpc_api)
}
/// Parameters to pass into `build_network`.
+12
View File
@@ -100,6 +100,18 @@ pub struct Configuration {
pub rpc_methods: RpcMethods,
/// Maximum payload of rpc request/responses.
pub rpc_max_payload: Option<usize>,
/// Maximum payload of a rpc request
pub rpc_max_request_size: Option<usize>,
/// Maximum payload of a rpc request
pub rpc_max_response_size: Option<usize>,
/// Custom JSON-RPC subscription ID provider.
///
/// Default: [`crate::RandomStringSubscriptionId`].
pub rpc_id_provider: Option<Box<dyn crate::RpcSubscriptionIdProvider>>,
/// Maximum allowed subscriptions per rpc connection
///
/// Default: 1024.
pub rpc_max_subs_per_conn: Option<usize>,
/// Maximum size of the output buffer capacity for websocket connections.
pub ws_max_out_buffer_capacity: Option<usize>,
/// Prometheus endpoint configuration. `None` if disabled.
+123 -151
View File
@@ -34,13 +34,15 @@ mod client;
mod metrics;
mod task_manager;
use std::{collections::HashMap, io, net::SocketAddr, pin::Pin};
use std::{collections::HashMap, net::SocketAddr};
use codec::{Decode, Encode};
use futures::{Future, FutureExt, StreamExt};
use futures::{channel::mpsc, FutureExt, StreamExt};
use jsonrpsee::{core::Error as JsonRpseeError, RpcModule};
use log::{debug, error, warn};
use sc_client_api::{BlockBackend, ProofProvider};
use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
use sc_network::PeerId;
use sc_rpc_server::WsConfig;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain::HeaderMetadata;
use sp_runtime::{
@@ -52,8 +54,7 @@ pub use self::{
builder::{
build_network, build_offchain_workers, new_client, new_db_backend, new_full_client,
new_full_parts, spawn_tasks, BuildNetworkParams, KeystoreContainer, NetworkStarter,
NoopRpcExtensionBuilder, RpcExtensionBuilder, SpawnTasksParams, TFullBackend,
TFullCallExecutor, TFullClient,
SpawnTasksParams, TFullBackend, TFullCallExecutor, TFullClient,
},
client::{ClientConfig, LocalCallExecutor},
error::Error,
@@ -65,12 +66,14 @@ pub use sc_chain_spec::{
ChainSpec, ChainType, Extension as ChainSpecExtension, GenericChainSpec, NoExtension,
Properties, RuntimeGenesis,
};
use sc_client_api::{blockchain::HeaderBackend, BlockchainEvents};
pub use sc_consensus::ImportQueue;
pub use sc_executor::NativeExecutionDispatch;
#[doc(hidden)]
pub use sc_network::config::{TransactionImport, TransactionImportFuture};
pub use sc_rpc::Metadata as RpcMetadata;
pub use sc_rpc::{
RandomIntegerSubscriptionId, RandomStringSubscriptionId, RpcSubscriptionIdProvider,
};
pub use sc_tracing::TracingReceiver;
pub use sc_transaction_pool::Options as TransactionPoolOptions;
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
@@ -82,32 +85,27 @@ const DEFAULT_PROTOCOL_ID: &str = "sup";
/// RPC handlers that can perform RPC queries.
#[derive(Clone)]
pub struct RpcHandlers(
Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata, sc_rpc_server::RpcMiddleware>>,
);
pub struct RpcHandlers(Arc<RpcModule<()>>);
impl RpcHandlers {
/// Starts an RPC query.
///
/// The query is passed as a string and must be a JSON text similar to what an HTTP client
/// would for example send.
/// The query is passed as a string and must be valid JSON-RPC request object.
///
/// Returns a `Future` that contains the optional response.
/// Returns a response and a stream if the call successful, fails if the
/// query could not be decoded as a JSON-RPC request object.
///
/// If the request subscribes you to events, the `Sender` in the `RpcSession` object is used to
/// send back spontaneous events.
pub fn rpc_query(
/// If the request subscribes you to events, the `stream` can be used to
/// retrieve the events.
pub async fn rpc_query(
&self,
mem: &RpcSession,
request: &str,
) -> Pin<Box<dyn Future<Output = Option<String>> + Send>> {
self.0.handle_request(request, mem.metadata.clone()).boxed()
json_query: &str,
) -> Result<(String, mpsc::UnboundedReceiver<String>), JsonRpseeError> {
self.0.raw_json_request(json_query).await
}
/// Provides access to the underlying `MetaIoHandler`
pub fn io_handler(
&self,
) -> Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata, sc_rpc_server::RpcMiddleware>> {
/// Provides access to the underlying `RpcModule`
pub fn handle(&self) -> Arc<RpcModule<()>> {
self.0.clone()
}
}
@@ -284,74 +282,41 @@ async fn build_network_future<
// Wrapper for HTTP and WS servers that makes sure they are properly shut down.
mod waiting {
pub struct HttpServer(pub Option<sc_rpc_server::HttpServer>);
impl Drop for HttpServer {
fn drop(&mut self) {
if let Some(server) = self.0.take() {
server.close_handle().close();
server.wait();
}
}
}
pub struct IpcServer(pub Option<sc_rpc_server::IpcServer>);
impl Drop for IpcServer {
fn drop(&mut self) {
if let Some(server) = self.0.take() {
server.close_handle().close();
let _ = server.wait();
// This doesn't not wait for the server to be stopped but fires the signal.
let _ = server.stop();
}
}
}
pub struct WsServer(pub Option<sc_rpc_server::WsServer>);
impl Drop for WsServer {
fn drop(&mut self) {
if let Some(server) = self.0.take() {
server.close_handle().close();
let _ = server.wait();
// This doesn't not wait for the server to be stopped but fires the signal.
let _ = server.stop();
}
}
}
}
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them
/// alive.
fn start_rpc_servers<
H: FnMut(
sc_rpc::DenyUnsafe,
sc_rpc_server::RpcMiddleware,
) -> Result<sc_rpc_server::RpcHandler<sc_rpc::Metadata>, Error>,
>(
/// Starts RPC servers.
fn start_rpc_servers<R>(
config: &Configuration,
mut gen_handler: H,
rpc_metrics: Option<sc_rpc_server::RpcMetrics>,
server_metrics: sc_rpc_server::ServerMetrics,
) -> Result<Box<dyn std::any::Any + Send>, Error> {
fn maybe_start_server<T, F>(
address: Option<SocketAddr>,
mut start: F,
) -> Result<Option<T>, Error>
where
F: FnMut(&SocketAddr) -> Result<T, Error>,
{
address
.map(|mut address| {
start(&address).or_else(|e| match e {
Error::Io(e) => match e.kind() {
io::ErrorKind::AddrInUse | io::ErrorKind::PermissionDenied => {
warn!("Unable to bind RPC server to {}. Trying random port.", address);
address.set_port(0);
start(&address)
},
_ => Err(e.into()),
},
e => Err(e),
})
})
.transpose()
}
gen_rpc_module: R,
rpc_id_provider: Option<Box<dyn RpcSubscriptionIdProvider>>,
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error>
where
R: Fn(sc_rpc::DenyUnsafe) -> Result<RpcModule<()>, Error>,
{
let (max_request_size, ws_max_response_size, http_max_response_size) =
legacy_cli_parsing(config);
fn deny_unsafe(addr: &SocketAddr, methods: &RpcMethods) -> sc_rpc::DenyUnsafe {
fn deny_unsafe(addr: SocketAddr, methods: &RpcMethods) -> sc_rpc::DenyUnsafe {
let is_exposed_addr = !addr.ip().is_loopback();
match (is_exposed_addr, methods) {
| (_, RpcMethods::Unsafe) | (false, RpcMethods::Auto) => sc_rpc::DenyUnsafe::No,
@@ -359,85 +324,54 @@ fn start_rpc_servers<
}
}
let rpc_method_names = sc_rpc_server::method_names(|m| gen_handler(sc_rpc::DenyUnsafe::No, m))?;
Ok(Box::new((
config
.rpc_ipc
.as_ref()
.map(|path| {
sc_rpc_server::start_ipc(
&*path,
gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(
rpc_metrics.clone(),
rpc_method_names.clone(),
"ipc",
),
)?,
server_metrics.clone(),
)
.map_err(Error::from)
})
.transpose()?,
maybe_start_server(config.rpc_http, |address| {
sc_rpc_server::start_http(
address,
config.rpc_cors.as_ref(),
gen_handler(
deny_unsafe(address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(
rpc_metrics.clone(),
rpc_method_names.clone(),
"http",
),
)?,
config.rpc_max_payload,
config.tokio_handle.clone(),
)
.map_err(Error::from)
})?
.map(|s| waiting::HttpServer(Some(s))),
maybe_start_server(config.rpc_ws, |address| {
sc_rpc_server::start_ws(
address,
config.rpc_ws_max_connections,
config.rpc_cors.as_ref(),
gen_handler(
deny_unsafe(address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(
rpc_metrics.clone(),
rpc_method_names.clone(),
"ws",
),
)?,
config.rpc_max_payload,
config.ws_max_out_buffer_capacity,
server_metrics.clone(),
config.tokio_handle.clone(),
)
.map_err(Error::from)
})?
.map(|s| waiting::WsServer(Some(s))),
)))
}
let random_port = |mut addr: SocketAddr| {
addr.set_port(0);
addr
};
/// An RPC session. Used to perform in-memory RPC queries (ie. RPC queries that don't go through
/// the HTTP or WebSockets server).
#[derive(Clone)]
pub struct RpcSession {
metadata: sc_rpc::Metadata,
}
let ws_addr = config
.rpc_ws
.unwrap_or_else(|| "127.0.0.1:9944".parse().expect("valid sockaddr; qed"));
let ws_addr2 = random_port(ws_addr);
let http_addr = config
.rpc_http
.unwrap_or_else(|| "127.0.0.1:9933".parse().expect("valid sockaddr; qed"));
let http_addr2 = random_port(http_addr);
impl RpcSession {
/// Creates an RPC session.
///
/// The `sender` is stored inside the `RpcSession` and is used to communicate spontaneous JSON
/// messages.
///
/// The `RpcSession` must be kept alive in order to receive messages on the sender.
pub fn new(sender: futures::channel::mpsc::UnboundedSender<String>) -> RpcSession {
RpcSession { metadata: sender.into() }
let metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
let http_fut = sc_rpc_server::start_http(
[http_addr, http_addr2],
config.rpc_cors.as_ref(),
max_request_size,
http_max_response_size,
metrics.clone(),
gen_rpc_module(deny_unsafe(ws_addr, &config.rpc_methods))?,
config.tokio_handle.clone(),
);
let ws_config = WsConfig {
max_connections: config.rpc_ws_max_connections,
max_payload_in_mb: max_request_size,
max_payload_out_mb: ws_max_response_size,
max_subs_per_conn: config.rpc_max_subs_per_conn,
};
let ws_fut = sc_rpc_server::start_ws(
[ws_addr, ws_addr2],
config.rpc_cors.as_ref(),
ws_config,
metrics,
gen_rpc_module(deny_unsafe(http_addr, &config.rpc_methods))?,
config.tokio_handle.clone(),
rpc_id_provider,
);
match tokio::task::block_in_place(|| {
config.tokio_handle.block_on(futures::future::try_join(http_fut, ws_fut))
}) {
Ok((http, ws)) => Ok(Box::new((http, ws))),
Err(e) => Err(Error::Application(e)),
}
}
@@ -545,6 +479,44 @@ where
}
}
fn legacy_cli_parsing(config: &Configuration) -> (Option<usize>, Option<usize>, Option<usize>) {
let ws_max_response_size = config.ws_max_out_buffer_capacity.map(|max| {
eprintln!("DEPRECATED: `--ws_max_out_buffer_capacity` has been removed use `rpc-max-response-size or rpc-max-request-size` instead");
eprintln!("Setting WS `rpc-max-response-size` to `max(ws_max_out_buffer_capacity, rpc_max_response_size)`");
std::cmp::max(max, config.rpc_max_response_size.unwrap_or(0))
});
let max_request_size = match (config.rpc_max_payload, config.rpc_max_request_size) {
(Some(legacy_max), max) => {
eprintln!("DEPRECATED: `--rpc_max_payload` has been removed use `rpc-max-response-size or rpc-max-request-size` instead");
eprintln!(
"Setting `rpc-max-response-size` to `max(rpc_max_payload, rpc_max_request_size)`"
);
Some(std::cmp::max(legacy_max, max.unwrap_or(0)))
},
(None, Some(max)) => Some(max),
(None, None) => None,
};
let http_max_response_size = match (config.rpc_max_payload, config.rpc_max_request_size) {
(Some(legacy_max), max) => {
eprintln!("DEPRECATED: `--rpc_max_payload` has been removed use `rpc-max-response-size or rpc-max-request-size` instead");
eprintln!(
"Setting HTTP `rpc-max-response-size` to `max(rpc_max_payload, rpc_max_response_size)`"
);
Some(std::cmp::max(legacy_max, max.unwrap_or(0)))
},
(None, Some(max)) => Some(max),
(None, None) => None,
};
if config.rpc_ipc.is_some() {
eprintln!("DEPRECATED: `--ipc-path` has no effect anymore IPC support has been removed");
}
(max_request_size, ws_max_response_size, http_max_response_size)
}
#[cfg(test)]
mod tests {
use super::*;