Remove the last bits of unknown_os in the code base (#9718)

* Remove the last bits of unknown_os in the code base

* Fmt
This commit is contained in:
Bastian Köcher
2021-09-09 11:17:16 +02:00
committed by GitHub
parent a443944167
commit 129c9ed09e
28 changed files with 197 additions and 523 deletions
-11
View File
@@ -7126,16 +7126,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd"
[[package]]
name = "ruzstd"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d425143485a37727c7a46e689bbe3b883a00f42b4a52c4ac0f44855c1009b00"
dependencies = [
"byteorder",
"twox-hash",
]
[[package]]
name = "rw-stream-sink"
version = "0.2.1"
@@ -9139,7 +9129,6 @@ dependencies = [
name = "sp-maybe-compressed-blob"
version = "4.0.0-dev"
dependencies = [
"ruzstd",
"zstd",
]
-2
View File
@@ -43,8 +43,6 @@ sc-tracing = { version = "4.0.0-dev", path = "../tracing" }
chrono = "0.4.10"
serde = "1.0.126"
thiserror = "1.0.21"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
rpassword = "5.0.0"
[dev-dependencies]
@@ -76,13 +76,7 @@ impl KeystoreParams {
/// Returns a vector of remote-urls and the local Keystore configuration
pub fn keystore_config(&self, config_dir: &Path) -> Result<(Option<String>, KeystoreConfig)> {
let password = if self.password_interactive {
#[cfg(not(target_os = "unknown"))]
{
let password = input_keystore_password()?;
Some(SecretString::new(password))
}
#[cfg(target_os = "unknown")]
None
Some(SecretString::new(input_keystore_password()?))
} else if let Some(ref file) = self.password_filename {
let password = fs::read_to_string(file).map_err(|e| format!("{}", e))?;
Some(SecretString::new(password))
@@ -113,7 +107,6 @@ impl KeystoreParams {
}
}
#[cfg(not(target_os = "unknown"))]
fn input_keystore_password() -> Result<String> {
rpassword::read_password_from_tty(Some("Keystore password: "))
.map_err(|e| format!("{:?}", e).into())
-9
View File
@@ -65,8 +65,6 @@ pub struct LightStorage<Block: BlockT> {
meta: RwLock<Meta<NumberFor<Block>, Block::Hash>>,
cache: Arc<DbCacheSync<Block>>,
header_metadata_cache: Arc<HeaderMetadataCache<Block>>,
#[cfg(not(target_os = "unknown"))]
io_stats: FrozenForDuration<kvdb::IoStats>,
}
@@ -102,7 +100,6 @@ impl<Block: BlockT> LightStorage<Block> {
meta: RwLock::new(meta),
cache: Arc::new(DbCacheSync(RwLock::new(cache))),
header_metadata_cache,
#[cfg(not(target_os = "unknown"))]
io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1)),
})
}
@@ -589,7 +586,6 @@ where
Some(self.cache.clone())
}
#[cfg(not(target_os = "unknown"))]
fn usage_info(&self) -> Option<UsageInfo> {
use sc_client_api::{IoInfo, MemoryInfo, MemorySize};
@@ -619,11 +615,6 @@ where
},
})
}
#[cfg(target_os = "unknown")]
fn usage_info(&self) -> Option<UsageInfo> {
None
}
}
impl<Block> ProvideChtRoots<Block> for LightStorage<Block>
@@ -237,7 +237,6 @@ impl RuntimeCache {
None => {
let code = runtime_code.fetch_runtime_code().ok_or(WasmError::CodeNotFound)?;
#[cfg(not(target_os = "unknown"))]
let time = std::time::Instant::now();
let result = create_versioned_wasm_runtime(
@@ -254,7 +253,6 @@ impl RuntimeCache {
match result {
Ok(ref result) => {
#[cfg(not(target_os = "unknown"))]
log::debug!(
target: "wasm-runtime",
"Prepared new runtime version {:?} in {} ms.",
+3 -19
View File
@@ -52,31 +52,16 @@ impl Default for OutputFormat {
}
}
/// Marker trait for a type that implements `TransactionPool` and `MallocSizeOf` on `not(target_os =
/// "unknown")`.
#[cfg(target_os = "unknown")]
pub trait TransactionPoolAndMaybeMallogSizeOf: TransactionPool {}
/// Marker trait for a type that implements `TransactionPool` and `MallocSizeOf` on `not(target_os =
/// "unknown")`.
#[cfg(not(target_os = "unknown"))]
pub trait TransactionPoolAndMaybeMallogSizeOf: TransactionPool + MallocSizeOf {}
#[cfg(target_os = "unknown")]
impl<T: TransactionPool> TransactionPoolAndMaybeMallogSizeOf for T {}
#[cfg(not(target_os = "unknown"))]
impl<T: TransactionPool + MallocSizeOf> TransactionPoolAndMaybeMallogSizeOf for T {}
/// Builds the informant and returns a `Future` that drives the informant.
pub async fn build<B: BlockT, C>(
pub async fn build<B: BlockT, C, P>(
client: Arc<C>,
network: Arc<NetworkService<B, <B as BlockT>::Hash>>,
pool: Arc<impl TransactionPoolAndMaybeMallogSizeOf>,
pool: Arc<P>,
format: OutputFormat,
) where
C: UsageProvider<B> + HeaderMetadata<B> + BlockchainEvents<B>,
<C as HeaderMetadata<B>>::Error: Display,
P: TransactionPool + MallocSizeOf,
{
let mut display = display::InformantDisplay::new(format.clone());
@@ -97,7 +82,6 @@ pub async fn build<B: BlockT, C>(
"Usage statistics not displayed as backend does not provide it",
)
}
#[cfg(not(target_os = "unknown"))]
trace!(
target: "usage",
"Subsystems memory [txpool: {} kB]",
+1 -20
View File
@@ -64,26 +64,7 @@ unsigned-varint = { version = "0.6.0", features = [
] }
void = "1.0.2"
zeroize = "1.2.0"
[dependencies.libp2p]
version = "0.39.1"
[target.'cfg(target_os = "unknown")'.dependencies.libp2p]
version = "0.39.1"
default-features = false
features = [
"identify",
"kad",
"mdns",
"mplex",
"noise",
"ping",
"request-response",
"tcp-async-io",
"websocket",
"yamux",
]
libp2p = "0.39.1"
[dev-dependencies]
assert_matches = "1.3"
+1 -11
View File
@@ -50,8 +50,6 @@ use crate::{config::ProtocolId, utils::LruHashSet};
use futures::prelude::*;
use futures_timer::Delay;
use ip_network::IpNetwork;
#[cfg(not(target_os = "unknown"))]
use libp2p::mdns::{Mdns, MdnsConfig, MdnsEvent};
use libp2p::{
core::{
connection::{ConnectionId, ListenerId},
@@ -66,6 +64,7 @@ use libp2p::{
GetClosestPeersError, Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent,
QueryId, QueryResult, Quorum, Record,
},
mdns::{Mdns, MdnsConfig, MdnsEvent},
multiaddr::Protocol,
swarm::{
protocols_handler::multi::IntoMultiHandler, IntoProtocolsHandler, NetworkBehaviour,
@@ -156,9 +155,6 @@ impl DiscoveryConfig {
/// Should MDNS discovery be supported?
pub fn with_mdns(&mut self, value: bool) -> &mut Self {
if value && cfg!(target_os = "unknown") {
log::warn!(target: "sub-libp2p", "mDNS is not available on this platform")
}
self.enable_mdns = value;
self
}
@@ -234,7 +230,6 @@ impl DiscoveryConfig {
num_connections: 0,
allow_private_ipv4,
discovery_only_if_under_num,
#[cfg(not(target_os = "unknown"))]
mdns: if enable_mdns {
MdnsWrapper::Instantiating(Mdns::new(MdnsConfig::default()).boxed())
} else {
@@ -257,7 +252,6 @@ pub struct DiscoveryBehaviour {
/// Kademlia requests and answers.
kademlias: HashMap<ProtocolId, Kademlia<MemoryStore>>,
/// Discovers nodes on the local network.
#[cfg(not(target_os = "unknown"))]
mdns: MdnsWrapper,
/// Stream that fires when we need to perform the next random Kademlia query. `None` if
/// random walking is disabled.
@@ -505,7 +499,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
list_to_filter.extend(k.addresses_of_peer(peer_id))
}
#[cfg(not(target_os = "unknown"))]
list_to_filter.extend(self.mdns.addresses_of_peer(peer_id));
if !self.allow_private_ipv4 {
@@ -840,7 +833,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
// Poll mDNS.
#[cfg(not(target_os = "unknown"))]
while let Poll::Ready(ev) = self.mdns.poll(cx, params) {
match ev {
NetworkBehaviourAction::GenerateEvent(event) => match event {
@@ -890,14 +882,12 @@ fn protocol_name_from_protocol_id(id: &ProtocolId) -> Vec<u8> {
/// [`Mdns::new`] returns a future. Instead of forcing [`DiscoveryConfig::finish`] and all its
/// callers to be async, lazily instantiate [`Mdns`].
#[cfg(not(target_os = "unknown"))]
enum MdnsWrapper {
Instantiating(futures::future::BoxFuture<'static, std::io::Result<Mdns>>),
Ready(Mdns),
Disabled,
}
#[cfg(not(target_os = "unknown"))]
impl MdnsWrapper {
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
match self {
+1 -3
View File
@@ -25,10 +25,8 @@ use libp2p::{
transport::{Boxed, OptionalTransport},
upgrade,
},
identity, mplex, noise, PeerId, Transport,
dns, identity, mplex, noise, tcp, websocket, PeerId, Transport,
};
#[cfg(not(target_os = "unknown"))]
use libp2p::{dns, tcp, websocket};
use std::{sync::Arc, time::Duration};
pub use self::bandwidth::BandwidthSinks;
-2
View File
@@ -31,8 +31,6 @@ sp-offchain = { version = "4.0.0-dev", path = "../../primitives/offchain" }
sp-runtime = { version = "4.0.0-dev", path = "../../primitives/runtime" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
threadpool = "1.7"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
hyper = "0.14.11"
hyper-rustls = "0.22.1"
-6
View File
@@ -32,14 +32,8 @@ use sp_core::{
};
pub use sp_offchain::STORAGE_PREFIX;
#[cfg(not(target_os = "unknown"))]
mod http;
#[cfg(target_os = "unknown")]
use http_dummy as http;
#[cfg(target_os = "unknown")]
mod http_dummy;
mod timestamp;
fn unavailable_yet<R: Default>(name: &str) -> R {
@@ -1,124 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Contains the same API as the `http` module, except that everything returns an error.
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Wrapper struct (wrapping nothing in case of http_dummy) used for keeping the hyper_rustls client
/// running.
#[derive(Clone)]
pub struct SharedClient;
impl SharedClient {
pub fn new() -> Self {
Self
}
}
/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
pub fn http(_: SharedClient) -> (HttpApi, HttpWorker) {
(HttpApi, HttpWorker)
}
/// Dummy implementation of HTTP capabilities.
#[derive(Debug)]
pub struct HttpApi;
/// Dummy implementation of HTTP capabilities.
#[derive(Debug)]
pub struct HttpWorker;
impl HttpApi {
/// Mimics the corresponding method in the offchain API.
pub fn request_start(&mut self, _: &str, _: &str) -> Result<HttpRequestId, ()> {
/// Because this always returns an error, none of the other methods should ever be called.
Err(())
}
/// Mimics the corresponding method in the offchain API.
pub fn request_add_header(&mut self, _: HttpRequestId, _: &str, _: &str) -> Result<(), ()> {
unreachable!(
"Creating a request always fails, thus this function will \
never be called; qed"
)
}
/// Mimics the corresponding method in the offchain API.
pub fn request_write_body(
&mut self,
_: HttpRequestId,
_: &[u8],
_: Option<Timestamp>,
) -> Result<(), HttpError> {
unreachable!(
"Creating a request always fails, thus this function will \
never be called; qed"
)
}
/// Mimics the corresponding method in the offchain API.
pub fn response_wait(
&mut self,
requests: &[HttpRequestId],
_: Option<Timestamp>,
) -> Vec<HttpRequestStatus> {
if requests.is_empty() {
Vec::new()
} else {
unreachable!(
"Creating a request always fails, thus the list of requests should \
always be empty; qed"
)
}
}
/// Mimics the corresponding method in the offchain API.
pub fn response_headers(&mut self, _: HttpRequestId) -> Vec<(Vec<u8>, Vec<u8>)> {
unreachable!(
"Creating a request always fails, thus this function will \
never be called; qed"
)
}
/// Mimics the corresponding method in the offchain API.
pub fn response_read_body(
&mut self,
_: HttpRequestId,
_: &mut [u8],
_: Option<Timestamp>,
) -> Result<usize, HttpError> {
unreachable!(
"Creating a request always fails, thus this function will \
never be called; qed"
)
}
}
impl Future for HttpWorker {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
Poll::Ready(())
}
}
-2
View File
@@ -19,8 +19,6 @@ pubsub = { package = "jsonrpc-pubsub", version = "18.0.0" }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.9.0"}
serde_json = "1.0.41"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
http = { package = "jsonrpc-http-server", version = "18.0.0" }
ipc = { package = "jsonrpc-ipc-server", version = "18.0.0" }
ws = { package = "jsonrpc-ws-server", version = "18.0.0" }
+97 -114
View File
@@ -42,7 +42,6 @@ const HTTP_THREADS: usize = 4;
/// The RPC IoHandler containing all requested APIs.
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;
pub use self::inner::*;
pub use middleware::{method_names, RpcMetrics, RpcMiddleware};
/// Construct rpc `IoHandler`
@@ -111,122 +110,106 @@ impl ServerMetrics {
}
}
#[cfg(not(target_os = "unknown"))]
mod inner {
use super::*;
/// Type alias for ipc server
pub type IpcServer = ipc::Server;
/// Type alias for http server
pub type HttpServer = http::Server;
/// Type alias for ws server
pub type WsServer = ws::Server;
/// Type alias for ipc server
pub type IpcServer = ipc::Server;
/// Type alias for http server
pub type HttpServer = http::Server;
/// Type alias for ws server
pub type WsServer = ws::Server;
impl ws::SessionStats for ServerMetrics {
fn open_session(&self, _id: ws::SessionId) {
self.session_opened.as_ref().map(|m| m.inc());
}
fn close_session(&self, _id: ws::SessionId) {
self.session_closed.as_ref().map(|m| m.inc());
}
impl ws::SessionStats for ServerMetrics {
fn open_session(&self, _id: ws::SessionId) {
self.session_opened.as_ref().map(|m| m.inc());
}
/// Start HTTP server listening on given address.
///
/// **Note**: Only available if `not(target_os = "unknown")`.
pub fn start_http<M: pubsub::PubSubMetadata + Default + Unpin>(
addr: &std::net::SocketAddr,
thread_pool_size: Option<usize>,
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
) -> io::Result<http::Server> {
let max_request_body_size = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
http::ServerBuilder::new(io)
.threads(thread_pool_size.unwrap_or(HTTP_THREADS))
.health_api(("/health", "system_health"))
.allowed_hosts(hosts_filtering(cors.is_some()))
.rest_api(if cors.is_some() { http::RestApi::Secure } else { http::RestApi::Unsecure })
.cors(map_cors::<http::AccessControlAllowOrigin>(cors))
.max_request_body_size(max_request_body_size)
.start_http(addr)
}
/// Start IPC server listening on given path.
///
/// **Note**: Only available if `not(target_os = "unknown")`.
pub fn start_ipc<M: pubsub::PubSubMetadata + Default>(
addr: &str,
io: RpcHandler<M>,
server_metrics: ServerMetrics,
) -> io::Result<ipc::Server> {
let builder = ipc::ServerBuilder::new(io);
#[cfg(target_os = "unix")]
builder.set_security_attributes({
let security_attributes = ipc::SecurityAttributes::empty();
security_attributes.set_mode(0o600)?;
security_attributes
});
builder.session_stats(server_metrics).start(addr)
}
/// Start WS server listening on given address.
///
/// **Note**: Only available if `not(target_os = "unknown")`.
pub fn start_ws<
M: pubsub::PubSubMetadata + From<futures::channel::mpsc::UnboundedSender<String>>,
>(
addr: &std::net::SocketAddr,
max_connections: Option<usize>,
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
server_metrics: ServerMetrics,
) -> io::Result<ws::Server> {
let rpc_max_payload = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| {
context.sender().into()
})
.max_payload(rpc_max_payload)
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
.allowed_origins(map_cors(cors))
.allowed_hosts(hosts_filtering(cors.is_some()))
.session_stats(server_metrics)
.start(addr)
.map_err(|err| match err {
ws::Error::Io(io) => io,
ws::Error::ConnectionClosed => io::ErrorKind::BrokenPipe.into(),
e => {
error!("{}", e);
io::ErrorKind::Other.into()
},
})
}
fn map_cors<T: for<'a> From<&'a str>>(
cors: Option<&Vec<String>>,
) -> http::DomainsValidation<T> {
cors.map(|x| x.iter().map(AsRef::as_ref).map(Into::into).collect::<Vec<_>>())
.into()
}
fn hosts_filtering(enable: bool) -> http::DomainsValidation<http::Host> {
if enable {
// NOTE The listening address is whitelisted by default.
// Setting an empty vector here enables the validation
// and allows only the listening address.
http::DomainsValidation::AllowOnly(vec![])
} else {
http::DomainsValidation::Disabled
}
fn close_session(&self, _id: ws::SessionId) {
self.session_closed.as_ref().map(|m| m.inc());
}
}
#[cfg(target_os = "unknown")]
mod inner {}
/// Start HTTP server listening on given address.
pub fn start_http<M: pubsub::PubSubMetadata + Default + Unpin>(
addr: &std::net::SocketAddr,
thread_pool_size: Option<usize>,
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
) -> io::Result<http::Server> {
let max_request_body_size = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
http::ServerBuilder::new(io)
.threads(thread_pool_size.unwrap_or(HTTP_THREADS))
.health_api(("/health", "system_health"))
.allowed_hosts(hosts_filtering(cors.is_some()))
.rest_api(if cors.is_some() { http::RestApi::Secure } else { http::RestApi::Unsecure })
.cors(map_cors::<http::AccessControlAllowOrigin>(cors))
.max_request_body_size(max_request_body_size)
.start_http(addr)
}
/// Start IPC server listening on given path.
pub fn start_ipc<M: pubsub::PubSubMetadata + Default>(
addr: &str,
io: RpcHandler<M>,
server_metrics: ServerMetrics,
) -> io::Result<ipc::Server> {
let builder = ipc::ServerBuilder::new(io);
#[cfg(target_os = "unix")]
builder.set_security_attributes({
let security_attributes = ipc::SecurityAttributes::empty();
security_attributes.set_mode(0o600)?;
security_attributes
});
builder.session_stats(server_metrics).start(addr)
}
/// Start WS server listening on given address.
pub fn start_ws<
M: pubsub::PubSubMetadata + From<futures::channel::mpsc::UnboundedSender<String>>,
>(
addr: &std::net::SocketAddr,
max_connections: Option<usize>,
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
server_metrics: ServerMetrics,
) -> io::Result<ws::Server> {
let rpc_max_payload = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| {
context.sender().into()
})
.max_payload(rpc_max_payload)
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
.allowed_origins(map_cors(cors))
.allowed_hosts(hosts_filtering(cors.is_some()))
.session_stats(server_metrics)
.start(addr)
.map_err(|err| match err {
ws::Error::Io(io) => io,
ws::Error::ConnectionClosed => io::ErrorKind::BrokenPipe.into(),
e => {
error!("{}", e);
io::ErrorKind::Other.into()
},
})
}
fn map_cors<T: for<'a> From<&'a str>>(cors: Option<&Vec<String>>) -> http::DomainsValidation<T> {
cors.map(|x| x.iter().map(AsRef::as_ref).map(Into::into).collect::<Vec<_>>())
.into()
}
fn hosts_filtering(enable: bool) -> http::DomainsValidation<http::Host> {
if enable {
// NOTE The listening address is whitelisted by default.
// Setting an empty vector here enables the validation
// and allows only the listening address.
http::DomainsValidation::AllowOnly(vec![])
} else {
http::DomainsValidation::Disabled
}
}
@@ -175,7 +175,6 @@ impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
F: Fn(jsonrpc_core::Call, M) -> X + Send + Sync,
X: Future<Output = Option<jsonrpc_core::Output>> + Send + 'static,
{
#[cfg(not(target_os = "unknown"))]
let start = std::time::Instant::now();
let name = call_name(&call, &self.known_rpc_method_names).to_owned();
let metrics = self.metrics.clone();
@@ -191,11 +190,7 @@ impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
Either::Left(
async move {
let r = r.await;
#[cfg(not(target_os = "unknown"))]
let micros = start.elapsed().as_micros();
// seems that std::time is not implemented for browser target
#[cfg(target_os = "unknown")]
let micros = 1;
if let Some(ref metrics) = metrics {
metrics
.calls_time
-2
View File
@@ -79,8 +79,6 @@ parity-util-mem = { version = "0.10.0", default-features = false, features = [
"primitive-types",
] }
async-trait = "0.1.50"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
tempfile = "3.1.0"
directories = "3.0.2"
+2 -3
View File
@@ -22,8 +22,7 @@ use crate::{
config::{Configuration, KeystoreConfig, PrometheusConfig, TransactionStorageMode},
error::Error,
metrics::MetricsService,
start_rpc_servers, MallocSizeOfWasm, RpcHandlers, SpawnTaskHandle, TaskManager,
TransactionPoolAdapter,
start_rpc_servers, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
};
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
use jsonrpc_pubsub::manager::SubscriptionManager;
@@ -552,7 +551,7 @@ where
TBl::Header: Unpin,
TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash>
+ MallocSizeOfWasm
+ parity_util_mem::MallocSizeOf
+ 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
-5
View File
@@ -43,7 +43,6 @@ use std::{
pin::Pin,
sync::Arc,
};
#[cfg(not(target_os = "unknown"))]
use tempfile::TempDir;
/// Service configuration.
@@ -253,7 +252,6 @@ impl Default for RpcMethods {
#[derive(Debug)]
pub enum BasePath {
/// A temporary directory is used as base path and will be deleted when dropped.
#[cfg(not(target_os = "unknown"))]
Temporary(TempDir),
/// A path on the disk.
Permanenent(PathBuf),
@@ -265,7 +263,6 @@ impl BasePath {
///
/// Note: the temporary directory will be created automatically and deleted when the `BasePath`
/// instance is dropped.
#[cfg(not(target_os = "unknown"))]
pub fn new_temp_dir() -> io::Result<BasePath> {
Ok(BasePath::Temporary(tempfile::Builder::new().prefix("substrate").tempdir()?))
}
@@ -279,7 +276,6 @@ impl BasePath {
}
/// Create a base path from values describing the project.
#[cfg(not(target_os = "unknown"))]
pub fn from_project(qualifier: &str, organization: &str, application: &str) -> BasePath {
BasePath::new(
directories::ProjectDirs::from(qualifier, organization, application)
@@ -291,7 +287,6 @@ impl BasePath {
/// Retrieve the base path.
pub fn path(&self) -> &Path {
match self {
#[cfg(not(target_os = "unknown"))]
BasePath::Temporary(temp_dir) => temp_dir.path(),
BasePath::Permanenent(path) => path.as_path(),
}
-30
View File
@@ -39,7 +39,6 @@ use std::{collections::HashMap, io, net::SocketAddr, pin::Pin, task::Poll};
use codec::{Decode, Encode};
use futures::{stream, Future, FutureExt, Stream, StreamExt};
use log::{debug, error, warn};
use parity_util_mem::MallocSizeOf;
use sc_network::PeerId;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_runtime::{
@@ -81,16 +80,6 @@ pub use task_manager::{SpawnTaskHandle, TaskManager};
const DEFAULT_PROTOCOL_ID: &str = "sup";
/// A type that implements `MallocSizeOf` on native but not wasm.
#[cfg(not(target_os = "unknown"))]
pub trait MallocSizeOfWasm: MallocSizeOf {}
#[cfg(target_os = "unknown")]
pub trait MallocSizeOfWasm {}
#[cfg(not(target_os = "unknown"))]
impl<T: MallocSizeOf> MallocSizeOfWasm for T {}
#[cfg(target_os = "unknown")]
impl<T> MallocSizeOfWasm for T {}
/// RPC handlers that can perform RPC queries.
#[derive(Clone)]
pub struct RpcHandlers(
@@ -305,7 +294,6 @@ async fn build_network_future<
}
// Wrapper for HTTP and WS servers that makes sure they are properly shut down.
#[cfg(not(target_os = "unknown"))]
mod waiting {
pub struct HttpServer(pub Option<sc_rpc_server::HttpServer>);
impl Drop for HttpServer {
@@ -340,7 +328,6 @@ mod waiting {
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them
/// alive.
#[cfg(not(target_os = "unknown"))]
fn start_rpc_servers<
H: FnMut(
sc_rpc::DenyUnsafe,
@@ -445,23 +432,6 @@ fn start_rpc_servers<
)))
}
/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them
/// alive.
#[cfg(target_os = "unknown")]
fn start_rpc_servers<
H: FnMut(
sc_rpc::DenyUnsafe,
sc_rpc_server::RpcMiddleware,
) -> Result<sc_rpc_server::RpcHandler<sc_rpc::Metadata>, Error>,
>(
_: &Configuration,
_: H,
_: Option<sc_rpc_server::RpcMetrics>,
_: sc_rpc_server::ServerMetrics,
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
Ok(Box::new(()))
}
/// An RPC session. Used to perform in-memory RPC queries (ie. RPC queries that don't go through
/// the HTTP or WebSockets server).
#[derive(Clone)]
@@ -207,8 +207,7 @@ const RECENTLY_PRUNED_TAGS: usize = 2;
/// as-is for the second time will fail or produce unwanted results.
/// Most likely it is required to revalidate them and recompute set of
/// required tags.
#[derive(Debug)]
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
reject_future_transactions: bool,
future: FutureTransactions<Hash, Ex>,
@@ -28,7 +28,7 @@ use std::time::Instant;
use super::base_pool::Transaction;
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
#[derive(parity_util_mem::MallocSizeOf)]
/// Transaction with partially satisfied dependencies.
pub struct WaitingTransaction<Hash, Ex> {
/// Transaction details.
@@ -108,8 +108,7 @@ impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
///
/// Contains transactions that are still awaiting for some other transactions that
/// could provide a tag that they require.
#[derive(Debug)]
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
/// tags that are not yet provided by any transaction and we await for them
wanted_tags: HashMap<Tag, HashSet<Hash>>,
@@ -133,7 +133,6 @@ pub struct Pool<B: ChainApi> {
validated_pool: Arc<ValidatedPool<B>>,
}
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for Pool<B>
where
ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
@@ -111,7 +111,6 @@ pub struct ValidatedPool<B: ChainApi> {
rotator: PoolRotator<ExtrinsicHash<B>>,
}
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for ValidatedPool<B>
where
ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
@@ -138,7 +138,6 @@ impl<T, Block: BlockT> ReadyPoll<T, Block> {
}
}
#[cfg(not(target_os = "unknown"))]
impl<PoolApi, Block> parity_util_mem::MallocSizeOf for BasicPool<PoolApi, Block>
where
PoolApi: graph::ChainApi<Block = Block>,
@@ -10,8 +10,5 @@ description = "Handling of blobs, usually Wasm code, which may be compresed"
documentation = "https://docs.rs/sp-maybe-compressed-blob"
readme = "README.md"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
[dependencies]
zstd = { version = "0.6.0", default-features = false }
[target.'cfg(target_os = "unknown")'.dependencies]
ruzstd = { version = "0.2.2" }
@@ -70,22 +70,12 @@ fn read_from_decoder(
}
}
#[cfg(not(target_os = "unknown"))]
fn decompress_zstd(blob: &[u8], bomb_limit: usize) -> Result<Vec<u8>, Error> {
let decoder = zstd::Decoder::new(blob).map_err(|_| Error::Invalid)?;
read_from_decoder(decoder, blob.len(), bomb_limit)
}
#[cfg(target_os = "unknown")]
fn decompress_zstd(mut blob: &[u8], bomb_limit: usize) -> Result<Vec<u8>, Error> {
let blob_len = blob.len();
let decoder =
ruzstd::streaming_decoder::StreamingDecoder::new(&mut blob).map_err(|_| Error::Invalid)?;
read_from_decoder(decoder, blob_len, bomb_limit)
}
/// Decode a blob, if it indicates that it is compressed. Provide a `bomb_limit`, which
/// is the limit of bytes which should be decompressed from the blob.
pub fn decompress(blob: &[u8], bomb_limit: usize) -> Result<Cow<[u8]>, Error> {
@@ -99,7 +89,6 @@ pub fn decompress(blob: &[u8], bomb_limit: usize) -> Result<Cow<[u8]>, Error> {
/// Encode a blob as compressed. If the blob's size is over the bomb limit,
/// this will not compress the blob, as the decoder will not be able to be
/// able to differentiate it from a compression bomb.
#[cfg(not(target_os = "unknown"))]
pub fn compress(blob: &[u8], bomb_limit: usize) -> Option<Vec<u8>> {
use std::io::Write;
-2
View File
@@ -17,8 +17,6 @@ log = "0.4.8"
prometheus = { version = "0.11.0", default-features = false }
futures-util = { version = "0.3.1", default-features = false, features = ["io"] }
derive_more = "0.99"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
async-std = { version = "1.6.5", features = ["unstable"] }
tokio = "1.10"
hyper = { version = "0.14.11", default-features = false, features = ["http1", "server", "tcp"] }
+87 -120
View File
@@ -15,9 +15,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(not(target_os = "unknown"))]
use futures_util::future::Future;
use prometheus::core::Collector;
use hyper::{
http::StatusCode,
server::Server,
service::{make_service_fn, service_fn},
Body, Request, Response,
};
pub use prometheus::{
self,
core::{
@@ -27,21 +31,14 @@ pub use prometheus::{
exponential_buckets, Error as PrometheusError, Histogram, HistogramOpts, HistogramVec, Opts,
Registry,
};
#[cfg(not(target_os = "unknown"))]
use prometheus::{Encoder, TextEncoder};
use prometheus::{core::Collector, Encoder, TextEncoder};
use std::net::SocketAddr;
#[cfg(not(target_os = "unknown"))]
mod networking;
mod sourced;
pub use sourced::{MetricSource, SourcedCounter, SourcedGauge, SourcedMetric};
#[cfg(not(target_os = "unknown"))]
pub use known_os::init_prometheus;
#[cfg(target_os = "unknown")]
pub use unknown_os::init_prometheus;
pub fn register<T: Clone + Collector + 'static>(
metric: T,
registry: &Registry,
@@ -50,126 +47,96 @@ pub fn register<T: Clone + Collector + 'static>(
Ok(metric)
}
// On WASM `init_prometheus` becomes a no-op.
#[cfg(target_os = "unknown")]
mod unknown_os {
use super::*;
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
/// Hyper internal error.
Hyper(hyper::Error),
/// Http request error.
Http(hyper::http::Error),
/// i/o error.
Io(std::io::Error),
#[display(fmt = "Prometheus port {} already in use.", _0)]
PortInUse(SocketAddr),
}
pub enum Error {}
pub async fn init_prometheus(_: SocketAddr, _registry: Registry) -> Result<(), Error> {
Ok(())
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Hyper(error) => Some(error),
Error::Http(error) => Some(error),
Error::Io(error) => Some(error),
Error::PortInUse(_) => None,
}
}
}
#[cfg(not(target_os = "unknown"))]
mod known_os {
use super::*;
use hyper::{
http::StatusCode,
server::Server,
service::{make_service_fn, service_fn},
Body, Request, Response,
};
async fn request_metrics(req: Request<Body>, registry: Registry) -> Result<Response<Body>, Error> {
if req.uri().path() == "/metrics" {
let metric_families = registry.gather();
let mut buffer = vec![];
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
/// Hyper internal error.
Hyper(hyper::Error),
/// Http request error.
Http(hyper::http::Error),
/// i/o error.
Io(std::io::Error),
#[display(fmt = "Prometheus port {} already in use.", _0)]
PortInUse(SocketAddr),
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", encoder.format_type())
.body(Body::from(buffer))
.map_err(Error::Http)
} else {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not found."))
.map_err(Error::Http)
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Hyper(error) => Some(error),
Error::Http(error) => Some(error),
Error::Io(error) => Some(error),
Error::PortInUse(_) => None,
}
#[derive(Clone)]
pub struct Executor;
impl<T> hyper::rt::Executor<T> for Executor
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
fn execute(&self, future: T) {
async_std::task::spawn(future);
}
}
/// Initializes the metrics context, and starts an HTTP server
/// to serve metrics.
pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error> {
let listener = async_std::net::TcpListener::bind(&prometheus_addr)
.await
.map_err(|_| Error::PortInUse(prometheus_addr))?;
init_prometheus_with_listener(listener, registry).await
}
/// Init prometheus using the given listener.
async fn init_prometheus_with_listener(
listener: async_std::net::TcpListener,
registry: Registry,
) -> Result<(), Error> {
use networking::Incoming;
log::info!("〽️ Prometheus exporter started at {}", listener.local_addr()?);
let service = make_service_fn(move |_| {
let registry = registry.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
request_metrics(req, registry.clone())
}))
}
}
});
async fn request_metrics(
req: Request<Body>,
registry: Registry,
) -> Result<Response<Body>, Error> {
if req.uri().path() == "/metrics" {
let metric_families = registry.gather();
let mut buffer = vec![];
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
let server = Server::builder(Incoming(listener.incoming())).executor(Executor).serve(service);
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", encoder.format_type())
.body(Body::from(buffer))
.map_err(Error::Http)
} else {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not found."))
.map_err(Error::Http)
}
}
let result = server.await.map_err(Into::into);
#[derive(Clone)]
pub struct Executor;
impl<T> hyper::rt::Executor<T> for Executor
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
fn execute(&self, future: T) {
async_std::task::spawn(future);
}
}
/// Initializes the metrics context, and starts an HTTP server
/// to serve metrics.
pub async fn init_prometheus(
prometheus_addr: SocketAddr,
registry: Registry,
) -> Result<(), Error> {
let listener = async_std::net::TcpListener::bind(&prometheus_addr)
.await
.map_err(|_| Error::PortInUse(prometheus_addr))?;
init_prometheus_with_listener(listener, registry).await
}
/// Init prometheus using the given listener.
pub(crate) async fn init_prometheus_with_listener(
listener: async_std::net::TcpListener,
registry: Registry,
) -> Result<(), Error> {
use networking::Incoming;
log::info!("〽️ Prometheus exporter started at {}", listener.local_addr()?);
let service = make_service_fn(move |_| {
let registry = registry.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
request_metrics(req, registry.clone())
}))
}
});
let server =
Server::builder(Incoming(listener.incoming())).executor(Executor).serve(service);
let result = server.await.map_err(Into::into);
result
}
result
}
#[cfg(test)]
@@ -197,7 +164,7 @@ mod tests {
)
.expect("Registers the test metric");
runtime.spawn(known_os::init_prometheus_with_listener(listener, registry));
runtime.spawn(init_prometheus_with_listener(listener, registry));
runtime.block_on(async {
let client = Client::new();