rpc server: break legacy CLI options and remove "backward compatible HTTP server" (#13384)

* jsonrpsee v0.16

* breaking: remove old CLI configs

* remove patch.crates-io

* fix bad merge

* fix clippy

* fix bad merge

* fix grumbles

* Update client/service/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* revert block_in_place

* add issue link in todo

* Update client/cli/src/config.rs

Co-authored-by: Dmitry Markin <dmitry@markin.tech>

* grumbles: add ipv6 loopback address

* Revert "grumbles: add ipv6 loopback address"

This reverts commit 3a0b1ece6c4e36055d666896c29d1da55ffa1c4f.

* remove nits

* bump zombienet version

* adress grumbles: provide structopt default_val_t

* remove duplicate from structopt

* bump zombienet v1.3.47

* bump zombienet version

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Javier Viola <javier@parity.io>
This commit is contained in:
Niklas Adolfsson
2023-05-03 17:18:25 +02:00
committed by GitHub
parent b8a01127d7
commit 5a1074712a
11 changed files with 149 additions and 393 deletions
+59 -117
View File
@@ -20,6 +20,9 @@
#![warn(missing_docs)]
pub mod middleware;
use http::header::HeaderValue;
use jsonrpsee::{
server::{
middleware::proxy_get_request::ProxyGetRequestLayer, AllowHosts, ServerBuilder,
@@ -28,119 +31,62 @@ use jsonrpsee::{
RpcModule,
};
use std::{error::Error as StdError, net::SocketAddr};
use tower_http::cors::{AllowOrigin, CorsLayer};
pub use crate::middleware::RpcMetrics;
use http::header::HeaderValue;
pub use jsonrpsee::core::{
id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
traits::IdProvider,
};
use tower_http::cors::{AllowOrigin, CorsLayer};
const MEGABYTE: usize = 1024 * 1024;
const MEGABYTE: u32 = 1024 * 1024;
/// Maximal payload accepted by RPC servers.
pub const RPC_MAX_PAYLOAD_DEFAULT: usize = 15 * MEGABYTE;
/// Default maximum number of connections for WS RPC servers.
const WS_MAX_CONNECTIONS: usize = 100;
/// Default maximum number subscriptions per connection for WS RPC servers.
const WS_MAX_SUBS_PER_CONN: usize = 1024;
pub mod middleware;
/// Type alias JSON-RPC server
/// Type alias for the JSON-RPC server.
pub type Server = ServerHandle;
/// Server config.
#[derive(Debug, Clone)]
pub struct WsConfig {
/// RPC server configuration.
#[derive(Debug)]
pub struct Config<'a, M: Send + Sync + 'static> {
/// Socket addresses.
pub addrs: [SocketAddr; 2],
/// CORS.
pub cors: Option<&'a Vec<String>>,
/// Maximum connections.
pub max_connections: Option<usize>,
pub max_connections: u32,
/// Maximum subscriptions per connection.
pub max_subs_per_conn: Option<usize>,
pub max_subs_per_conn: u32,
/// Maximum rpc request payload size.
pub max_payload_in_mb: Option<usize>,
pub max_payload_in_mb: u32,
/// Maximum rpc response payload size.
pub max_payload_out_mb: Option<usize>,
pub max_payload_out_mb: u32,
/// Metrics.
pub metrics: Option<RpcMetrics>,
/// RPC API.
pub rpc_api: RpcModule<M>,
/// Subscription ID provider.
pub id_provider: Option<Box<dyn IdProvider>>,
/// Tokio runtime handle.
pub tokio_handle: tokio::runtime::Handle,
}
impl WsConfig {
// Deconstructs the config to get the finalized inner values.
//
// `Payload size` or `max subs per connection` bigger than u32::MAX will be truncated.
fn deconstruct(self) -> (u32, u32, u32, u32) {
let max_conns = self.max_connections.unwrap_or(WS_MAX_CONNECTIONS) as u32;
let max_payload_in_mb = payload_size_or_default(self.max_payload_in_mb) as u32;
let max_payload_out_mb = payload_size_or_default(self.max_payload_out_mb) as u32;
let max_subs_per_conn = self.max_subs_per_conn.unwrap_or(WS_MAX_SUBS_PER_CONN) as u32;
(max_payload_in_mb, max_payload_out_mb, max_conns, max_subs_per_conn)
}
}
/// Start HTTP server listening on given address.
pub async fn start_http<M: Send + Sync + 'static>(
addrs: [SocketAddr; 2],
cors: Option<&Vec<String>>,
max_payload_in_mb: Option<usize>,
max_payload_out_mb: Option<usize>,
metrics: Option<RpcMetrics>,
rpc_api: RpcModule<M>,
rt: tokio::runtime::Handle,
/// Start RPC server listening on given address.
pub async fn start_server<M: Send + Sync + 'static>(
config: Config<'_, M>,
) -> Result<ServerHandle, Box<dyn StdError + Send + Sync>> {
let max_payload_in = payload_size_or_default(max_payload_in_mb) as u32;
let max_payload_out = payload_size_or_default(max_payload_out_mb) as u32;
let host_filter = hosts_filter(cors.is_some(), &addrs);
let Config {
addrs,
cors,
max_payload_in_mb,
max_payload_out_mb,
max_connections,
max_subs_per_conn,
metrics,
id_provider,
tokio_handle,
rpc_api,
} = config;
let middleware = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.layer(try_into_cors(cors)?);
let builder = ServerBuilder::new()
.max_request_body_size(max_payload_in)
.max_response_body_size(max_payload_out)
.set_host_filtering(host_filter)
.set_middleware(middleware)
.custom_tokio_runtime(rt)
.http_only();
let rpc_api = build_rpc_api(rpc_api);
let (handle, addr) = if let Some(metrics) = metrics {
let server = builder.set_logger(metrics).build(&addrs[..]).await?;
let addr = server.local_addr();
(server.start(rpc_api)?, addr)
} else {
let server = builder.build(&addrs[..]).await?;
let addr = server.local_addr();
(server.start(rpc_api)?, addr)
};
log::info!(
"Running JSON-RPC HTTP server: addr={}, allowed origins={}",
addr.map_or_else(|_| "unknown".to_string(), |a| a.to_string()),
format_cors(cors)
);
Ok(handle)
}
/// Start a JSON-RPC server listening on given address that supports both HTTP and WS.
pub async fn start<M: Send + Sync + 'static>(
addrs: [SocketAddr; 2],
cors: Option<&Vec<String>>,
ws_config: WsConfig,
metrics: Option<RpcMetrics>,
rpc_api: RpcModule<M>,
rt: tokio::runtime::Handle,
id_provider: Option<Box<dyn IdProvider>>,
) -> Result<ServerHandle, Box<dyn StdError + Send + Sync>> {
let (max_payload_in, max_payload_out, max_connections, max_subs_per_conn) =
ws_config.deconstruct();
let host_filter = hosts_filter(cors.is_some(), &addrs);
let host_filter = hosts_filtering(cors.is_some(), &addrs);
let middleware = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
@@ -148,14 +94,14 @@ pub async fn start<M: Send + Sync + 'static>(
.layer(try_into_cors(cors)?);
let mut builder = ServerBuilder::new()
.max_request_body_size(max_payload_in)
.max_response_body_size(max_payload_out)
.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subs_per_conn)
.ping_interval(std::time::Duration::from_secs(30))
.set_host_filtering(host_filter)
.set_middleware(middleware)
.custom_tokio_runtime(rt);
.custom_tokio_runtime(tokio_handle);
if let Some(provider) = id_provider {
builder = builder.set_id_provider(provider);
@@ -175,7 +121,7 @@ pub async fn start<M: Send + Sync + 'static>(
};
log::info!(
"Running JSON-RPC WS server: addr={}, allowed origins={}",
"Running JSON-RPC server: addr={}, allowed origins={}",
addr.map_or_else(|_| "unknown".to_string(), |a| a.to_string()),
format_cors(cors)
);
@@ -183,6 +129,20 @@ pub async fn start<M: Send + Sync + 'static>(
Ok(handle)
}
fn hosts_filtering(enabled: bool, addrs: &[SocketAddr]) -> AllowHosts {
if enabled {
// NOTE The listening addresses are whitelisted by default.
let mut hosts = Vec::with_capacity(addrs.len() * 2);
for addr in addrs {
hosts.push(format!("localhost:{}", addr.port()).into());
hosts.push(format!("127.0.0.1:{}", addr.port()).into());
}
AllowHosts::Only(hosts)
} else {
AllowHosts::Any
}
}
fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModule<M> {
let mut available_methods = rpc_api.method_names().collect::<Vec<_>>();
available_methods.sort();
@@ -198,24 +158,6 @@ fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModu
rpc_api
}
fn payload_size_or_default(size_mb: Option<usize>) -> usize {
size_mb.map_or(RPC_MAX_PAYLOAD_DEFAULT, |mb| mb.saturating_mul(MEGABYTE))
}
fn hosts_filter(enabled: bool, addrs: &[SocketAddr]) -> AllowHosts {
if enabled {
// NOTE The listening addresses are whitelisted by default.
let mut hosts = Vec::with_capacity(addrs.len() * 2);
for addr in addrs {
hosts.push(format!("localhost:{}", addr.port()).into());
hosts.push(format!("127.0.0.1:{}", addr.port()).into());
}
AllowHosts::Only(hosts)
} else {
AllowHosts::Any
}
}
fn try_into_cors(
maybe_cors: Option<&Vec<String>>,
) -> Result<CorsLayer, Box<dyn StdError + Send + Sync>> {