Offchain-worker: Make it possible to disable http support (#10087)

* Offchain-worker: Make it possible to disable http support

If a chain doesn't require http support in its offchain workers, this pr enables them to disable the
http support.

* Switch to bitflags

* Use Capabilities

* Update client/offchain/src/lib.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Fix test

* Update client/offchain/src/lib.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
Bastian Köcher
2021-10-31 19:13:19 +01:00
committed by GitHub
parent 4292e18e50
commit 47c7447106
9 changed files with 142 additions and 112 deletions
+5 -7
View File
@@ -298,9 +298,9 @@ impl AsyncApi {
pub fn new(
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
is_validator: bool,
shared_client: SharedClient,
shared_http_client: SharedClient,
) -> (Api, Self) {
let (http_api, http_worker) = http::http(shared_client);
let (http_api, http_worker) = http::http(shared_http_client);
let api = Api { network_provider, is_validator, http: http_api };
@@ -310,10 +310,8 @@ impl AsyncApi {
}
/// Run a processing task for the API
pub fn process(mut self) -> impl Future<Output = ()> {
let http = self.http.take().expect("Take invoked only once.");
http
pub fn process(self) -> impl Future<Output = ()> {
self.http.expect("`process` is only called once; qed")
}
}
@@ -328,7 +326,7 @@ mod tests {
time::SystemTime,
};
struct TestNetwork();
pub(super) struct TestNetwork();
impl NetworkProvider for TestNetwork {
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
+44 -5
View File
@@ -34,6 +34,7 @@ use futures::{channel::mpsc, future, prelude::*};
use hyper::{client, Body, Client as HyperClient};
use hyper_rustls::HttpsConnector;
use log::error;
use once_cell::sync::Lazy;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp};
use std::{
@@ -47,11 +48,13 @@ use std::{
/// Wrapper struct used for keeping the hyper_rustls client running.
#[derive(Clone)]
pub struct SharedClient(Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>);
pub struct SharedClient(Arc<Lazy<HyperClient<HttpsConnector<client::HttpConnector>, Body>>>);
impl SharedClient {
pub fn new() -> Self {
Self(Arc::new(HyperClient::builder().build(HttpsConnector::with_native_roots())))
Self(Arc::new(Lazy::new(|| {
HyperClient::builder().build(HttpsConnector::with_native_roots())
})))
}
}
@@ -567,7 +570,7 @@ pub struct HttpWorker {
/// Used to receive messages from the `HttpApi`.
from_api: TracingUnboundedReceiver<ApiToWorker>,
/// The engine that runs HTTP requests.
http_client: Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>,
http_client: Arc<Lazy<HyperClient<HttpsConnector<client::HttpConnector>, Body>>>,
/// HTTP requests that are being worked on by the engine.
requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
}
@@ -697,12 +700,15 @@ impl fmt::Debug for HttpWorkerRequest {
#[cfg(test)]
mod tests {
use super::{http, SharedClient};
use super::{
super::{tests::TestNetwork, AsyncApi},
*,
};
use crate::api::timestamp;
use core::convert::Infallible;
use futures::{future, StreamExt};
use lazy_static::lazy_static;
use sp_core::offchain::{Duration, HttpError, HttpRequestId, HttpRequestStatus};
use sp_core::offchain::{Duration, Externalities, HttpError, HttpRequestId, HttpRequestStatus};
// Using lazy_static to avoid spawning lots of different SharedClients,
// as spawning a SharedClient is CPU-intensive and opens lots of fds.
@@ -1006,4 +1012,37 @@ mod tests {
}
}
}
#[test]
fn shared_http_client_is_only_initialized_on_access() {
let shared_client = SharedClient::new();
{
let mock = Arc::new(TestNetwork());
let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone());
api.timestamp();
futures::executor::block_on(async move {
assert!(futures::poll!(async_api.process()).is_pending());
});
}
// Check that the http client wasn't initialized, because it wasn't used.
assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_err());
let shared_client = SharedClient::new();
{
let mock = Arc::new(TestNetwork());
let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone());
let id = api.http_request_start("lol", "nope", &[]).unwrap();
api.http_request_write_body(id, &[], None).unwrap();
futures::executor::block_on(async move {
assert!(futures::poll!(async_api.process()).is_pending());
});
}
// Check that the http client initialized, because it was used.
assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_ok());
}
}
+32 -11
View File
@@ -81,18 +81,31 @@ where
}
}
/// Options for [`OffchainWorkers`]
pub struct OffchainWorkerOptions {
/// Enable http requests from offchain workers?
///
/// If not enabled, any http request will panic.
pub enable_http_requests: bool,
}
/// An offchain workers manager.
pub struct OffchainWorkers<Client, Block: traits::Block> {
client: Arc<Client>,
_block: PhantomData<Block>,
thread_pool: Mutex<ThreadPool>,
shared_client: api::SharedClient,
shared_http_client: api::SharedClient,
enable_http: bool,
}
impl<Client, Block: traits::Block> OffchainWorkers<Client, Block> {
/// Creates new `OffchainWorkers`.
/// Creates new [`OffchainWorkers`].
pub fn new(client: Arc<Client>) -> Self {
let shared_client = api::SharedClient::new();
Self::new_with_options(client, OffchainWorkerOptions { enable_http_requests: true })
}
/// Creates new [`OffchainWorkers`] using the given `options`.
pub fn new_with_options(client: Arc<Client>, options: OffchainWorkerOptions) -> Self {
Self {
client,
_block: PhantomData,
@@ -100,7 +113,8 @@ impl<Client, Block: traits::Block> OffchainWorkers<Client, Block> {
"offchain-worker".into(),
num_cpus::get(),
)),
shared_client,
shared_http_client: api::SharedClient::new(),
enable_http: options.enable_http_requests,
}
}
}
@@ -140,18 +154,22 @@ where
},
};
debug!("Checking offchain workers at {:?}: version:{}", at, version);
if version > 0 {
let process = (version > 0).then(|| {
let (api, runner) =
api::AsyncApi::new(network_provider, is_validator, self.shared_client.clone());
api::AsyncApi::new(network_provider, is_validator, self.shared_http_client.clone());
debug!("Spawning offchain workers at {:?}", at);
let header = header.clone();
let client = self.client.clone();
let mut capabilities = offchain::Capabilities::all();
capabilities.set(offchain::Capabilities::HTTP, self.enable_http);
self.spawn_worker(move || {
let runtime = client.runtime_api();
let api = Box::new(api);
debug!("Running offchain workers at {:?}", at);
let context =
ExecutionContext::OffchainCall(Some((api, offchain::Capabilities::all())));
let context = ExecutionContext::OffchainCall(Some((api, capabilities)));
let run = if version == 2 {
runtime.offchain_worker_with_context(&at, context, &header)
} else {
@@ -166,9 +184,12 @@ where
log::error!("Error running offchain workers at {:?}: {:?}", at, e);
}
});
futures::future::Either::Left(runner.process())
} else {
futures::future::Either::Right(futures::future::ready(()))
runner.process()
});
async move {
futures::future::OptionFuture::from(process).await;
}
}