diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 84ad0c0a56..666fe6e451 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -8184,6 +8184,7 @@ dependencies = [ "lazy_static", "log 0.4.14", "num_cpus", + "once_cell", "parity-scale-codec", "parking_lot 0.11.1", "rand 0.7.3", @@ -9232,6 +9233,7 @@ name = "sp-core" version = "4.0.0-dev" dependencies = [ "base58", + "bitflags", "blake2-rfc", "byteorder", "criterion", diff --git a/substrate/client/api/src/execution_extensions.rs b/substrate/client/api/src/execution_extensions.rs index ec44294b8a..56e70cc2b6 100644 --- a/substrate/client/api/src/execution_extensions.rs +++ b/substrate/client/api/src/execution_extensions.rs @@ -161,13 +161,13 @@ impl ExecutionExtensions { let mut extensions = self.extensions_factory.read().extensions_for(capabilities); - if capabilities.has(offchain::Capability::Keystore) { + if capabilities.contains(offchain::Capabilities::KEYSTORE) { if let Some(ref keystore) = self.keystore { extensions.register(KeystoreExt(keystore.clone())); } } - if capabilities.has(offchain::Capability::TransactionPool) { + if capabilities.contains(offchain::Capabilities::TRANSACTION_POOL) { if let Some(pool) = self.transaction_pool.read().as_ref().and_then(|x| x.upgrade()) { extensions .register(TransactionPoolExt( @@ -176,8 +176,8 @@ impl ExecutionExtensions { } } - if capabilities.has(offchain::Capability::OffchainDbRead) || - capabilities.has(offchain::Capability::OffchainDbWrite) + if capabilities.contains(offchain::Capabilities::OFFCHAIN_DB_READ) || + capabilities.contains(offchain::Capabilities::OFFCHAIN_DB_WRITE) { if let Some(offchain_db) = self.offchain_db.as_ref() { extensions.register(OffchainDbExt::new(offchain::LimitedExternalities::new( @@ -210,7 +210,7 @@ impl ExecutionExtensions { ExecutionContext::BlockConstruction => self.strategies.block_construction.get_manager(), ExecutionContext::Syncing => self.strategies.syncing.get_manager(), ExecutionContext::Importing => self.strategies.importing.get_manager(), - ExecutionContext::OffchainCall(Some((_, capabilities))) if capabilities.has_all() => + ExecutionContext::OffchainCall(Some((_, capabilities))) if capabilities.is_all() => self.strategies.offchain_worker.get_manager(), ExecutionContext::OffchainCall(_) => self.strategies.other.get_manager(), }; diff --git a/substrate/client/offchain/Cargo.toml b/substrate/client/offchain/Cargo.toml index 146ce07e13..104a0e61f3 100644 --- a/substrate/client/offchain/Cargo.toml +++ b/substrate/client/offchain/Cargo.toml @@ -33,6 +33,7 @@ sc-utils = { version = "4.0.0-dev", path = "../utils" } threadpool = "1.7" hyper = "0.14.11" hyper-rustls = "0.22.1" +once_cell = "1.8" [dev-dependencies] sc-client-db = { version = "0.10.0-dev", default-features = true, path = "../db" } diff --git a/substrate/client/offchain/src/api.rs b/substrate/client/offchain/src/api.rs index b2276a8523..07136d1815 100644 --- a/substrate/client/offchain/src/api.rs +++ b/substrate/client/offchain/src/api.rs @@ -298,9 +298,9 @@ impl AsyncApi { pub fn new( network_provider: Arc, is_validator: bool, - shared_client: SharedClient, + shared_http_client: SharedClient, ) -> (Api, Self) { - let (http_api, http_worker) = http::http(shared_client); + let (http_api, http_worker) = http::http(shared_http_client); let api = Api { network_provider, is_validator, http: http_api }; @@ -310,10 +310,8 @@ impl AsyncApi { } /// Run a processing task for the API - pub fn process(mut self) -> impl Future { - let http = self.http.take().expect("Take invoked only once."); - - http + pub fn process(self) -> impl Future { + self.http.expect("`process` is only called once; qed") } } @@ -328,7 +326,7 @@ mod tests { time::SystemTime, }; - struct TestNetwork(); + pub(super) struct TestNetwork(); impl NetworkProvider for TestNetwork { fn set_authorized_peers(&self, _peers: HashSet) { diff --git a/substrate/client/offchain/src/api/http.rs b/substrate/client/offchain/src/api/http.rs index 31f7d60e34..a2975bad16 100644 --- a/substrate/client/offchain/src/api/http.rs +++ b/substrate/client/offchain/src/api/http.rs @@ -34,6 +34,7 @@ use futures::{channel::mpsc, future, prelude::*}; use hyper::{client, Body, Client as HyperClient}; use hyper_rustls::HttpsConnector; use log::error; +use once_cell::sync::Lazy; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp}; use std::{ @@ -47,11 +48,13 @@ use std::{ /// Wrapper struct used for keeping the hyper_rustls client running. #[derive(Clone)] -pub struct SharedClient(Arc, Body>>); +pub struct SharedClient(Arc, Body>>>); impl SharedClient { pub fn new() -> Self { - Self(Arc::new(HyperClient::builder().build(HttpsConnector::with_native_roots()))) + Self(Arc::new(Lazy::new(|| { + HyperClient::builder().build(HttpsConnector::with_native_roots()) + }))) } } @@ -567,7 +570,7 @@ pub struct HttpWorker { /// Used to receive messages from the `HttpApi`. from_api: TracingUnboundedReceiver, /// The engine that runs HTTP requests. - http_client: Arc, Body>>, + http_client: Arc, Body>>>, /// HTTP requests that are being worked on by the engine. requests: Vec<(HttpRequestId, HttpWorkerRequest)>, } @@ -697,12 +700,15 @@ impl fmt::Debug for HttpWorkerRequest { #[cfg(test)] mod tests { - use super::{http, SharedClient}; + use super::{ + super::{tests::TestNetwork, AsyncApi}, + *, + }; use crate::api::timestamp; use core::convert::Infallible; use futures::{future, StreamExt}; use lazy_static::lazy_static; - use sp_core::offchain::{Duration, HttpError, HttpRequestId, HttpRequestStatus}; + use sp_core::offchain::{Duration, Externalities, HttpError, HttpRequestId, HttpRequestStatus}; // Using lazy_static to avoid spawning lots of different SharedClients, // as spawning a SharedClient is CPU-intensive and opens lots of fds. @@ -1006,4 +1012,37 @@ mod tests { } } } + + #[test] + fn shared_http_client_is_only_initialized_on_access() { + let shared_client = SharedClient::new(); + + { + let mock = Arc::new(TestNetwork()); + let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone()); + api.timestamp(); + + futures::executor::block_on(async move { + assert!(futures::poll!(async_api.process()).is_pending()); + }); + } + + // Check that the http client wasn't initialized, because it wasn't used. + assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_err()); + + let shared_client = SharedClient::new(); + + { + let mock = Arc::new(TestNetwork()); + let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone()); + let id = api.http_request_start("lol", "nope", &[]).unwrap(); + api.http_request_write_body(id, &[], None).unwrap(); + futures::executor::block_on(async move { + assert!(futures::poll!(async_api.process()).is_pending()); + }); + } + + // Check that the http client initialized, because it was used. + assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_ok()); + } } diff --git a/substrate/client/offchain/src/lib.rs b/substrate/client/offchain/src/lib.rs index be6e4238ca..a77fd17a2c 100644 --- a/substrate/client/offchain/src/lib.rs +++ b/substrate/client/offchain/src/lib.rs @@ -81,18 +81,31 @@ where } } +/// Options for [`OffchainWorkers`] +pub struct OffchainWorkerOptions { + /// Enable http requests from offchain workers? + /// + /// If not enabled, any http request will panic. + pub enable_http_requests: bool, +} + /// An offchain workers manager. pub struct OffchainWorkers { client: Arc, _block: PhantomData, thread_pool: Mutex, - shared_client: api::SharedClient, + shared_http_client: api::SharedClient, + enable_http: bool, } impl OffchainWorkers { - /// Creates new `OffchainWorkers`. + /// Creates new [`OffchainWorkers`]. pub fn new(client: Arc) -> Self { - let shared_client = api::SharedClient::new(); + Self::new_with_options(client, OffchainWorkerOptions { enable_http_requests: true }) + } + + /// Creates new [`OffchainWorkers`] using the given `options`. + pub fn new_with_options(client: Arc, options: OffchainWorkerOptions) -> Self { Self { client, _block: PhantomData, @@ -100,7 +113,8 @@ impl OffchainWorkers { "offchain-worker".into(), num_cpus::get(), )), - shared_client, + shared_http_client: api::SharedClient::new(), + enable_http: options.enable_http_requests, } } } @@ -140,18 +154,22 @@ where }, }; debug!("Checking offchain workers at {:?}: version:{}", at, version); - if version > 0 { + let process = (version > 0).then(|| { let (api, runner) = - api::AsyncApi::new(network_provider, is_validator, self.shared_client.clone()); + api::AsyncApi::new(network_provider, is_validator, self.shared_http_client.clone()); debug!("Spawning offchain workers at {:?}", at); let header = header.clone(); let client = self.client.clone(); + + let mut capabilities = offchain::Capabilities::all(); + + capabilities.set(offchain::Capabilities::HTTP, self.enable_http); self.spawn_worker(move || { let runtime = client.runtime_api(); let api = Box::new(api); debug!("Running offchain workers at {:?}", at); - let context = - ExecutionContext::OffchainCall(Some((api, offchain::Capabilities::all()))); + + let context = ExecutionContext::OffchainCall(Some((api, capabilities))); let run = if version == 2 { runtime.offchain_worker_with_context(&at, context, &header) } else { @@ -166,9 +184,12 @@ where log::error!("Error running offchain workers at {:?}: {:?}", at, e); } }); - futures::future::Either::Left(runner.process()) - } else { - futures::future::Either::Right(futures::future::ready(())) + + runner.process() + }); + + async move { + futures::future::OptionFuture::from(process).await; } } diff --git a/substrate/primitives/core/Cargo.toml b/substrate/primitives/core/Cargo.toml index dd721d744f..14a76e2482 100644 --- a/substrate/primitives/core/Cargo.toml +++ b/substrate/primitives/core/Cargo.toml @@ -49,6 +49,7 @@ parity-util-mem = { version = "0.10.0", default-features = false, features = [ futures = { version = "0.3.1", optional = true } dyn-clonable = { version = "0.9.0", optional = true } thiserror = { version = "1.0.21", optional = true } +bitflags = "1.3" # full crypto ed25519-dalek = { version = "1.0.1", default-features = false, features = [ diff --git a/substrate/primitives/core/src/lib.rs b/substrate/primitives/core/src/lib.rs index a6229fe43a..fd752397cd 100644 --- a/substrate/primitives/core/src/lib.rs +++ b/substrate/primitives/core/src/lib.rs @@ -118,15 +118,13 @@ impl ExecutionContext { use ExecutionContext::*; match self { - Importing | Syncing | BlockConstruction => offchain::Capabilities::none(), + Importing | Syncing | BlockConstruction => offchain::Capabilities::empty(), // Enable keystore, transaction pool and Offchain DB reads by default for offchain // calls. - OffchainCall(None) => [ - offchain::Capability::Keystore, - offchain::Capability::OffchainDbRead, - offchain::Capability::TransactionPool, - ][..] - .into(), + OffchainCall(None) => + offchain::Capabilities::KEYSTORE | + offchain::Capabilities::OFFCHAIN_DB_READ | + offchain::Capabilities::TRANSACTION_POOL, OffchainCall(Some((_, capabilities))) => *capabilities, } } diff --git a/substrate/primitives/core/src/offchain/mod.rs b/substrate/primitives/core/src/offchain/mod.rs index 640f4d2583..dfe23c1ff8 100644 --- a/substrate/primitives/core/src/offchain/mod.rs +++ b/substrate/primitives/core/src/offchain/mod.rs @@ -258,65 +258,35 @@ impl Timestamp { } } -/// Execution context extra capabilities. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -#[repr(u8)] -pub enum Capability { - /// Access to transaction pool. - TransactionPool = 1, - /// External http calls. - Http = 2, - /// Keystore access. - Keystore = 4, - /// Randomness source. - Randomness = 8, - /// Access to opaque network state. - NetworkState = 16, - /// Access to offchain worker DB (read only). - OffchainDbRead = 32, - /// Access to offchain worker DB (writes). - OffchainDbWrite = 64, - /// Manage the authorized nodes - NodeAuthorization = 128, +bitflags::bitflags! { + /// Execution context extra capabilities. + pub struct Capabilities: u32 { + /// Access to transaction pool. + const TRANSACTION_POOL = 0b0000_0001; + /// External http calls. + const HTTP = 0b0000_0010; + /// Keystore access. + const KEYSTORE = 0b0000_0100; + /// Randomness source. + const RANDOMNESS = 0b0000_1000; + /// Access to opaque network state. + const NETWORK_STATE = 0b0001_0000; + /// Access to offchain worker DB (read only). + const OFFCHAIN_DB_READ = 0b0010_0000; + /// Access to offchain worker DB (writes). + const OFFCHAIN_DB_WRITE = 0b0100_0000; + /// Manage the authorized nodes + const NODE_AUTHORIZATION = 0b1000_0000; + } } -/// A set of capabilities -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub struct Capabilities(u8); - impl Capabilities { - /// Return an object representing an empty set of capabilities. - pub fn none() -> Self { - Self(0) - } - - /// Return an object representing all capabilities enabled. - pub fn all() -> Self { - Self(u8::MAX) - } - /// Return capabilities for rich offchain calls. /// /// Those calls should be allowed to sign and submit transactions /// and access offchain workers database (but read only!). pub fn rich_offchain_call() -> Self { - [Capability::TransactionPool, Capability::Keystore, Capability::OffchainDbRead][..].into() - } - - /// Check if particular capability is enabled. - pub fn has(&self, capability: Capability) -> bool { - self.0 & capability as u8 != 0 - } - - /// Check if this capability object represents all capabilities. - pub fn has_all(&self) -> bool { - self == &Capabilities::all() - } -} - -impl<'a> From<&'a [Capability]> for Capabilities { - fn from(list: &'a [Capability]) -> Self { - Capabilities(list.iter().fold(0_u8, |a, b| a | *b as u8)) + Capabilities::TRANSACTION_POOL | Capabilities::KEYSTORE | Capabilities::OFFCHAIN_DB_READ } } @@ -552,8 +522,8 @@ impl LimitedExternalities { /// Check if given capability is allowed. /// /// Panics in case it is not. - fn check(&self, capability: Capability, name: &'static str) { - if !self.capabilities.has(capability) { + fn check(&self, capability: Capabilities, name: &'static str) { + if !self.capabilities.contains(capability) { panic!("Accessing a forbidden API: {}. No: {:?} capability.", name, capability); } } @@ -561,27 +531,27 @@ impl LimitedExternalities { impl Externalities for LimitedExternalities { fn is_validator(&self) -> bool { - self.check(Capability::Keystore, "is_validator"); + self.check(Capabilities::KEYSTORE, "is_validator"); self.externalities.is_validator() } fn network_state(&self) -> Result { - self.check(Capability::NetworkState, "network_state"); + self.check(Capabilities::NETWORK_STATE, "network_state"); self.externalities.network_state() } fn timestamp(&mut self) -> Timestamp { - self.check(Capability::Http, "timestamp"); + self.check(Capabilities::HTTP, "timestamp"); self.externalities.timestamp() } fn sleep_until(&mut self, deadline: Timestamp) { - self.check(Capability::Http, "sleep_until"); + self.check(Capabilities::HTTP, "sleep_until"); self.externalities.sleep_until(deadline) } fn random_seed(&mut self) -> [u8; 32] { - self.check(Capability::Randomness, "random_seed"); + self.check(Capabilities::RANDOMNESS, "random_seed"); self.externalities.random_seed() } @@ -591,7 +561,7 @@ impl Externalities for LimitedExternalities { uri: &str, meta: &[u8], ) -> Result { - self.check(Capability::Http, "http_request_start"); + self.check(Capabilities::HTTP, "http_request_start"); self.externalities.http_request_start(method, uri, meta) } @@ -601,7 +571,7 @@ impl Externalities for LimitedExternalities { name: &str, value: &str, ) -> Result<(), ()> { - self.check(Capability::Http, "http_request_add_header"); + self.check(Capabilities::HTTP, "http_request_add_header"); self.externalities.http_request_add_header(request_id, name, value) } @@ -611,7 +581,7 @@ impl Externalities for LimitedExternalities { chunk: &[u8], deadline: Option, ) -> Result<(), HttpError> { - self.check(Capability::Http, "http_request_write_body"); + self.check(Capabilities::HTTP, "http_request_write_body"); self.externalities.http_request_write_body(request_id, chunk, deadline) } @@ -620,12 +590,12 @@ impl Externalities for LimitedExternalities { ids: &[HttpRequestId], deadline: Option, ) -> Vec { - self.check(Capability::Http, "http_response_wait"); + self.check(Capabilities::HTTP, "http_response_wait"); self.externalities.http_response_wait(ids, deadline) } fn http_response_headers(&mut self, request_id: HttpRequestId) -> Vec<(Vec, Vec)> { - self.check(Capability::Http, "http_response_headers"); + self.check(Capabilities::HTTP, "http_response_headers"); self.externalities.http_response_headers(request_id) } @@ -635,12 +605,12 @@ impl Externalities for LimitedExternalities { buffer: &mut [u8], deadline: Option, ) -> Result { - self.check(Capability::Http, "http_response_read_body"); + self.check(Capabilities::HTTP, "http_response_read_body"); self.externalities.http_response_read_body(request_id, buffer, deadline) } fn set_authorized_nodes(&mut self, nodes: Vec, authorized_only: bool) { - self.check(Capability::NodeAuthorization, "set_authorized_nodes"); + self.check(Capabilities::NODE_AUTHORIZATION, "set_authorized_nodes"); self.externalities.set_authorized_nodes(nodes, authorized_only) } } @@ -724,12 +694,12 @@ impl DbExternalities for Box { impl DbExternalities for LimitedExternalities { fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) { - self.check(Capability::OffchainDbWrite, "local_storage_set"); + self.check(Capabilities::OFFCHAIN_DB_WRITE, "local_storage_set"); self.externalities.local_storage_set(kind, key, value) } fn local_storage_clear(&mut self, kind: StorageKind, key: &[u8]) { - self.check(Capability::OffchainDbWrite, "local_storage_clear"); + self.check(Capabilities::OFFCHAIN_DB_WRITE, "local_storage_clear"); self.externalities.local_storage_clear(kind, key) } @@ -740,13 +710,13 @@ impl DbExternalities for LimitedExternalities { old_value: Option<&[u8]>, new_value: &[u8], ) -> bool { - self.check(Capability::OffchainDbWrite, "local_storage_compare_and_set"); + self.check(Capabilities::OFFCHAIN_DB_WRITE, "local_storage_compare_and_set"); self.externalities .local_storage_compare_and_set(kind, key, old_value, new_value) } fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option> { - self.check(Capability::OffchainDbRead, "local_storage_get"); + self.check(Capabilities::OFFCHAIN_DB_READ, "local_storage_get"); self.externalities.local_storage_get(kind, key) } } @@ -815,15 +785,15 @@ mod tests { #[test] fn capabilities() { - let none = Capabilities::none(); + let none = Capabilities::empty(); let all = Capabilities::all(); - let some = Capabilities::from(&[Capability::Keystore, Capability::Randomness][..]); + let some = Capabilities::KEYSTORE | Capabilities::RANDOMNESS; - assert!(!none.has(Capability::Keystore)); - assert!(all.has(Capability::Keystore)); - assert!(some.has(Capability::Keystore)); - assert!(!none.has(Capability::TransactionPool)); - assert!(all.has(Capability::TransactionPool)); - assert!(!some.has(Capability::TransactionPool)); + assert!(!none.contains(Capabilities::KEYSTORE)); + assert!(all.contains(Capabilities::KEYSTORE)); + assert!(some.contains(Capabilities::KEYSTORE)); + assert!(!none.contains(Capabilities::TRANSACTION_POOL)); + assert!(all.contains(Capabilities::TRANSACTION_POOL)); + assert!(!some.contains(Capabilities::TRANSACTION_POOL)); } }