mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 18:21:02 +00:00
Optimize offchain worker api by re-using http-client (#6454)
* Fix typo in offchain's docs * Use Self keyword in AsyncApi::new() * Move httpclient to be part of OffchainWorkers to optimize block import * Fix compilation errors for tests * Add wrapper struct for HyperClient * Use lazy_static share SharedClient amongst OffchainWorkers. Remove the need to raise the fd limit * Revert "Use lazy_static share SharedClient amongst OffchainWorkers. Remove the need to raise the fd limit" This reverts commit 7af97498a2383b5d7405e27823db8fd97245da41. * Add lazy_static for tests
This commit is contained in:
Generated
+1
-1
@@ -6573,12 +6573,12 @@ version = "2.0.0-rc3"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes 0.5.4",
|
"bytes 0.5.4",
|
||||||
"env_logger 0.7.1",
|
"env_logger 0.7.1",
|
||||||
"fdlimit",
|
|
||||||
"fnv",
|
"fnv",
|
||||||
"futures 0.3.4",
|
"futures 0.3.4",
|
||||||
"futures-timer 3.0.2",
|
"futures-timer 3.0.2",
|
||||||
"hyper 0.13.4",
|
"hyper 0.13.4",
|
||||||
"hyper-rustls",
|
"hyper-rustls",
|
||||||
|
"lazy_static",
|
||||||
"log",
|
"log",
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
"parity-scale-codec",
|
"parity-scale-codec",
|
||||||
|
|||||||
@@ -37,12 +37,12 @@ hyper-rustls = "0.20"
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
env_logger = "0.7.0"
|
env_logger = "0.7.0"
|
||||||
fdlimit = "0.1.4"
|
|
||||||
sc-client-db = { version = "0.8.0-rc3", default-features = true, path = "../db/" }
|
sc-client-db = { version = "0.8.0-rc3", default-features = true, path = "../db/" }
|
||||||
sc-transaction-pool = { version = "2.0.0-rc3", path = "../../client/transaction-pool" }
|
sc-transaction-pool = { version = "2.0.0-rc3", path = "../../client/transaction-pool" }
|
||||||
sp-transaction-pool = { version = "2.0.0-rc3", path = "../../primitives/transaction-pool" }
|
sp-transaction-pool = { version = "2.0.0-rc3", path = "../../primitives/transaction-pool" }
|
||||||
substrate-test-runtime-client = { version = "2.0.0-rc3", path = "../../test-utils/runtime/client" }
|
substrate-test-runtime-client = { version = "2.0.0-rc3", path = "../../test-utils/runtime/client" }
|
||||||
tokio = "0.2"
|
tokio = "0.2"
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ use sp_core::offchain::{
|
|||||||
OpaqueNetworkState, OpaquePeerId, OpaqueMultiaddr, StorageKind,
|
OpaqueNetworkState, OpaquePeerId, OpaqueMultiaddr, StorageKind,
|
||||||
};
|
};
|
||||||
pub use sp_offchain::STORAGE_PREFIX;
|
pub use sp_offchain::STORAGE_PREFIX;
|
||||||
|
pub use http::SharedClient;
|
||||||
|
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
mod http;
|
mod http;
|
||||||
@@ -260,8 +261,9 @@ impl AsyncApi {
|
|||||||
db: S,
|
db: S,
|
||||||
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
|
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
|
||||||
is_validator: bool,
|
is_validator: bool,
|
||||||
) -> (Api<S>, AsyncApi) {
|
shared_client: SharedClient,
|
||||||
let (http_api, http_worker) = http::http();
|
) -> (Api<S>, Self) {
|
||||||
|
let (http_api, http_worker) = http::http(shared_client);
|
||||||
|
|
||||||
let api = Api {
|
let api = Api {
|
||||||
db,
|
db,
|
||||||
@@ -270,7 +272,7 @@ impl AsyncApi {
|
|||||||
http: http_api,
|
http: http_api,
|
||||||
};
|
};
|
||||||
|
|
||||||
let async_api = AsyncApi {
|
let async_api = Self {
|
||||||
http: Some(http_worker),
|
http: Some(http_worker),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -308,11 +310,14 @@ mod tests {
|
|||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
let db = LocalStorage::new_test();
|
let db = LocalStorage::new_test();
|
||||||
let mock = Arc::new(MockNetworkStateInfo());
|
let mock = Arc::new(MockNetworkStateInfo());
|
||||||
|
let shared_client = SharedClient::new();
|
||||||
|
|
||||||
|
|
||||||
AsyncApi::new(
|
AsyncApi::new(
|
||||||
db,
|
db,
|
||||||
mock,
|
mock,
|
||||||
false,
|
false,
|
||||||
|
shared_client,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -33,9 +33,22 @@ use log::error;
|
|||||||
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
|
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
|
||||||
use std::{convert::TryFrom, fmt, io::Read as _, pin::Pin, task::{Context, Poll}};
|
use std::{convert::TryFrom, fmt, io::Read as _, pin::Pin, task::{Context, Poll}};
|
||||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use hyper::{Client as HyperClient, Body, client};
|
||||||
|
use hyper_rustls::HttpsConnector;
|
||||||
|
|
||||||
|
/// Wrapper struct used for keeping the hyper_rustls client running.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SharedClient(Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>);
|
||||||
|
|
||||||
|
impl SharedClient {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self(Arc::new(HyperClient::builder().build(HttpsConnector::new())))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
|
/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
|
||||||
pub fn http() -> (HttpApi, HttpWorker) {
|
pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) {
|
||||||
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker");
|
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker");
|
||||||
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api");
|
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api");
|
||||||
|
|
||||||
@@ -51,7 +64,7 @@ pub fn http() -> (HttpApi, HttpWorker) {
|
|||||||
let engine = HttpWorker {
|
let engine = HttpWorker {
|
||||||
to_api,
|
to_api,
|
||||||
from_api,
|
from_api,
|
||||||
http_client: hyper::Client::builder().build(hyper_rustls::HttpsConnector::new()),
|
http_client: shared_client.0,
|
||||||
requests: Vec::new(),
|
requests: Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -551,7 +564,7 @@ pub struct HttpWorker {
|
|||||||
/// Used to receive messages from the `HttpApi`.
|
/// Used to receive messages from the `HttpApi`.
|
||||||
from_api: TracingUnboundedReceiver<ApiToWorker>,
|
from_api: TracingUnboundedReceiver<ApiToWorker>,
|
||||||
/// The engine that runs HTTP requests.
|
/// The engine that runs HTTP requests.
|
||||||
http_client: hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
|
http_client: Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>,
|
||||||
/// HTTP requests that are being worked on by the engine.
|
/// HTTP requests that are being worked on by the engine.
|
||||||
requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
|
requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
|
||||||
}
|
}
|
||||||
@@ -685,21 +698,23 @@ impl fmt::Debug for HttpWorkerRequest {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use core::convert::Infallible;
|
use core::convert::Infallible;
|
||||||
use crate::api::timestamp;
|
use crate::api::timestamp;
|
||||||
use super::http;
|
use super::{http, SharedClient};
|
||||||
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration};
|
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
|
// Using lazy_static to avoid spawning lots of different SharedClients,
|
||||||
|
// as spawning a SharedClient is CPU-intensive and opens lots of fds.
|
||||||
|
lazy_static! {
|
||||||
|
static ref SHARED_CLIENT: SharedClient = SharedClient::new();
|
||||||
|
}
|
||||||
|
|
||||||
// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
|
// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
|
||||||
// server that runs in the background as well.
|
// server that runs in the background as well.
|
||||||
macro_rules! build_api_server {
|
macro_rules! build_api_server {
|
||||||
() => {{
|
() => {{
|
||||||
// We spawn quite a bit of HTTP servers here due to how async API
|
let hyper_client = SHARED_CLIENT.clone();
|
||||||
// works for offchain workers, so be sure to raise the FD limit
|
let (api, worker) = http(hyper_client.clone());
|
||||||
// (particularly useful for macOS where the default soft limit may
|
|
||||||
// not be enough).
|
|
||||||
fdlimit::raise_fd_limit();
|
|
||||||
|
|
||||||
let (api, worker) = http();
|
|
||||||
|
|
||||||
let (addr_tx, addr_rx) = std::sync::mpsc::channel();
|
let (addr_tx, addr_rx) = std::sync::mpsc::channel();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
|
|||||||
@@ -19,8 +19,18 @@
|
|||||||
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
|
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
|
||||||
use std::{future::Future, pin::Pin, task::Context, task::Poll};
|
use std::{future::Future, pin::Pin, task::Context, task::Poll};
|
||||||
|
|
||||||
|
/// Wrapper struct (wrapping nothing in case of http_dummy) used for keeping the hyper_rustls client running.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SharedClient;
|
||||||
|
|
||||||
|
impl SharedClient {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
|
/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
|
||||||
pub fn http() -> (HttpApi, HttpWorker) {
|
pub fn http(_: SharedClient) -> (HttpApi, HttpWorker) {
|
||||||
(HttpApi, HttpWorker)
|
(HttpApi, HttpWorker)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@
|
|||||||
//! The offchain workers is a special function of the runtime that
|
//! The offchain workers is a special function of the runtime that
|
||||||
//! gets executed after block is imported. During execution
|
//! gets executed after block is imported. During execution
|
||||||
//! it's able to asynchronously submit extrinsics that will either
|
//! it's able to asynchronously submit extrinsics that will either
|
||||||
//! be propagated to other nodes added to the next block
|
//! be propagated to other nodes or added to the next block
|
||||||
//! produced by the node as unsigned transactions.
|
//! produced by the node as unsigned transactions.
|
||||||
//!
|
//!
|
||||||
//! Offchain workers can be used for computation-heavy tasks
|
//! Offchain workers can be used for computation-heavy tasks
|
||||||
@@ -46,6 +46,7 @@ use sp_runtime::{generic::BlockId, traits::{self, Header}};
|
|||||||
use futures::{prelude::*, future::ready};
|
use futures::{prelude::*, future::ready};
|
||||||
|
|
||||||
mod api;
|
mod api;
|
||||||
|
use api::SharedClient;
|
||||||
|
|
||||||
pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
|
pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
|
||||||
|
|
||||||
@@ -55,16 +56,19 @@ pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
|
|||||||
db: Storage,
|
db: Storage,
|
||||||
_block: PhantomData<Block>,
|
_block: PhantomData<Block>,
|
||||||
thread_pool: Mutex<ThreadPool>,
|
thread_pool: Mutex<ThreadPool>,
|
||||||
|
shared_client: SharedClient,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
|
impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
|
||||||
/// Creates new `OffchainWorkers`.
|
/// Creates new `OffchainWorkers`.
|
||||||
pub fn new(client: Arc<Client>, db: Storage) -> Self {
|
pub fn new(client: Arc<Client>, db: Storage) -> Self {
|
||||||
|
let shared_client = SharedClient::new();
|
||||||
Self {
|
Self {
|
||||||
client,
|
client,
|
||||||
db,
|
db,
|
||||||
_block: PhantomData,
|
_block: PhantomData,
|
||||||
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
|
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
|
||||||
|
shared_client,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -120,6 +124,7 @@ impl<Client, Storage, Block> OffchainWorkers<
|
|||||||
self.db.clone(),
|
self.db.clone(),
|
||||||
network_state.clone(),
|
network_state.clone(),
|
||||||
is_validator,
|
is_validator,
|
||||||
|
self.shared_client.clone(),
|
||||||
);
|
);
|
||||||
debug!("Spawning offchain workers at {:?}", at);
|
debug!("Spawning offchain workers at {:?}", at);
|
||||||
let header = header.clone();
|
let header = header.clone();
|
||||||
|
|||||||
Reference in New Issue
Block a user