grandpa-rpc don't share subscription manager, only executor (#7039)

* service builder: fix todo about jsonrpc Option workaround

* grandpa-rpc: only share executor instead of sub manager

* grandpa-rpc: fix compilation

* grandpa-rpc: rename to subscription_executor

* node/cli: remove another unused jsonrpc dependency

* grandpa: apply style fixes from code review

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Jon Häggblad
2020-09-14 13:02:37 +02:00
committed by GitHub
parent f9441b3ea5
commit 2da6adfdf1
7 changed files with 42 additions and 35 deletions
@@ -19,6 +19,7 @@
//! RPC API for GRANDPA.
#![warn(missing_docs)]
use std::sync::Arc;
use futures::{FutureExt, TryFutureExt, TryStreamExt, StreamExt};
use log::warn;
use jsonrpc_derive::rpc;
@@ -27,6 +28,7 @@ use jsonrpc_core::futures::{
sink::Sink as Sink01,
stream::Stream as Stream01,
future::Future as Future01,
future::Executor as Executor01,
};
mod error;
@@ -92,12 +94,16 @@ pub struct GrandpaRpcHandler<AuthoritySet, VoterState, Block: BlockT> {
impl<AuthoritySet, VoterState, Block: BlockT> GrandpaRpcHandler<AuthoritySet, VoterState, Block> {
/// Creates a new GrandpaRpcHandler instance.
pub fn new(
pub fn new<E>(
authority_set: AuthoritySet,
voter_state: VoterState,
justification_stream: GrandpaJustificationStream<Block>,
manager: SubscriptionManager,
) -> Self {
executor: E,
) -> Self
where
E: Executor01<Box<dyn Future01<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
{
let manager = SubscriptionManager::new(Arc::new(executor));
Self {
authority_set,
voter_state,
@@ -232,13 +238,12 @@ mod tests {
VoterState: ReportVoterState + Send + Sync + 'static,
{
let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();
let manager = SubscriptionManager::new(Arc::new(sc_rpc::testing::TaskExecutor));
let handler = GrandpaRpcHandler::new(
TestAuthoritySet,
voter_state,
justification_stream,
manager,
sc_rpc::testing::TaskExecutor,
);
let mut io = jsonrpc_core::MetaIoHandler::default();
+24 -16
View File
@@ -46,7 +46,7 @@ use sp_runtime::traits::{
};
use sp_api::{ProvideRuntimeApi, CallApiAt};
use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo};
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;
use wasm_timer::SystemTime;
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::MaintainedTransactionPool;
@@ -73,17 +73,25 @@ pub trait RpcExtensionBuilder {
/// Returns an instance of the RPC extension for a particular `DenyUnsafe`
/// value, e.g. the RPC extension might not expose some unsafe methods.
fn build(&self, deny: sc_rpc::DenyUnsafe, subscriptions: SubscriptionManager) -> Self::Output;
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output;
}
impl<F, R> RpcExtensionBuilder for F where
F: Fn(sc_rpc::DenyUnsafe, SubscriptionManager) -> R,
F: Fn(sc_rpc::DenyUnsafe, sc_rpc::SubscriptionTaskExecutor) -> R,
R: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
type Output = R;
fn build(&self, deny: sc_rpc::DenyUnsafe, subscriptions: SubscriptionManager) -> Self::Output {
(*self)(deny, subscriptions)
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output {
(*self)(deny, subscription_executor)
}
}
@@ -97,7 +105,11 @@ impl<R> RpcExtensionBuilder for NoopRpcExtensionBuilder<R> where
{
type Output = R;
fn build(&self, _deny: sc_rpc::DenyUnsafe, _subscriptions: SubscriptionManager) -> Self::Output {
fn build(
&self,
_deny: sc_rpc::DenyUnsafe,
_subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output {
self.0.clone()
}
}
@@ -694,7 +706,7 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
};
let task_executor = sc_rpc::SubscriptionTaskExecutor::new(spawn_handle);
let subscriptions = SubscriptionManager::new(Arc::new(task_executor));
let subscriptions = SubscriptionManager::new(Arc::new(task_executor.clone()));
let (chain, state, child_state) = if let (Some(remote_blockchain), Some(on_demand)) =
(remote_blockchain, on_demand) {
@@ -723,20 +735,16 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
let author = sc_rpc::author::Author::new(
client,
transaction_pool,
subscriptions.clone(),
subscriptions,
keystore,
deny_unsafe,
);
let system = system::System::new(system_info, system_rpc_tx, deny_unsafe);
let maybe_offchain_rpc = offchain_storage
.map(|storage| {
let maybe_offchain_rpc = offchain_storage.map(|storage| {
let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe);
// FIXME: Use plain Option (don't collect into HashMap) when we upgrade to jsonrpc 14.1
// https://github.com/paritytech/jsonrpc/commit/20485387ed06a48f1a70bf4d609a7cde6cf0accf
let delegate = offchain::OffchainApi::to_delegate(offchain);
delegate.into_iter().collect::<HashMap<_, _>>()
}).unwrap_or_default();
offchain::OffchainApi::to_delegate(offchain)
});
sc_rpc_server::rpc_handler((
state::StateApi::to_delegate(state),
@@ -745,7 +753,7 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, subscriptions),
rpc_extensions_builder.build(deny_unsafe, task_executor),
))
}