Custom RPC implementation for node. (#3109)

* Allow RPCs to be customized.

* Implement node-rpc extensions.

* Working on a test.

* Add node-testing crate.

* Fix genesis test config

* Fix nonce lookups.

* Clean up.

* Fix expected block type.

* Make the RPC extension function optional.

* Fix service doc test.

* Bump jsonrpc.

* Bump client version.

* Update Cargo.lock

* Update jsonrpc.

* Fix build.

* Remove unused imports.

* Fix signed extra.

* Post merge clean up.

* Fix tests.

* Patch hashmap-core.

* Fix build.

* Fix build.

* Remove hashmap_core patches.
This commit is contained in:
Tomasz Drwięga
2019-08-20 11:06:35 +02:00
committed by Gavin Wood
parent 95abffc8e4
commit 56296386ab
29 changed files with 838 additions and 278 deletions
+68 -21
View File
@@ -34,7 +34,7 @@ use sr_primitives::{
};
use crate::config::Configuration;
use primitives::{Blake2Hasher, H256, traits::BareCryptoStorePtr};
use rpc::{self, apis::system::SystemInfo};
use rpc::{self, system::SystemInfo};
use futures::{prelude::*, future::Executor};
use futures03::{FutureExt as _, channel::mpsc, compat::Compat};
@@ -145,6 +145,9 @@ pub type PoolApi<C> = <C as Components>::TransactionPoolApi;
pub trait RuntimeGenesis: Serialize + DeserializeOwned + BuildStorage {}
impl<T: Serialize + DeserializeOwned + BuildStorage> RuntimeGenesis for T {}
/// A transport-agnostic handler of the RPC queries.
pub type RpcHandler = rpc_servers::RpcHandler<rpc::Metadata>;
/// Something that can create and store initial session keys from given seeds.
pub trait InitialSessionKeys<C: Components> {
/// Generate the initial session keys for the given seeds and store them in
@@ -168,46 +171,51 @@ impl<C: Components> InitialSessionKeys<Self> for C where
}
/// Something that can start the RPC service.
pub trait StartRPC<C: Components> {
pub trait StartRpc<C: Components> {
fn start_rpc(
client: Arc<ComponentClient<C>>,
system_send_back: mpsc::UnboundedSender<rpc::apis::system::Request<ComponentBlock<C>>>,
system_send_back: mpsc::UnboundedSender<rpc::system::Request<ComponentBlock<C>>>,
system_info: SystemInfo,
task_executor: TaskExecutor,
transaction_pool: Arc<TransactionPool<C::TransactionPoolApi>>,
rpc_extensions: impl rpc::RpcExtension<rpc::Metadata>,
keystore: KeyStorePtr,
) -> rpc::RpcHandler;
) -> RpcHandler;
}
impl<C: Components> StartRPC<Self> for C where
impl<C: Components> StartRpc<C> for C where
ComponentClient<C>: ProvideRuntimeApi,
<ComponentClient<C> as ProvideRuntimeApi>::Api:
runtime_api::Metadata<ComponentBlock<C>> + session::SessionKeys<ComponentBlock<C>>,
{
fn start_rpc(
client: Arc<ComponentClient<C>>,
system_send_back: mpsc::UnboundedSender<rpc::apis::system::Request<ComponentBlock<C>>>,
system_send_back: mpsc::UnboundedSender<rpc::system::Request<ComponentBlock<C>>>,
rpc_system_info: SystemInfo,
task_executor: TaskExecutor,
transaction_pool: Arc<TransactionPool<C::TransactionPoolApi>>,
rpc_extensions: impl rpc::RpcExtension<rpc::Metadata>,
keystore: KeyStorePtr,
) -> rpc::RpcHandler {
let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone());
let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone());
let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone());
let author = rpc::apis::author::Author::new(
) -> RpcHandler {
use rpc::{chain, state, author, system};
let subscriptions = rpc::Subscriptions::new(task_executor.clone());
let chain = chain::Chain::new(client.clone(), subscriptions.clone());
let state = state::State::new(client.clone(), subscriptions.clone());
let author = rpc::author::Author::new(
client,
transaction_pool,
subscriptions,
keystore,
);
let system = rpc::apis::system::System::new(rpc_system_info, system_send_back);
rpc::rpc_handler::<ComponentBlock<C>, ComponentExHash<C>, _, _, _, _>(
state,
chain,
author,
system,
)
let system = system::System::new(rpc_system_info, system_send_back);
rpc_servers::rpc_handler((
state::StateApi::to_delegate(state),
chain::ChainApi::to_delegate(chain),
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions,
))
}
}
@@ -299,7 +307,7 @@ pub trait ServiceTrait<C: Components>:
Deref<Target = Service<C>>
+ Send
+ 'static
+ StartRPC<C>
+ StartRpc<C>
+ MaintainTransactionPool<C>
+ OffchainWorker<C>
+ InitialSessionKeys<C>
@@ -308,7 +316,7 @@ impl<C: Components, T> ServiceTrait<C> for T where
T: Deref<Target = Service<C>>
+ Send
+ 'static
+ StartRPC<C>
+ StartRpc<C>
+ MaintainTransactionPool<C>
+ OffchainWorker<C>
+ InitialSessionKeys<C>
@@ -335,6 +343,8 @@ pub trait ServiceFactory: 'static + Sized {
type Genesis: RuntimeGenesis;
/// Other configuration for service members.
type Configuration: Default;
/// RPC initialisation.
type RpcExtensions: rpc::RpcExtension<rpc::Metadata>;
/// Extended full service type.
type FullService: ServiceTrait<FullComponents<Self>>;
/// Extended light service type.
@@ -407,6 +417,18 @@ pub trait ServiceFactory: 'static + Sized {
Err("Chain Specification doesn't contain any consensus_engine name".into())
}
}
/// Create custom RPC method handlers for full node.
fn build_full_rpc_extensions(
client: Arc<FullClient<Self>>,
transaction_pool: Arc<TransactionPool<Self::FullTransactionPoolApi>>,
) -> Self::RpcExtensions;
/// Create custom RPC method handlers for light node.
fn build_light_rpc_extensions(
client: Arc<LightClient<Self>>,
transaction_pool: Arc<TransactionPool<Self::LightTransactionPoolApi>>,
) -> Self::RpcExtensions;
}
/// A collection of types and function to generalize over full / light client type.
@@ -419,8 +441,10 @@ pub trait Components: Sized + 'static {
type Executor: 'static + client::CallExecutor<FactoryBlock<Self::Factory>, Blake2Hasher> + Send + Sync + Clone;
/// The type that implements the runtime API.
type RuntimeApi: Send + Sync;
/// A type that can start all runtime-dependent services.
/// The type that can start all runtime-dependent services.
type RuntimeServices: ServiceTrait<Self>;
/// The type that can extend the RPC methods.
type RpcExtensions: rpc::RpcExtension<rpc::Metadata>;
// TODO: Traitify transaction pool and allow people to implement their own. (#1242)
/// Extrinsic pool type.
type TransactionPoolApi: 'static + txpool::ChainApi<
@@ -468,6 +492,12 @@ pub trait Components: Sized + 'static {
config: &mut FactoryFullConfiguration<Self::Factory>,
client: Arc<ComponentClient<Self>>
) -> Result<Option<Self::SelectChain>, error::Error>;
/// Build RPC extensions
fn build_rpc_extensions(
client: Arc<ComponentClient<Self>>,
transaction_pool: Arc<TransactionPool<Self::TransactionPoolApi>>,
) -> Self::RpcExtensions;
}
/// A struct that implement `Components` for the full client.
@@ -529,6 +559,7 @@ impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
type ImportQueue = Factory::FullImportQueue;
type RuntimeApi = Factory::RuntimeApi;
type RuntimeServices = Factory::FullService;
type RpcExtensions = Factory::RpcExtensions;
type SelectChain = Factory::SelectChain;
fn build_client(
@@ -594,6 +625,13 @@ impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
) -> Result<Option<Arc<dyn FinalityProofProvider<<Self::Factory as ServiceFactory>::Block>>>, error::Error> {
Factory::build_finality_proof_provider(client)
}
fn build_rpc_extensions(
client: Arc<ComponentClient<Self>>,
transaction_pool: Arc<TransactionPool<Self::TransactionPoolApi>>,
) -> Self::RpcExtensions {
Factory::build_full_rpc_extensions(client, transaction_pool)
}
}
/// A struct that implement `Components` for the light client.
@@ -655,6 +693,7 @@ impl<Factory: ServiceFactory> Components for LightComponents<Factory> {
type ImportQueue = <Factory as ServiceFactory>::LightImportQueue;
type RuntimeApi = Factory::RuntimeApi;
type RuntimeServices = Factory::LightService;
type RpcExtensions = Factory::RpcExtensions;
type SelectChain = Factory::SelectChain;
fn build_client(
@@ -709,12 +748,20 @@ impl<Factory: ServiceFactory> Components for LightComponents<Factory> {
) -> Result<Option<Arc<dyn FinalityProofProvider<<Self::Factory as ServiceFactory>::Block>>>, error::Error> {
Ok(None)
}
fn build_select_chain(
_config: &mut FactoryFullConfiguration<Self::Factory>,
_client: Arc<ComponentClient<Self>>
) -> Result<Option<Self::SelectChain>, error::Error> {
Ok(None)
}
fn build_rpc_extensions(
client: Arc<ComponentClient<Self>>,
transaction_pool: Arc<TransactionPool<Self::TransactionPoolApi>>,
) -> Self::RpcExtensions {
Factory::build_light_rpc_extensions(client, transaction_pool)
}
}
#[cfg(test)]
+33 -14
View File
@@ -63,7 +63,7 @@ pub use components::{
FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis,
ComponentExHash, ComponentExtrinsic, FactoryExtrinsic, InitialSessionKeys,
};
use components::{StartRPC, MaintainTransactionPool, OffchainWorker};
use components::{StartRpc, MaintainTransactionPool, OffchainWorker};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
#[doc(hidden)]
@@ -101,7 +101,7 @@ pub struct Service<Components: components::Components> {
to_poll: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Configuration of this Service
config: FactoryFullConfiguration<Components::Factory>,
rpc_handlers: rpc::RpcHandler,
rpc_handlers: components::RpcHandler,
_rpc: Box<dyn std::any::Any + Send + Sync>,
_telemetry: Option<tel::Telemetry>,
_telemetry_on_connect_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>>,
@@ -373,7 +373,7 @@ impl<Components: components::Components> Service<Components> {
// RPC
let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded();
let gen_handler = || {
let system_info = rpc::apis::system::SystemInfo {
let system_info = rpc::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.into(),
impl_version: config.impl_version.into(),
@@ -385,6 +385,7 @@ impl<Components: components::Components> Service<Components> {
system_info.clone(),
Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone() }),
transaction_pool.clone(),
Components::build_rpc_extensions(client.clone(), transaction_pool.clone()),
keystore.clone(),
)
};
@@ -630,7 +631,7 @@ fn build_network_future<
mut network: network::NetworkWorker<B, S, H>,
client: Arc<C>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<B>, NetworkState)>>>>,
rpc_rx: futures03::channel::mpsc::UnboundedReceiver<rpc::apis::system::Request<B>>,
rpc_rx: futures03::channel::mpsc::UnboundedReceiver<rpc::system::Request<B>>,
should_have_peers: bool,
) -> impl Future<Item = (), Error = ()> {
// Compatibility shim while we're transitionning to stable Futures.
@@ -666,16 +667,16 @@ fn build_network_future<
// Poll the RPC requests and answer them.
while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() {
match request {
rpc::apis::system::Request::Health(sender) => {
let _ = sender.send(rpc::apis::system::Health {
rpc::system::Request::Health(sender) => {
let _ = sender.send(rpc::system::Health {
peers: network.peers_debug_info().len(),
is_syncing: network.service().is_major_syncing(),
should_have_peers,
});
},
rpc::apis::system::Request::Peers(sender) => {
rpc::system::Request::Peers(sender) => {
let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)|
rpc::apis::system::PeerInfo {
rpc::system::PeerInfo {
peer_id: peer_id.to_base58(),
roles: format!("{:?}", p.roles),
protocol_version: p.protocol_version,
@@ -684,7 +685,7 @@ fn build_network_future<
}
).collect());
}
rpc::apis::system::Request::NetworkState(sender) => {
rpc::system::Request::NetworkState(sender) => {
let _ = sender.send(network.network_state());
}
};
@@ -756,7 +757,7 @@ impl<Components> Drop for Service<Components> where Components: components::Comp
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(not(target_os = "unknown"))]
fn start_rpc_servers<C, G, H: FnMut() -> rpc::RpcHandler>(
fn start_rpc_servers<C, G, H: FnMut() -> components::RpcHandler>(
config: &Configuration<C, G>,
mut gen_handler: H
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
@@ -781,11 +782,11 @@ fn start_rpc_servers<C, G, H: FnMut() -> rpc::RpcHandler>(
Ok(Box::new((
maybe_start_server(
config.rpc_http,
|address| rpc::start_http(address, config.rpc_cors.as_ref(), gen_handler()),
|address| rpc_servers::start_http(address, config.rpc_cors.as_ref(), gen_handler()),
)?,
maybe_start_server(
config.rpc_ws,
|address| rpc::start_ws(
|address| rpc_servers::start_ws(
address,
config.rpc_ws_max_connections,
config.rpc_cors.as_ref(),
@@ -797,7 +798,7 @@ fn start_rpc_servers<C, G, H: FnMut() -> rpc::RpcHandler>(
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(target_os = "unknown")]
fn start_rpc_servers<C, G, H: FnMut() -> rpc::RpcHandler>(
fn start_rpc_servers<C, G, H: FnMut() -> components::RpcHandler>(
_: &Configuration<C, G>,
_: H
) -> Result<Box<std::any::Any + Send + Sync>, error::Error> {
@@ -819,7 +820,7 @@ impl RpcSession {
/// The `RpcSession` must be kept alive in order to receive messages on the sender.
pub fn new(sender: mpsc::Sender<String>) -> RpcSession {
RpcSession {
metadata: rpc::Metadata::new(sender),
metadata: sender.into(),
}
}
}
@@ -989,6 +990,7 @@ where
/// FinalityProofProvider = { |client: Arc<FullClient<Self>>| {
/// Ok(Some(Arc::new(grandpa::FinalityProofProvider::new(client.clone(), client)) as _))
/// }},
/// RpcExtensions = (),
/// }
/// }
/// ```
@@ -1015,6 +1017,8 @@ macro_rules! construct_service_factory {
SelectChain = $select_chain:ty
{ $( $select_chain_init:tt )* },
FinalityProofProvider = { $( $finality_proof_provider_init:tt )* },
RpcExtensions = $rpc_extensions_ty:ty
$( { $( $rpc_extensions:tt )* } )?,
}
) => {
$( #[$attr] )*
@@ -1035,6 +1039,7 @@ macro_rules! construct_service_factory {
type FullImportQueue = $full_import_queue;
type LightImportQueue = $light_import_queue;
type SelectChain = $select_chain;
type RpcExtensions = $rpc_extensions_ty;
fn build_full_transaction_pool(
config: $crate::TransactionPoolOptions,
@@ -1102,6 +1107,20 @@ macro_rules! construct_service_factory {
($( $authority_setup )*)(service)
})
}
fn build_full_rpc_extensions(
client: Arc<$crate::FullClient<Self>>,
transaction_pool: Arc<$crate::TransactionPool<Self::FullTransactionPoolApi>>,
) -> Self::RpcExtensions {
$( ( $( $rpc_extensions )* ) (client, transaction_pool) )?
}
fn build_light_rpc_extensions(
client: Arc<$crate::LightClient<Self>>,
transaction_pool: Arc<$crate::TransactionPool<Self::LightTransactionPoolApi>>,
) -> Self::RpcExtensions {
$( ( $( $rpc_extensions )* ) (client, transaction_pool) )?
}
}
}
}