diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index e9b9ca337c..305b609708 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2221,6 +2221,7 @@ dependencies = [ "httparse", "itoa", "log 0.4.8", + "net2", "pin-project", "time", "tokio 0.2.11", @@ -2230,19 +2231,19 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.17.1" +version = "0.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "719d85c7df4a7f309a77d145340a063ea929dcb2e025bae46a80345cffec2952" +checksum = "f6ea6215c7314d450ee45970ab8b3851ab447a0e6bafdd19e31b20a42dbb7faf" dependencies = [ - "bytes 0.4.12", + "bytes 0.5.4", "ct-logs", - "futures 0.1.29", - "hyper 0.12.35", + "futures-util", + "hyper 0.13.2", "rustls", - "tokio-io", + "rustls-native-certs", + "tokio 0.2.11", "tokio-rustls", "webpki", - "webpki-roots 0.17.0", ] [[package]] @@ -5475,6 +5476,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls-native-certs" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ffebdbb48c14f84eba0b715197d673aff1dd22cc1007ca647e28483bbcc307" +dependencies = [ + "openssl-probe", + "rustls", + "schannel", + "security-framework", +] + [[package]] name = "rustversion" version = "1.0.2" @@ -6243,13 +6256,12 @@ dependencies = [ name = "sc-offchain" version = "2.0.0" dependencies = [ - "bytes 0.4.12", + "bytes 0.5.4", "env_logger 0.7.1", "fnv", - "futures 0.1.29", "futures 0.3.4", "futures-timer 3.0.1", - "hyper 0.12.35", + "hyper 0.13.2", "hyper-rustls", "log 0.4.8", "num_cpus", @@ -6268,7 +6280,7 @@ dependencies = [ "sp-transaction-pool", "substrate-test-runtime-client", "threadpool", - "tokio 0.1.22", + "tokio 0.2.11", ] [[package]] @@ -6418,7 +6430,6 @@ dependencies = [ "sysinfo", "target_info", "tokio 0.2.11", - "tokio-executor 0.1.10", "tracing", "wasm-timer", ] @@ -8011,6 +8022,7 @@ checksum = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b" dependencies = [ "bytes 0.5.4", "fnv", + "iovec", "lazy_static", "libc", "memchr", @@ -8019,6 +8031,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", + "slab", "tokio-macros", "winapi 0.3.8", ] @@ -8130,15 +8143,13 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.10.3" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d7cf08f990090abd6c6a73cab46fed62f85e8aef8b99e4b918a9f4a637f0676" +checksum = "141afec0978abae6573065a48882c6bae44c5cc61db9b511ac4abf6a09bfd9cc" dependencies = [ - "bytes 0.4.12", - "futures 0.1.29", - "iovec", + "futures-core", "rustls", - "tokio-io", + "tokio 0.2.11", "webpki", ] @@ -8501,7 +8512,6 @@ checksum = "c689459fbaeb50e56c6749275f084decfd02194ac5852e6617d95d0d3cf02eaf" dependencies = [ "bytes 0.5.4", "futures_codec", - "tokio-util", ] [[package]] diff --git a/substrate/client/network/Cargo.toml b/substrate/client/network/Cargo.toml index d16f3d81c1..f2d50b988a 100644 --- a/substrate/client/network/Cargo.toml +++ b/substrate/client/network/Cargo.toml @@ -51,7 +51,7 @@ sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" } substrate-test-client = { version = "2.0.0", optional = true, path = "../../test-utils/client" } substrate-test-runtime-client = { version = "2.0.0", optional = true, path = "../../test-utils/runtime/client" } thiserror = "1" -unsigned-varint = { version = "0.3.0", features = ["codec"] } +unsigned-varint = { version = "0.3.0", features = ["futures-codec"] } void = "1.0.2" zeroize = "1.0.0" yamux = "0.4.2" diff --git a/substrate/client/offchain/Cargo.toml b/substrate/client/offchain/Cargo.toml index ed5db3f2c7..29e6dca2a7 100644 --- a/substrate/client/offchain/Cargo.toml +++ b/substrate/client/offchain/Cargo.toml @@ -7,11 +7,10 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] -bytes = "0.4.12" +bytes = "0.5" sc-client-api = { version = "2.0.0", path = "../api" } sp-api = { version = "2.0.0", path = "../../primitives/api" } fnv = "1.0.6" -futures01 = { package = "futures", version = "0.1" } futures = "0.3.1" futures-timer = "3.0.1" log = "0.4.8" @@ -27,14 +26,14 @@ sc-network = { version = "0.8", path = "../network" } sc-keystore = { version = "2.0.0", path = "../keystore" } [target.'cfg(not(target_os = "unknown"))'.dependencies] -hyper = "0.12.35" -hyper-rustls = "0.17.1" +hyper = "0.13.2" +hyper-rustls = "0.19" [dev-dependencies] sc-client-db = { version = "0.8", default-features = true, path = "../db/" } env_logger = "0.7.0" substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" } -tokio = "0.1.22" +tokio = "0.2" sc-transaction-pool = { version = "2.0.0", path = "../../client/transaction-pool" } sp-transaction-pool = { version = "2.0.0", path = "../../primitives/transaction-pool" } diff --git a/substrate/client/offchain/src/api/http.rs b/substrate/client/offchain/src/api/http.rs index 7aa0963fcf..0483f84e2f 100644 --- a/substrate/client/offchain/src/api/http.rs +++ b/substrate/client/offchain/src/api/http.rs @@ -26,9 +26,9 @@ //! actively calling any function. use crate::api::timestamp; -use bytes::Buf as _; +use bytes::buf::ext::{Reader, BufExt}; use fnv::FnvHashMap; -use futures::{prelude::*, channel::mpsc, compat::Compat01As03}; +use futures::{prelude::*, future, channel::mpsc}; use log::error; use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError}; use std::{fmt, io::Read as _, mem, pin::Pin, task::Context, task::Poll}; @@ -50,7 +50,7 @@ pub fn http() -> (HttpApi, HttpWorker) { let engine = HttpWorker { to_api, from_api, - http_client: hyper::Client::builder().build(hyper_rustls::HttpsConnector::new(1)), + http_client: hyper::Client::builder().build(hyper_rustls::HttpsConnector::new()), requests: Vec::new(), }; @@ -103,10 +103,10 @@ struct HttpApiRequestRp { /// Elements extracted from the channel are first put into `current_read_chunk`. /// If the channel produces an error, then that is translated into an `IoError` and the request /// is removed from the list. - body: stream::Fuse>>, + body: stream::Fuse>>, /// Chunk that has been extracted from the channel and that is currently being read. /// Reading data from the response should read from this field in priority. - current_read_chunk: Option>, + current_read_chunk: Option>, } impl HttpApi { @@ -122,7 +122,7 @@ impl HttpApi { let (body_sender, body) = hyper::Body::channel(); let mut request = hyper::Request::new(body); *request.method_mut() = hyper::Method::from_bytes(method.as_bytes()).map_err(|_| ())?; - *request.uri_mut() = hyper::Uri::from_shared(From::from(uri)).map_err(|_| ())?; + *request.uri_mut() = hyper::Uri::from_maybe_shared(uri.to_owned()).map_err(|_| ())?; let new_id = self.next_id; debug_assert!(!self.requests.contains_key(&new_id)); @@ -177,9 +177,7 @@ impl HttpApi { // (if the body has been written), or `DeadlineReached`, or `IoError`. // If `IoError` is returned, don't forget to remove the request from the list. let mut poll_sender = move |sender: &mut hyper::body::Sender| -> Result<(), HttpError> { - let mut when_ready = future::maybe_done(Compat01As03::new( - futures01::future::poll_fn(|| sender.poll_ready()) - )); + let mut when_ready = future::maybe_done(future::poll_fn(|cx| sender.poll_ready(cx))); futures::executor::block_on(future::select(&mut when_ready, &mut deadline)); match when_ready { future::MaybeDone::Done(Ok(())) => {} @@ -191,13 +189,11 @@ impl HttpApi { } }; - match sender.send_data(hyper::Chunk::from(chunk.to_owned())) { - Ok(()) => Ok(()), - Err(_chunk) => { + futures::executor::block_on(sender.send_data(hyper::body::Bytes::from(chunk.to_owned()))) + .map_err(|_| { error!("HTTP sender refused data despite being ready"); - Err(HttpError::IoError) - }, - } + HttpError::IoError + }) }; loop { @@ -538,7 +534,7 @@ enum WorkerToApi { /// the next item. /// Can also be used to send an error, in case an error happend on the HTTP socket. After /// an error is sent, the channel will close. - body: mpsc::Receiver>, + body: mpsc::Receiver>, }, /// A request has failed because of an error. The request is then no longer valid. Fail { @@ -564,13 +560,13 @@ pub struct HttpWorker { /// HTTP request being processed by the worker. enum HttpWorkerRequest { /// Request has been dispatched and is waiting for a response from the Internet. - Dispatched(Compat01As03), + Dispatched(hyper::client::ResponseFuture), /// Progressively reading the body of the response and sending it to the channel. ReadBody { /// Body to read `Chunk`s from. Only used if the channel is ready to accept data. - body: Compat01As03, + body: hyper::Body, /// Channel to the [`HttpApi`] where we send the chunks to. - tx: mpsc::Sender>, + tx: mpsc::Sender>, }, } @@ -608,7 +604,7 @@ impl Future for HttpWorker { // We received a response! Decompose it into its parts. let status_code = response.status(); let headers = mem::replace(response.headers_mut(), hyper::HeaderMap::new()); - let body = Compat01As03::new(response.into_body()); + let body = response.into_body(); let (body_tx, body_rx) = mpsc::channel(3); let _ = me.to_api.unbounded_send(WorkerToApi::Response { @@ -660,7 +656,7 @@ impl Future for HttpWorker { Poll::Pending => {}, Poll::Ready(None) => return Poll::Ready(()), // stops the worker Poll::Ready(Some(ApiToWorker::Dispatch { id, request })) => { - let future = Compat01As03::new(me.http_client.request(request)); + let future = me.http_client.request(request); debug_assert!(me.requests.iter().all(|(i, _)| *i != id)); me.requests.push((id, HttpWorkerRequest::Dispatched(future))); cx.waker().wake_by_ref(); // reschedule the task to poll the request @@ -692,31 +688,36 @@ impl fmt::Debug for HttpWorkerRequest { #[cfg(test)] mod tests { + use core::convert::Infallible; use crate::api::timestamp; use super::http; - use futures::prelude::*; - use futures01::Future as _; use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration}; // Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP // server that runs in the background as well. macro_rules! build_api_server { () => {{ + fn tokio_run(future: impl std::future::Future) { + let _ = tokio::runtime::Runtime::new().unwrap().block_on(future); + } + let (api, worker) = http(); - // Note: we have to use tokio because hyper still uses old futures. - std::thread::spawn(move || { - tokio::run(futures::compat::Compat::new(worker.map(|()| Ok::<(), ()>(())))) - }); + std::thread::spawn(move || tokio_run(worker)); + let (addr_tx, addr_rx) = std::sync::mpsc::channel(); std::thread::spawn(move || { - let server = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap()) - .serve(|| { - hyper::service::service_fn_ok(move |_: hyper::Request| { - hyper::Response::new(hyper::Body::from("Hello World!")) - }) - }); - let _ = addr_tx.send(server.local_addr()); - hyper::rt::run(server.map_err(|e| panic!("{:?}", e))); + tokio_run(async move { + let server = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap()) + .serve(hyper::service::make_service_fn(|_| { async move { + Ok::<_, Infallible>(hyper::service::service_fn(move |_req| async move { + Ok::<_, Infallible>( + hyper::Response::new(hyper::Body::from("Hello World!")) + ) + })) + }})); + let _ = addr_tx.send(server.local_addr()); + server.await + }); }); (api, addr_rx.recv().unwrap()) }}; diff --git a/substrate/client/service/Cargo.toml b/substrate/client/service/Cargo.toml index 7f7ef80b16..1e781aa695 100644 --- a/substrate/client/service/Cargo.toml +++ b/substrate/client/service/Cargo.toml @@ -23,7 +23,6 @@ parking_lot = "0.10.0" lazy_static = "1.4.0" log = "0.4.8" slog = { version = "2.5.2", features = ["nested-values"] } -tokio-executor = "0.1.8" futures-timer = "3.0.1" wasm-timer = "0.2" exit-future = "0.2.0" diff --git a/substrate/utils/grafana-data-source/Cargo.toml b/substrate/utils/grafana-data-source/Cargo.toml index e1bd037009..c49ee963f0 100644 --- a/substrate/utils/grafana-data-source/Cargo.toml +++ b/substrate/utils/grafana-data-source/Cargo.toml @@ -8,8 +8,6 @@ edition = "2018" [dependencies] log = "0.4.8" -hyper = { version = "0.13.1", default-features = false, features = ["stream"] } -tokio = "0.2" futures-util = { version = "0.3.1", default-features = false, features = ["io"] } serde_json = "1" serde = { version = "1", features = ["derive"] } @@ -21,3 +19,5 @@ derive_more = "0.99" [target.'cfg(not(target_os = "unknown"))'.dependencies] async-std = { version = "1.0.1", features = ["unstable"] } +hyper = { version = "0.13.1", default-features = false, features = ["stream"] } +tokio = "0.2" diff --git a/substrate/utils/grafana-data-source/src/lib.rs b/substrate/utils/grafana-data-source/src/lib.rs index fbba064706..bc40fc39bb 100644 --- a/substrate/utils/grafana-data-source/src/lib.rs +++ b/substrate/utils/grafana-data-source/src/lib.rs @@ -72,11 +72,13 @@ pub fn record_metrics_slice(metrics: &[(&str, f32)]) -> Result<(), Error> { #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { /// Hyper internal error. + #[cfg(not(target_os = "unknown"))] Hyper(hyper::Error), + /// Http request error. + #[cfg(not(target_os = "unknown"))] + Http(hyper::http::Error), /// Serialization/deserialization error. Serde(serde_json::Error), - /// Http request error. - Http(hyper::http::Error), /// Timestamp error. Timestamp(TryFromIntError), /// i/o error. @@ -86,9 +88,11 @@ pub enum Error { impl std::error::Error for Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { + #[cfg(not(target_os = "unknown"))] Error::Hyper(error) => Some(error), - Error::Serde(error) => Some(error), + #[cfg(not(target_os = "unknown"))] Error::Http(error) => Some(error), + Error::Serde(error) => Some(error), Error::Timestamp(error) => Some(error), Error::Io(error) => Some(error) } diff --git a/substrate/utils/grafana-data-source/src/server.rs b/substrate/utils/grafana-data-source/src/server.rs index 8ef5e03784..f2f06f7688 100644 --- a/substrate/utils/grafana-data-source/src/server.rs +++ b/substrate/utils/grafana-data-source/src/server.rs @@ -15,12 +15,15 @@ // along with Substrate. If not, see . use serde::{Serialize, de::DeserializeOwned}; -use hyper::{Body, Request, Response, header, service::{service_fn, make_service_fn}, Server}; use chrono::{Duration, Utc}; use futures_util::{FutureExt, TryStreamExt, future::{Future, select, Either}}; use futures_timer::Delay; use crate::{DATABASE, Error, types::{Target, Query, TimeseriesData, Range}}; +#[cfg(not(target_os = "unknown"))] +use hyper::{Body, Request, Response, header, service::{service_fn, make_service_fn}, Server}; + +#[cfg(not(target_os = "unknown"))] async fn api_response(req: Request) -> Result, Error> { match req.uri().path() { "/search" => { @@ -57,6 +60,7 @@ async fn api_response(req: Request) -> Result, Error> { } } +#[cfg(not(target_os = "unknown"))] async fn map_request_to_response(req: Request, transformation: T) -> Result, Error> where Req: DeserializeOwned,