Adjust the RPC service for the WASM-browser use case (#3013)

* Use SpawnTaskHandle to pass to the RPC

* Create the RPC server in lib.rs

* Create the RPC servers in a separate function

* Keep a local version of the RPC handlers

* Make rpc-servers compile for WASM

* Add RpcSesssion

* Clean up

* Address review

* Address pull request review
This commit is contained in:
Pierre Krieger
2019-07-05 09:55:50 +02:00
committed by Gavin Wood
parent 13164304b3
commit 8bca52128f
5 changed files with 199 additions and 160 deletions
+102 -47
View File
@@ -91,6 +91,7 @@ pub struct Service<Components: components::Components> {
to_poll: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Configuration of this Service
pub config: FactoryFullConfiguration<Components::Factory>,
rpc_handlers: rpc::RpcHandler,
_rpc: Box<dyn std::any::Any + Send + Sync>,
_telemetry: Option<tel::Telemetry>,
_telemetry_on_connect_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>>,
@@ -442,37 +443,24 @@ impl<Components: components::Components> Service<Components> {
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
// RPC
let system_info = rpc::apis::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.into(),
impl_version: config.impl_version.into(),
properties: config.chain_spec.properties(),
};
let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded();
struct ExecutorWithTx(mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>);
impl futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for ExecutorWithTx {
fn execute(
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
self.0.unbounded_send(future)
.map_err(|err| {
let kind = futures::future::ExecuteErrorKind::Shutdown;
futures::future::ExecuteError::new(kind, err.into_inner())
})
}
}
let rpc = Components::RuntimeServices::start_rpc(
client.clone(),
system_rpc_tx,
system_info,
config.rpc_http,
config.rpc_ws,
config.rpc_ws_max_connections,
config.rpc_cors.clone(),
Arc::new(ExecutorWithTx(to_spawn_tx.clone())),
transaction_pool.clone(),
)?;
let gen_handler = || {
let system_info = rpc::apis::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.into(),
impl_version: config.impl_version.into(),
properties: config.chain_spec.properties(),
};
Components::RuntimeServices::start_rpc(
client.clone(),
system_rpc_tx.clone(),
system_info.clone(),
Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone() }),
transaction_pool.clone(),
)
};
let rpc_handlers = gen_handler();
let rpc = start_rpc_servers::<Components::Factory, _>(&config, gen_handler)?;
let _ = to_spawn_tx.unbounded_send(Box::new(build_system_rpc_handler::<Components>(
network.clone(),
system_rpc_rx,
@@ -534,7 +522,8 @@ impl<Components: components::Components> Service<Components> {
keystore,
config,
exit,
_rpc: Box::new(rpc),
rpc_handlers,
_rpc: rpc,
_telemetry: telemetry,
_offchain_workers: offchain_workers,
_telemetry_on_connect_sinks: telemetry_connection_sinks.clone(),
@@ -578,6 +567,20 @@ impl<Components: components::Components> Service<Components> {
}
}
/// Starts an RPC query.
///
/// The query is passed as a string and must be a JSON text similar to what an HTTP client
/// would for example send.
///
/// Returns a `Future` that contains the optional response.
///
/// If the request subscribes you to events, the `Sender` in the `RpcSession` object is used to
/// send back spontaneous events.
pub fn rpc_query(&self, mem: &RpcSession, request: &str)
-> impl Future<Item = Option<String>, Error = ()> {
self.rpc_handlers.handle_request(request, mem.metadata.clone())
}
/// Get shared client instance.
pub fn client(&self) -> Arc<ComponentClient<Components>> {
self.client.clone()
@@ -715,22 +718,74 @@ impl<Components> Drop for Service<Components> where Components: components::Comp
}
}
fn maybe_start_server<T, F>(address: Option<SocketAddr>, start: F) -> Result<Option<T>, io::Error>
where F: Fn(&SocketAddr) -> Result<T, io::Error>,
{
Ok(match address {
Some(mut address) => Some(start(&address)
.or_else(|e| match e.kind() {
io::ErrorKind::AddrInUse |
io::ErrorKind::PermissionDenied => {
warn!("Unable to bind server to {}. Trying random port.", address);
address.set_port(0);
start(&address)
},
_ => Err(e),
})?),
None => None,
})
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(not(target_os = "unknown"))]
fn start_rpc_servers<F: ServiceFactory, H: FnMut() -> rpc::RpcHandler>(
config: &FactoryFullConfiguration<F>,
mut gen_handler: H
) -> Result<Box<std::any::Any + Send + Sync>, error::Error> {
fn maybe_start_server<T, F>(address: Option<SocketAddr>, mut start: F) -> Result<Option<T>, io::Error>
where F: FnMut(&SocketAddr) -> Result<T, io::Error>,
{
Ok(match address {
Some(mut address) => Some(start(&address)
.or_else(|e| match e.kind() {
io::ErrorKind::AddrInUse |
io::ErrorKind::PermissionDenied => {
warn!("Unable to bind server to {}. Trying random port.", address);
address.set_port(0);
start(&address)
},
_ => Err(e),
})?),
None => None,
})
}
Ok(Box::new((
maybe_start_server(
config.rpc_http,
|address| rpc::start_http(address, config.rpc_cors.as_ref(), gen_handler()),
)?,
maybe_start_server(
config.rpc_ws,
|address| rpc::start_ws(
address,
config.rpc_ws_max_connections,
config.rpc_cors.as_ref(),
gen_handler(),
),
)?.map(Mutex::new),
)))
}
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive.
#[cfg(target_os = "unknown")]
fn start_rpc_servers<F: ServiceFactory, H: FnMut() -> rpc::RpcHandler>(
_: &FactoryFullConfiguration<F>,
_: H
) -> Result<Box<std::any::Any + Send + Sync>, error::Error> {
Ok(Box::new(()))
}
/// An RPC session. Used to perform in-memory RPC queries (ie. RPC queries that don't go through
/// the HTTP or WebSockets server).
pub struct RpcSession {
metadata: rpc::Metadata,
}
impl RpcSession {
/// Creates an RPC session.
///
/// The `sender` is stored inside the `RpcSession` and is used to communicate spontaneous JSON
/// messages.
///
/// The `RpcSession` must be kept alive in order to receive messages on the sender.
pub fn new(sender: mpsc::Sender<String>) -> RpcSession {
RpcSession {
metadata: rpc::Metadata::new(sender),
}
}
}
/// Transaction pool adapter.