offchain: Upgrade hyper to 0.13, which uses tokio 0.2 (#4860)

* service: Don't depend on tokio-executor

Seems to be a leftover dependency that's not used anymore.

* offchain: Upgrade hyper to 0.13, which uses tokio 0.2

* offchain: Adapt HTTP tests to Tokio 0.2

* network: Don't transitively include tokio 0.2 in WASM

1) We don't specifically depend on Tokio codec impls
2) Conflating features in Cargo means that enabling Tokio runtime
in the native environment will also do so in WASM, where it's
obviously not implemented and causes a compilation error.

* grafana-data-source: Pull hyper/tokio only in native environment
This commit is contained in:
Igor Matuszewski
2020-02-18 10:54:00 +01:00
committed by GitHub
parent 0049a93af0
commit aea2e9427b
8 changed files with 84 additions and 67 deletions
+36 -35
View File
@@ -26,9 +26,9 @@
//! actively calling any function.
use crate::api::timestamp;
use bytes::Buf as _;
use bytes::buf::ext::{Reader, BufExt};
use fnv::FnvHashMap;
use futures::{prelude::*, channel::mpsc, compat::Compat01As03};
use futures::{prelude::*, future, channel::mpsc};
use log::error;
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
use std::{fmt, io::Read as _, mem, pin::Pin, task::Context, task::Poll};
@@ -50,7 +50,7 @@ pub fn http() -> (HttpApi, HttpWorker) {
let engine = HttpWorker {
to_api,
from_api,
http_client: hyper::Client::builder().build(hyper_rustls::HttpsConnector::new(1)),
http_client: hyper::Client::builder().build(hyper_rustls::HttpsConnector::new()),
requests: Vec::new(),
};
@@ -103,10 +103,10 @@ struct HttpApiRequestRp {
/// Elements extracted from the channel are first put into `current_read_chunk`.
/// If the channel produces an error, then that is translated into an `IoError` and the request
/// is removed from the list.
body: stream::Fuse<mpsc::Receiver<Result<hyper::Chunk, hyper::Error>>>,
body: stream::Fuse<mpsc::Receiver<Result<hyper::body::Bytes, hyper::Error>>>,
/// Chunk that has been extracted from the channel and that is currently being read.
/// Reading data from the response should read from this field in priority.
current_read_chunk: Option<bytes::Reader<hyper::Chunk>>,
current_read_chunk: Option<Reader<hyper::body::Bytes>>,
}
impl HttpApi {
@@ -122,7 +122,7 @@ impl HttpApi {
let (body_sender, body) = hyper::Body::channel();
let mut request = hyper::Request::new(body);
*request.method_mut() = hyper::Method::from_bytes(method.as_bytes()).map_err(|_| ())?;
*request.uri_mut() = hyper::Uri::from_shared(From::from(uri)).map_err(|_| ())?;
*request.uri_mut() = hyper::Uri::from_maybe_shared(uri.to_owned()).map_err(|_| ())?;
let new_id = self.next_id;
debug_assert!(!self.requests.contains_key(&new_id));
@@ -177,9 +177,7 @@ impl HttpApi {
// (if the body has been written), or `DeadlineReached`, or `IoError`.
// If `IoError` is returned, don't forget to remove the request from the list.
let mut poll_sender = move |sender: &mut hyper::body::Sender| -> Result<(), HttpError> {
let mut when_ready = future::maybe_done(Compat01As03::new(
futures01::future::poll_fn(|| sender.poll_ready())
));
let mut when_ready = future::maybe_done(future::poll_fn(|cx| sender.poll_ready(cx)));
futures::executor::block_on(future::select(&mut when_ready, &mut deadline));
match when_ready {
future::MaybeDone::Done(Ok(())) => {}
@@ -191,13 +189,11 @@ impl HttpApi {
}
};
match sender.send_data(hyper::Chunk::from(chunk.to_owned())) {
Ok(()) => Ok(()),
Err(_chunk) => {
futures::executor::block_on(sender.send_data(hyper::body::Bytes::from(chunk.to_owned())))
.map_err(|_| {
error!("HTTP sender refused data despite being ready");
Err(HttpError::IoError)
},
}
HttpError::IoError
})
};
loop {
@@ -538,7 +534,7 @@ enum WorkerToApi {
/// the next item.
/// Can also be used to send an error, in case an error happend on the HTTP socket. After
/// an error is sent, the channel will close.
body: mpsc::Receiver<Result<hyper::Chunk, hyper::Error>>,
body: mpsc::Receiver<Result<hyper::body::Bytes, hyper::Error>>,
},
/// A request has failed because of an error. The request is then no longer valid.
Fail {
@@ -564,13 +560,13 @@ pub struct HttpWorker {
/// HTTP request being processed by the worker.
enum HttpWorkerRequest {
/// Request has been dispatched and is waiting for a response from the Internet.
Dispatched(Compat01As03<hyper::client::ResponseFuture>),
Dispatched(hyper::client::ResponseFuture),
/// Progressively reading the body of the response and sending it to the channel.
ReadBody {
/// Body to read `Chunk`s from. Only used if the channel is ready to accept data.
body: Compat01As03<hyper::Body>,
body: hyper::Body,
/// Channel to the [`HttpApi`] where we send the chunks to.
tx: mpsc::Sender<Result<hyper::Chunk, hyper::Error>>,
tx: mpsc::Sender<Result<hyper::body::Bytes, hyper::Error>>,
},
}
@@ -608,7 +604,7 @@ impl Future for HttpWorker {
// We received a response! Decompose it into its parts.
let status_code = response.status();
let headers = mem::replace(response.headers_mut(), hyper::HeaderMap::new());
let body = Compat01As03::new(response.into_body());
let body = response.into_body();
let (body_tx, body_rx) = mpsc::channel(3);
let _ = me.to_api.unbounded_send(WorkerToApi::Response {
@@ -660,7 +656,7 @@ impl Future for HttpWorker {
Poll::Pending => {},
Poll::Ready(None) => return Poll::Ready(()), // stops the worker
Poll::Ready(Some(ApiToWorker::Dispatch { id, request })) => {
let future = Compat01As03::new(me.http_client.request(request));
let future = me.http_client.request(request);
debug_assert!(me.requests.iter().all(|(i, _)| *i != id));
me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
cx.waker().wake_by_ref(); // reschedule the task to poll the request
@@ -692,31 +688,36 @@ impl fmt::Debug for HttpWorkerRequest {
#[cfg(test)]
mod tests {
use core::convert::Infallible;
use crate::api::timestamp;
use super::http;
use futures::prelude::*;
use futures01::Future as _;
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration};
// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
// server that runs in the background as well.
macro_rules! build_api_server {
() => {{
fn tokio_run<T>(future: impl std::future::Future<Output = T>) {
let _ = tokio::runtime::Runtime::new().unwrap().block_on(future);
}
let (api, worker) = http();
// Note: we have to use tokio because hyper still uses old futures.
std::thread::spawn(move || {
tokio::run(futures::compat::Compat::new(worker.map(|()| Ok::<(), ()>(()))))
});
std::thread::spawn(move || tokio_run(worker));
let (addr_tx, addr_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let server = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap())
.serve(|| {
hyper::service::service_fn_ok(move |_: hyper::Request<hyper::Body>| {
hyper::Response::new(hyper::Body::from("Hello World!"))
})
});
let _ = addr_tx.send(server.local_addr());
hyper::rt::run(server.map_err(|e| panic!("{:?}", e)));
tokio_run(async move {
let server = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap())
.serve(hyper::service::make_service_fn(|_| { async move {
Ok::<_, Infallible>(hyper::service::service_fn(move |_req| async move {
Ok::<_, Infallible>(
hyper::Response::new(hyper::Body::from("Hello World!"))
)
}))
}}));
let _ = addr_tx.send(server.local_addr());
server.await
});
});
(api, addr_rx.recv().unwrap())
}};