From 7df089241c599bc8e363179053524a3dced2c423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 24 Aug 2019 06:54:14 +0200 Subject: [PATCH] Implement HTTP request in offchain workers (#3461) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implement HTTP request in offchain workers * Bump impl_version * Don't compile offchain workers for WASM anymore * Initialize HttpConnector as a fallback. * Apply review suggestions 😳 --- substrate/.gitlab-ci.yml | 3 +- substrate/Cargo.lock | 21 + substrate/core/offchain/Cargo.toml | 8 + substrate/core/offchain/src/api.rs | 91 +- substrate/core/offchain/src/api/http.rs | 990 +++++++++++++++++++ substrate/core/offchain/src/api/timestamp.rs | 60 ++ substrate/core/offchain/src/testing.rs | 2 +- substrate/core/primitives/src/offchain.rs | 85 +- substrate/core/sr-io/src/offchain/http.rs | 6 +- substrate/core/sr-io/without_std.rs | 2 +- 10 files changed, 1194 insertions(+), 74 deletions(-) create mode 100644 substrate/core/offchain/src/api/http.rs create mode 100644 substrate/core/offchain/src/api/timestamp.rs diff --git a/substrate/.gitlab-ci.yml b/substrate/.gitlab-ci.yml index 7889b52afa..872f916f4c 100644 --- a/substrate/.gitlab-ci.yml +++ b/substrate/.gitlab-ci.yml @@ -202,7 +202,6 @@ check-web-wasm: - time cargo web build -p substrate-keystore - time cargo web build -p substrate-executor - time cargo web build -p substrate-network - - time cargo web build -p substrate-offchain - time cargo web build -p substrate-panic-handler - time cargo web build -p substrate-peerset - time cargo web build -p substrate-primitives @@ -336,7 +335,7 @@ check_warnings: - docker push $CONTAINER_IMAGE:$VERSION - docker push $CONTAINER_IMAGE:latest -publish-docker-substrate: +publish-docker-substrate: stage: publish <<: *publish-docker-release # collect VERSION artifact here to pass it on to kubernetes diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index f69b2f4063..2967d167da 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1322,6 +1322,18 @@ dependencies = [ "want 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hyper-tls" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.12.33 (registry+https://github.com/rust-lang/crates.io-index)", + "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "idna" version = "0.1.5" @@ -4866,11 +4878,18 @@ dependencies = [ name = "substrate-offchain" version = "2.0.0" dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.12.33 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 2.0.0", "substrate-client 2.0.0", "substrate-client-db 2.0.0", @@ -4880,6 +4899,7 @@ dependencies = [ "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", "substrate-transaction-pool 2.0.0", + "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -6366,6 +6386,7 @@ dependencies = [ "checksum humantime 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3ca7e5f2e110db35f93b837c81797f3714500b81d517bf20c431b16d3ca4f114" "checksum hyper 0.10.16 (registry+https://github.com/rust-lang/crates.io-index)" = "0a0652d9a2609a968c14be1a9ea00bf4b1d64e2e1f53a1b51b6fff3a6e829273" "checksum hyper 0.12.33 (registry+https://github.com/rust-lang/crates.io-index)" = "7cb44cbce9d8ee4fb36e4c0ad7b794ac44ebaad924b9c8291a63215bb44c2c8f" +"checksum hyper-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3a800d6aa50af4b5850b2b0f659625ce9504df908e9733b635720483be26174f" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum impl-codec 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "78c441b3d2b5e24b407161e76d482b7bbd29b5da357707839ac40d95152f031f" "checksum impl-serde 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5158079de9d4158e0ce1de3ae0bd7be03904efc40b3d7dd8b8c301cbf6b52b56" diff --git a/substrate/core/offchain/Cargo.toml b/substrate/core/offchain/Cargo.toml index 4c8891eb6b..691fdea91c 100644 --- a/substrate/core/offchain/Cargo.toml +++ b/substrate/core/offchain/Cargo.toml @@ -7,13 +7,20 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] +bytes = "0.4" client = { package = "substrate-client", path = "../../core/client" } +fnv = "1.0" +futures01 = { package = "futures", version = "0.1" } futures-preview = "=0.3.0-alpha.17" +futures-timer = "0.2.1" +hyper = "0.12.33" +hyper-tls = "0.3.2" log = "0.4" offchain-primitives = { package = "substrate-offchain-primitives", path = "./primitives" } codec = { package = "parity-scale-codec", version = "1.0.0", features = ["derive"] } parking_lot = "0.9.0" primitives = { package = "substrate-primitives", path = "../../core/primitives" } +rand = "0.7" sr-primitives = { path = "../../core/sr-primitives" } transaction_pool = { package = "substrate-transaction-pool", path = "../../core/transaction-pool" } network = { package = "substrate-network", path = "../../core/network" } @@ -23,6 +30,7 @@ keystore = { package = "substrate-keystore", path = "../keystore" } env_logger = "0.6" client-db = { package = "substrate-client-db", path = "../../core/client/db/", default-features = true } test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } +tokio = "0.1" [features] default = [] diff --git a/substrate/core/offchain/src/api.rs b/substrate/core/offchain/src/api.rs index 225e7c3f72..0057dfd273 100644 --- a/substrate/core/offchain/src/api.rs +++ b/substrate/core/offchain/src/api.rs @@ -17,13 +17,12 @@ use std::{ str::FromStr, sync::Arc, - convert::{TryFrom, TryInto}, - time::{SystemTime, Duration}, + convert::TryFrom, thread::sleep, }; use client::backend::OffchainStorage; -use futures::{StreamExt as _, Future, future, channel::mpsc}; +use futures::{StreamExt as _, Future, FutureExt as _, future, channel::mpsc}; use log::{info, debug, warn, error}; use network::{PeerId, Multiaddr, NetworkStateInfo}; use codec::{Encode, Decode}; @@ -34,6 +33,9 @@ use primitives::offchain::{ use sr_primitives::{generic::BlockId, traits::{self, Extrinsic}}; use transaction_pool::txpool::{Pool, ChainApi}; +mod http; +mod timestamp; + /// A message between the offchain extension and the processing thread. enum ExtMessage { SubmitExtrinsic(Vec), @@ -49,6 +51,8 @@ pub(crate) struct Api { _at: BlockId, /// Is this node a potential validator? is_validator: bool, + /// Everything HTTP-related is handled by a different struct. + http: http::HttpApi, } fn unavailable_yet(name: &str) -> R { @@ -89,29 +93,11 @@ where } fn timestamp(&mut self) -> Timestamp { - let now = SystemTime::now(); - let epoch_duration = now.duration_since(SystemTime::UNIX_EPOCH); - match epoch_duration { - Err(_) => { - // Current time is earlier than UNIX_EPOCH. - Timestamp::from_unix_millis(0) - }, - Ok(d) => { - let duration = d.as_millis(); - // Assuming overflow won't happen for a few hundred years. - Timestamp::from_unix_millis(duration.try_into() - .expect("epoch milliseconds won't overflow u64 for hundreds of years; qed")) - } - } + timestamp::now() } fn sleep_until(&mut self, deadline: Timestamp) { - // Get current timestamp. - let now = self.timestamp(); - // Calculate the diff with the deadline. - let diff = deadline.diff(&now); - // Call thread::sleep for the diff duration. - sleep(Duration::from_millis(diff.millis())); + sleep(timestamp::timestamp_from_now(deadline)); } fn random_seed(&mut self) -> [u8; 32] { @@ -149,58 +135,53 @@ where fn http_request_start( &mut self, - _method: &str, - _uri: &str, + method: &str, + uri: &str, _meta: &[u8] ) -> Result { - unavailable_yet::<()>("http_request_start"); - Err(()) + self.http.request_start(method, uri) } fn http_request_add_header( &mut self, - _request_id: HttpRequestId, - _name: &str, - _value: &str + request_id: HttpRequestId, + name: &str, + value: &str ) -> Result<(), ()> { - unavailable_yet::<()>("http_request_add_header"); - Err(()) + self.http.request_add_header(request_id, name, value) } fn http_request_write_body( &mut self, - _request_id: HttpRequestId, - _chunk: &[u8], - _deadline: Option + request_id: HttpRequestId, + chunk: &[u8], + deadline: Option ) -> Result<(), HttpError> { - unavailable_yet::<()>("http_request_write_body"); - Err(HttpError::IoError) + self.http.request_write_body(request_id, chunk, deadline) } fn http_response_wait( &mut self, ids: &[HttpRequestId], - _deadline: Option + deadline: Option ) -> Vec { - unavailable_yet::<()>("http_response_wait"); - ids.iter().map(|_| HttpRequestStatus::Unknown).collect() + self.http.response_wait(ids, deadline) } fn http_response_headers( &mut self, - _request_id: HttpRequestId + request_id: HttpRequestId ) -> Vec<(Vec, Vec)> { - unavailable_yet("http_response_headers") + self.http.response_headers(request_id) } fn http_response_read_body( &mut self, - _request_id: HttpRequestId, - _buffer: &mut [u8], - _deadline: Option + request_id: HttpRequestId, + buffer: &mut [u8], + deadline: Option ) -> Result { - unavailable_yet::<()>("http_response_read_body"); - Err(HttpError::IoError) + self.http.response_read_body(request_id, buffer, deadline) } } @@ -276,6 +257,8 @@ pub(crate) struct AsyncApi { receiver: Option>, transaction_pool: Arc>, at: BlockId, + /// Everything HTTP-related is handled by a different struct. + http: Option, } impl AsyncApi { @@ -289,18 +272,22 @@ impl AsyncApi { ) -> (Api, AsyncApi) { let (sender, rx) = mpsc::unbounded(); + let (http_api, http_worker) = http::http(); + let api = Api { sender, db, network_state, _at: at, is_validator, + http: http_api, }; let async_api = AsyncApi { receiver: Some(rx), transaction_pool, at, + http: Some(http_worker), }; (api, async_api) @@ -309,13 +296,17 @@ impl AsyncApi { /// Run a processing task for the API pub fn process(mut self) -> impl Future { let receiver = self.receiver.take().expect("Take invoked only once."); + let http = self.http.take().expect("Take invoked only once."); - receiver.for_each(move |msg| { + let extrinsics = receiver.for_each(move |msg| { match msg { ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext), } future::ready(()) - }) + }); + + future::join(extrinsics, http) + .map(|((), ())| ()) } fn submit_extrinsic(&mut self, ext: Vec) { @@ -340,7 +331,7 @@ impl AsyncApi { #[cfg(test)] mod tests { use super::*; - use std::convert::TryFrom; + use std::{convert::{TryFrom, TryInto}, time::SystemTime}; use sr_primitives::traits::Zero; use client_db::offchain::LocalStorage; use network::PeerId; diff --git a/substrate/core/offchain/src/api/http.rs b/substrate/core/offchain/src/api/http.rs new file mode 100644 index 0000000000..287e3f99be --- /dev/null +++ b/substrate/core/offchain/src/api/http.rs @@ -0,0 +1,990 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! This module is composed of two structs: [`HttpApi`] and [`HttpWorker`]. Calling the [`http`] +//! function returns a pair of [`HttpApi`] and [`HttpWorker`] that share some state. +//! +//! The [`HttpApi`] is (indirectly) passed to the runtime when calling an offchain worker, while +//! the [`HttpWorker`] must be processed in the background. The [`HttpApi`] mimicks the API of the +//! HTTP-related methods available to offchain workers. +//! +//! The reason for this design is driven by the fact that HTTP requests should continue running +//! (i.e.: the socket should continue being processed) in the background even if the runtime isn't +//! actively calling any function. + +use crate::api::timestamp; +use bytes::Buf as _; +use fnv::FnvHashMap; +use futures::{prelude::*, channel::mpsc, compat::Compat01As03}; +use log::{warn, error}; +use primitives::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError}; +use std::{fmt, io::Read as _, mem, pin::Pin, task::Context, task::Poll}; + +/// Creates a pair of [`HttpApi`] and [`HttpWorker`]. +pub fn http() -> (HttpApi, HttpWorker) { + let (to_worker, from_api) = mpsc::unbounded(); + let (to_api, from_worker) = mpsc::unbounded(); + + let api = HttpApi { + to_worker, + from_worker: from_worker.fuse(), + // We start with a random ID for the first HTTP request, to prevent mischievous people from + // writing runtime code with hardcoded IDs. + next_id: HttpRequestId(rand::random::() % 2000), + requests: FnvHashMap::default(), + }; + + let engine = HttpWorker { + to_api, + from_api, + // TODO: don't unwrap; we should fall back to the HttpConnector if we fail to create the + // Https one; there doesn't seem to be any built-in way to do this + http_client: HyperClient::new(), + requests: Vec::new(), + }; + + (api, engine) +} + +/// Provides HTTP capabilities. +/// +/// Since this struct is a helper for offchain workers, its API is mimicking the API provided +/// to offchain workers. +pub struct HttpApi { + /// Used to sends messages to the worker. + to_worker: mpsc::UnboundedSender, + /// Used to receive messages from the worker. + /// We use a `Fuse` in order to have an extra protection against panicking. + from_worker: stream::Fuse>, + /// Id to assign to the next HTTP request that is started. + next_id: HttpRequestId, + /// List of HTTP requests in preparation or in progress. + requests: FnvHashMap, +} + +/// One active request within `HttpApi`. +enum HttpApiRequest { + /// The request object is being constructed locally and not started yet. + NotDispatched(hyper::Request, hyper::body::Sender), + /// The request has been dispatched and we're in the process of sending out the body (if the + /// field is `Some`) or waiting for a response (if the field is `None`). + Dispatched(Option), + /// Received a response. + Response(HttpApiRequestRp), + /// A request has been dispatched then produced an error. + Fail(hyper::Error), +} + +/// A request within `HttpApi` that has received a response. +struct HttpApiRequestRp { + /// We might still be writing the request's body when the response comes. + /// This field allows to continue writing that body. + sending_body: Option, + /// Status code of the response. + status_code: hyper::StatusCode, + /// Headers of the response. + headers: hyper::HeaderMap, + /// Body of the response, as a channel of `Chunk` objects. + /// While the code is designed to drop the `Receiver` once it ends, we wrap it within a + /// `Fuse` in order to be extra precautious about panics. + body: stream::Fuse>>, + /// Chunk that has been extracted from the channel and that is currently being read. + /// Reading data from the response should read from this field in priority. + current_read_chunk: Option>, +} + +impl HttpApi { + /// Mimicks the corresponding method in the offchain API. + pub fn request_start( + &mut self, + method: &str, + uri: &str + ) -> Result { + // Start by building the prototype of the request. + // We do this first so that we don't touch anything in `self` if building the prototype + // fails. + let (body_sender, body) = hyper::Body::channel(); + let mut request = hyper::Request::new(body); + *request.method_mut() = hyper::Method::from_bytes(method.as_bytes()).map_err(|_| ())?; + *request.uri_mut() = hyper::Uri::from_shared(From::from(uri)).map_err(|_| ())?; + + let new_id = self.next_id; + debug_assert!(!self.requests.contains_key(&new_id)); + match self.next_id.0.checked_add(1) { + Some(new_id) => self.next_id.0 = new_id, + None => { + error!("Overflow in offchain worker HTTP request ID assignment"); + return Err(()); + } + }; + self.requests.insert(new_id, HttpApiRequest::NotDispatched(request, body_sender)); + + Ok(new_id) + } + + /// Mimicks the corresponding method in the offchain API. + pub fn request_add_header( + &mut self, + request_id: HttpRequestId, + name: &str, + value: &str + ) -> Result<(), ()> { + let request = match self.requests.get_mut(&request_id) { + Some(&mut HttpApiRequest::NotDispatched(ref mut rq, _)) => rq, + _ => return Err(()) + }; + + let name = hyper::header::HeaderName::from_bytes(name.as_bytes()).map_err(|_| ())?; + let value = hyper::header::HeaderValue::from_str(value).map_err(|_| ())?; + // Note that we're always appending headers and never replacing old values. + // We assume here that the user knows what they're doing. + request.headers_mut().append(name, value); + Ok(()) + } + + /// Mimicks the corresponding method in the offchain API. + pub fn request_write_body( + &mut self, + request_id: HttpRequestId, + chunk: &[u8], + deadline: Option + ) -> Result<(), HttpError> { + // Extract the request from the list. + // Don't forget to add it back if necessary when returning. + let mut request = match self.requests.remove(&request_id) { + None => return Err(HttpError::Invalid), + Some(r) => r, + }; + + let mut deadline = timestamp::deadline_to_future(deadline); + // Closure that writes data to a sender, taking the deadline into account. + // If `IoError` is returned, don't forget to destroy the request. + let mut poll_sender = move |sender: &mut hyper::body::Sender| -> Result<(), HttpError> { + let mut when_ready = future::maybe_done(Compat01As03::new( + futures01::future::poll_fn(|| sender.poll_ready()) + )); + futures::executor::block_on(future::select(&mut when_ready, &mut deadline)); + match when_ready { + future::MaybeDone::Done(Ok(())) => {} + future::MaybeDone::Done(Err(_)) => return Err(HttpError::IoError), + future::MaybeDone::Future(_) | + future::MaybeDone::Gone => { + debug_assert!(if let future::MaybeDone::Done(_) = deadline { true } else { false }); + return Err(HttpError::DeadlineReached) + } + }; + + match sender.send_data(hyper::Chunk::from(chunk.to_owned())) { + Ok(()) => Ok(()), + Err(_chunk) => { + error!("HTTP sender refused data despite being ready"); + Err(HttpError::IoError) + }, + } + }; + + loop { + request = match request { + HttpApiRequest::NotDispatched(request, sender) => { + // If the request is not dispatched yet, dispatch it and loop again. + let _ = self.to_worker.unbounded_send(ApiToWorker::Dispatch { + id: request_id, + request + }); + HttpApiRequest::Dispatched(Some(sender)) + } + + HttpApiRequest::Dispatched(Some(mut sender)) => + if !chunk.is_empty() { + match poll_sender(&mut sender) { + Err(HttpError::IoError) => return Err(HttpError::IoError), + other => { + self.requests.insert( + request_id, + HttpApiRequest::Dispatched(Some(sender)) + ); + return other + } + } + } else { + // Dropping the sender to finish writing. + self.requests.insert(request_id, HttpApiRequest::Dispatched(None)); + return Ok(()) + } + + HttpApiRequest::Response(mut response @ HttpApiRequestRp { sending_body: Some(_), .. }) => + if !chunk.is_empty() { + match poll_sender(response.sending_body.as_mut() + .expect("Can only enter this match branch if Some; qed")) { + Err(HttpError::IoError) => return Err(HttpError::IoError), + other => { + self.requests.insert(request_id, HttpApiRequest::Response(response)); + return other + } + } + + } else { + // Dropping the sender to finish writing. + self.requests.insert(request_id, HttpApiRequest::Response(HttpApiRequestRp { + sending_body: None, + ..response + })); + return Ok(()) + } + + HttpApiRequest::Fail(_) => + // If the request has already failed, return without putting back the request + // in the list. + return Err(HttpError::IoError), + + v @ HttpApiRequest::Dispatched(None) | + v @ HttpApiRequest::Response(HttpApiRequestRp { sending_body: None, .. }) => { + // We have already finished sending this body. + self.requests.insert(request_id, v); + return Err(HttpError::Invalid) + } + } + } + } + + /// Mimicks the corresponding method in the offchain API. + pub fn response_wait( + &mut self, + ids: &[HttpRequestId], + deadline: Option + ) -> Vec { + // First of all, dispatch all the non-dispatched requests and drop all senders so that the + // user can't write anymore data. + for id in ids { + match self.requests.get_mut(id) { + Some(HttpApiRequest::NotDispatched(_, _)) => {} + Some(HttpApiRequest::Dispatched(sending_body)) | + Some(HttpApiRequest::Response(HttpApiRequestRp { sending_body, .. })) => { + let _ = sending_body.take(); + continue + } + _ => continue + }; + + let (request, _sender) = match self.requests.remove(id) { + Some(HttpApiRequest::NotDispatched(rq, s)) => (rq, s), + _ => unreachable!("we checked for NotDispatched above; qed") + }; + + let _ = self.to_worker.unbounded_send(ApiToWorker::Dispatch { + id: *id, + request + }); + + // We also destroy the sender in order to forbid writing more data. + self.requests.insert(*id, HttpApiRequest::Dispatched(None)); + } + + let mut deadline = timestamp::deadline_to_future(deadline); + + loop { + // Within that loop, first try to see if we have all the elements for a response. + { + let mut output = Vec::with_capacity(ids.len()); + let mut must_wait_more = false; + for id in ids { + output.push(match self.requests.get_mut(id) { + None => HttpRequestStatus::Invalid, + Some(HttpApiRequest::NotDispatched(_, _)) => + unreachable!("we replaced all the NotDispatched with Dispatched earlier; qed"), + Some(HttpApiRequest::Dispatched(_)) => { + must_wait_more = true; + HttpRequestStatus::DeadlineReached + }, + Some(HttpApiRequest::Fail(_)) => HttpRequestStatus::IoError, + Some(HttpApiRequest::Response(HttpApiRequestRp { status_code, .. })) => + HttpRequestStatus::Finished(status_code.as_u16()), + }); + } + debug_assert_eq!(output.len(), ids.len()); + + // Are we ready to call `return`? + let is_done = if let future::MaybeDone::Done(_) = deadline { + true + } else if !must_wait_more { + true + } else { + false + }; + + if is_done { + // Requests in "fail" mode are purged before returning. + debug_assert_eq!(output.len(), ids.len()); + for n in (0..ids.len()).rev() { + if let HttpRequestStatus::IoError = output[n] { + self.requests.remove(&ids[n]); + } + } + return output + } + } + + // Grab next message, or call `continue` if deadline is reached. + let next_message = { + let mut next_msg = future::maybe_done(self.from_worker.next()); + futures::executor::block_on(future::select(&mut next_msg, &mut deadline)); + if let future::MaybeDone::Done(msg) = next_msg { + msg + } else { + debug_assert!(if let future::MaybeDone::Done(_) = deadline { true } else { false }); + continue + } + }; + + // Update internal state based on received message. + match next_message { + Some(WorkerToApi::Response { id, status_code, headers, body }) => + match self.requests.remove(&id) { + Some(HttpApiRequest::Dispatched(sending_body)) => { + self.requests.insert(id, HttpApiRequest::Response(HttpApiRequestRp { + sending_body, + status_code, + headers, + body: body.fuse(), + current_read_chunk: None, + })); + } + None => {} // can happen if we detected an IO error when sending the body + _ => error!("State mismatch between the API and worker"), + } + + Some(WorkerToApi::Fail { id, error }) => + match self.requests.remove(&id) { + Some(HttpApiRequest::Dispatched(_)) => { + self.requests.insert(id, HttpApiRequest::Fail(error)); + } + None => {} // can happen if we detected an IO error when sending the body + _ => error!("State mismatch between the API and worker"), + } + + None => { + error!("Worker has crashed"); + return ids.iter().map(|_| HttpRequestStatus::IoError).collect() + } + } + + } + } + + /// Mimicks the corresponding method in the offchain API. + pub fn response_headers( + &mut self, + request_id: HttpRequestId + ) -> Vec<(Vec, Vec)> { + // Do an implicit non-blocking wait on the request. + let _ = self.response_wait(&[request_id], Some(timestamp::now())); + + let headers = match self.requests.get(&request_id) { + Some(HttpApiRequest::Response(HttpApiRequestRp { headers, .. })) => headers, + _ => return Vec::new() + }; + + headers + .iter() + .map(|(name, value)| (name.as_str().as_bytes().to_owned(), value.as_bytes().to_owned())) + .collect() + } + + /// Mimicks the corresponding method in the offchain API. + pub fn response_read_body( + &mut self, + request_id: HttpRequestId, + buffer: &mut [u8], + deadline: Option + ) -> Result { + // Do an implicit non-blocking wait on the request. + let _ = self.response_wait(&[request_id], deadline); + + // Remove the request from the list and handle situations where the request is invalid or + // in the wrong state. + let mut response = match self.requests.remove(&request_id) { + Some(HttpApiRequest::Response(r)) => r, + // Because we called `response_wait` above, we know that the deadline has been reached + // and we still haven't received a response. + Some(HttpApiRequest::Dispatched(_)) => return Err(HttpError::DeadlineReached), + // The request has failed. + Some(HttpApiRequest::Fail { .. }) => + return Err(HttpError::IoError), + // Request hasn't been dispatched yet; reading the body is invalid. + Some(rq) => { + self.requests.insert(request_id, rq); + return Err(HttpError::Invalid) + } + None => return Err(HttpError::Invalid) + }; + + // Convert the deadline into a `Future` that resolves when the deadline is reached. + let mut deadline = future::maybe_done(match deadline { + Some(deadline) => future::Either::Left( + futures_timer::Delay::new(timestamp::timestamp_from_now(deadline)) + ), + None => future::Either::Right(future::pending()) + }); + + loop { + // First read from `current_read_chunk`. + if let Some(mut current_read_chunk) = response.current_read_chunk.take() { + match current_read_chunk.read(buffer) { + Ok(0) => {} + Ok(n) => { + self.requests.insert(request_id, HttpApiRequest::Response(HttpApiRequestRp { + current_read_chunk: Some(current_read_chunk), + .. response + })); + return Ok(n) + }, + Err(err) => { + // This code should never be reached unless there's a logic error somewhere. + error!("Failed to read from current read chunk: {:?}", err); + return Err(HttpError::IoError) + } + } + } + + // If we reach here, that means the `current_read_chunk` is empty and needs to be + // filled with a new chunk from `body`. We block on either the next body or the + // deadline. + let mut next_body = future::maybe_done(response.body.next()); + futures::executor::block_on(future::select(&mut next_body, &mut deadline)); + + if let future::MaybeDone::Done(next_body) = next_body { + match next_body { + Some(Ok(chunk)) => response.current_read_chunk = Some(chunk.reader()), + Some(Err(_)) => return Err(HttpError::IoError), + None => return Ok(0), // eof + } + } + + if let future::MaybeDone::Done(_) = deadline { + self.requests.insert(request_id, HttpApiRequest::Response(response)); + return Err(HttpError::DeadlineReached) + } + } + } +} + +impl fmt::Debug for HttpApi { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_list() + .entries(self.requests.iter()) + .finish() + } +} + +impl fmt::Debug for HttpApiRequest { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + HttpApiRequest::NotDispatched(_, _) => + f.debug_tuple("HttpApiRequest::NotDispatched").finish(), + HttpApiRequest::Dispatched(_) => + f.debug_tuple("HttpApiRequest::Dispatched").finish(), + HttpApiRequest::Response(HttpApiRequestRp { status_code, headers, .. }) => + f.debug_tuple("HttpApiRequest::Response").field(status_code).field(headers).finish(), + HttpApiRequest::Fail(err) => + f.debug_tuple("HttpApiRequest::Fail").field(err).finish(), + } + } +} + +/// Message send from the API to the worker. +enum ApiToWorker { + /// Dispatches a new HTTP request. + Dispatch { + /// ID to send back when the response comes back. + id: HttpRequestId, + /// Request to start executing. + request: hyper::Request, + } +} + +/// Message send from the API to the worker. +enum WorkerToApi { + /// A request has succeeded. + Response { + /// The ID that was passed to the worker. + id: HttpRequestId, + /// Status code of the response. + status_code: hyper::StatusCode, + /// Headers of the response. + headers: hyper::HeaderMap, + /// Body of the response, as a channel of `Chunk` objects. + /// We send the body back through a channel instead of returning the hyper `Body` object + /// because we don't want the `HttpApi` to have to drive the reading. + /// Instead, reading an item from the channel will notify the worker task, which will push + /// the next item. + body: mpsc::Receiver>, + }, + /// A request has failed because of an error. + Fail { + /// The ID that was passed to the worker. + id: HttpRequestId, + /// Error that happened. + error: hyper::Error, + }, +} + +enum HyperClient { + Http(hyper::Client), + Https(hyper::Client, hyper::Body>), +} + +impl HyperClient { + /// Creates new hyper client. + /// + /// By default we will try to initialize the `HttpsConnector`, + /// if that's not possible we'll fall back to `HttpConnector`. + pub fn new() -> Self { + match hyper_tls::HttpsConnector::new(1) { + Ok(tls) => HyperClient::Https(hyper::Client::builder().build(tls)), + Err(e) => { + warn!("Unable to initialize TLS client. Falling back to HTTP-only: {:?}", e); + HyperClient::Http(hyper::Client::new()) + }, + } + } +} + +/// Must be continuously polled for the [`HttpApi`] to properly work. +pub struct HttpWorker { + /// Used to sends messages to the `HttpApi`. + to_api: mpsc::UnboundedSender, + /// Used to receive messages from the `HttpApi`. + from_api: mpsc::UnboundedReceiver, + /// The engine that runs HTTP requests. + http_client: HyperClient, + /// HTTP requests that are being worked on by the engine. + requests: Vec<(HttpRequestId, HttpWorkerRequest)>, +} + +/// HTTP request being processed by the worker. +enum HttpWorkerRequest { + /// Request has been dispatched and is waiting for a response. + Dispatched(Compat01As03), + /// Reading the body of the response and sending it to the channel. + ReadBody { + /// Body to read `Chunk`s from. + body: Compat01As03, + /// Where to send the chunks. + tx: mpsc::Sender>, + }, +} + +impl Future for HttpWorker { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + // Reminder: this is continuously run in the background. + + // We use a `me` variable because the compiler isn't smart enough to allow borrowing + // multiple fields at once through a `Deref`. + let me = &mut *self; + + // We remove each element from `requests` one by one and add them back only if necessary. + for n in (0..me.requests.len()).rev() { + let (id, request) = me.requests.swap_remove(n); + match request { + HttpWorkerRequest::Dispatched(mut future) => { + // Check for an HTTP response from the Internet. + let mut response = match Future::poll(Pin::new(&mut future), cx) { + Poll::Pending => { + me.requests.push((id, HttpWorkerRequest::Dispatched(future))); + continue + }, + Poll::Ready(Ok(response)) => response, + Poll::Ready(Err(err)) => { + let _ = me.to_api.unbounded_send(WorkerToApi::Fail { + id, + error: err, + }); + continue; // don't insert the request back + } + }; + + // We received a response! Decompose it into its parts. + let status_code = response.status(); + let headers = mem::replace(response.headers_mut(), hyper::HeaderMap::new()); + let body = Compat01As03::new(response.into_body()); + + let (body_tx, body_rx) = mpsc::channel(3); + let _ = me.to_api.unbounded_send(WorkerToApi::Response { + id, + status_code, + headers, + body: body_rx, + }); + + me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx: body_tx })); + cx.waker().wake_by_ref(); // wake up in order to poll the new future + continue + } + + HttpWorkerRequest::ReadBody { mut body, mut tx } => { + // Before reading from the HTTP response, check that `tx` is ready to accept + // a new chunk. + match tx.poll_ready(cx) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(_)) => continue, // don't insert the request back + Poll::Pending => { + me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx })); + continue + } + } + + match Stream::poll_next(Pin::new(&mut body), cx) { + Poll::Ready(Some(Ok(chunk))) => { + let _ = tx.start_send(Ok(chunk)); + me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx })); + cx.waker().wake_by_ref(); // notify in order to poll again + } + Poll::Ready(Some(Err(err))) => { + let _ = tx.start_send(Err(err)); + // don't insert the request back + }, + Poll::Ready(None) => {} // EOF; don't insert the request back + Poll::Pending => { + me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx })); + }, + } + } + } + } + + // Check for messages coming from the [`HttpApi`]. + match Stream::poll_next(Pin::new(&mut me.from_api), cx) { + Poll::Pending => {}, + Poll::Ready(None) => return Poll::Ready(()), // stops the worker + Poll::Ready(Some(ApiToWorker::Dispatch { id, request })) => { + let future = Compat01As03::new(match me.http_client { + HyperClient::Http(ref mut c) => c.request(request), + HyperClient::Https(ref mut c) => c.request(request), + }); + debug_assert!(me.requests.iter().all(|(i, _)| *i != id)); + me.requests.push((id, HttpWorkerRequest::Dispatched(future))); + cx.waker().wake_by_ref(); // reschedule the task to poll the request + } + } + + Poll::Pending + } +} + +impl fmt::Debug for HttpWorker { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_list() + .entries(self.requests.iter()) + .finish() + } +} + +impl fmt::Debug for HttpWorkerRequest { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + HttpWorkerRequest::Dispatched(_) => + f.debug_tuple("HttpWorkerRequest::Dispatched").finish(), + HttpWorkerRequest::ReadBody { .. } => + f.debug_tuple("HttpWorkerRequest::Response").finish(), + } + } +} + +#[cfg(test)] +mod tests { + use crate::api::timestamp; + use super::http; + use futures::prelude::*; + use futures01::Future as _; + use primitives::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration}; + + // Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP + // server that runs in the background as well. + macro_rules! build_api_server { + () => {{ + let (api, worker) = http(); + // Note: we have to use tokio because hyper still uses old futures. + std::thread::spawn(move || { + tokio::run(futures::compat::Compat::new(worker.map(|()| Ok::<(), ()>(())))) + }); + let (addr_tx, addr_rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let server = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap()) + .serve(|| { + hyper::service::service_fn_ok(move |_: hyper::Request| { + hyper::Response::new(hyper::Body::from("Hello World!")) + }) + }); + let _ = addr_tx.send(server.local_addr()); + hyper::rt::run(server.map_err(|e| panic!("{:?}", e))); + }); + (api, addr_rx.recv().unwrap()) + }}; + } + + #[test] + fn basic_localhost() { + let deadline = timestamp::now().add(Duration::from_millis(10_000)); + + // Performs an HTTP query to a background HTTP server. + + let (mut api, addr) = build_api_server!(); + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_write_body(id, &[], Some(deadline)).unwrap(); + + match api.response_wait(&[id], Some(deadline))[0] { + HttpRequestStatus::Finished(200) => {}, + v => panic!("Connecting to localhost failed: {:?}", v) + } + + let headers = api.response_headers(id); + assert!(headers.iter().any(|(h, _)| h.eq_ignore_ascii_case(b"Date"))); + + let mut buf = vec![0; 2048]; + let n = api.response_read_body(id, &mut buf, Some(deadline)).unwrap(); + assert_eq!(&buf[..n], b"Hello World!"); + } + + #[test] + fn request_start_invalid_call() { + let (mut api, addr) = build_api_server!(); + + match api.request_start("\0", &format!("http://{}", addr)) { + Err(()) => {} + Ok(_) => panic!() + }; + + match api.request_start("GET", "http://\0localhost") { + Err(()) => {} + Ok(_) => panic!() + }; + } + + #[test] + fn request_add_header_invalid_call() { + let (mut api, addr) = build_api_server!(); + + match api.request_add_header(HttpRequestId(0xdead), "Foo", "bar") { + Err(()) => {} + Ok(_) => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + match api.request_add_header(id, "\0", "bar") { + Err(()) => {} + Ok(_) => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + match api.request_add_header(id, "Foo", "\0") { + Err(()) => {} + Ok(_) => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_add_header(id, "Foo", "Bar").unwrap(); + api.request_write_body(id, &[1, 2, 3, 4], None).unwrap(); + match api.request_add_header(id, "Foo2", "Bar") { + Err(()) => {} + Ok(_) => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.response_headers(id); + match api.request_add_header(id, "Foo2", "Bar") { + Err(()) => {} + Ok(_) => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.response_read_body(id, &mut [], None).unwrap(); + match api.request_add_header(id, "Foo2", "Bar") { + Err(()) => {} + Ok(_) => panic!() + }; + } + + #[test] + fn request_write_body_invalid_call() { + let (mut api, addr) = build_api_server!(); + + match api.request_write_body(HttpRequestId(0xdead), &[1, 2, 3], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + match api.request_write_body(HttpRequestId(0xdead), &[], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_write_body(id, &[1, 2, 3, 4], None).unwrap(); + api.request_write_body(id, &[1, 2, 3, 4], None).unwrap(); + api.request_write_body(id, &[], None).unwrap(); + match api.request_write_body(id, &[], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_write_body(id, &[1, 2, 3, 4], None).unwrap(); + api.request_write_body(id, &[1, 2, 3, 4], None).unwrap(); + api.request_write_body(id, &[], None).unwrap(); + match api.request_write_body(id, &[1, 2, 3, 4], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_write_body(id, &[1, 2, 3, 4], None).unwrap(); + api.response_wait(&[id], None); + match api.request_write_body(id, &[], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_write_body(id, &[1, 2, 3, 4], None).unwrap(); + api.response_wait(&[id], None); + match api.request_write_body(id, &[1, 2, 3, 4], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.response_headers(id); + match api.request_write_body(id, &[1, 2, 3, 4], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.response_headers(id); + match api.request_write_body(id, &[], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.response_read_body(id, &mut [], None).unwrap(); + match api.request_write_body(id, &[1, 2, 3, 4], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.response_read_body(id, &mut [], None).unwrap(); + match api.request_write_body(id, &[], None) { + Err(HttpError::Invalid) => {} + _ => panic!() + }; + } + + #[test] + fn response_headers_invalid_call() { + let (mut api, addr) = build_api_server!(); + assert!(api.response_headers(HttpRequestId(0xdead)).is_empty()); + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + assert!(api.response_headers(id).is_empty()); + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_write_body(id, &[], None).unwrap(); + while api.response_headers(id).is_empty() { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.response_wait(&[id], None); + assert!(!api.response_headers(id).is_empty()); + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + let mut buf = [0; 128]; + while api.response_read_body(id, &mut buf, None).unwrap() != 0 {} + assert!(api.response_headers(id).is_empty()); + } + + #[test] + fn response_header_invalid_call() { + let (mut api, addr) = build_api_server!(); + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + assert!(api.response_headers(id).is_empty()); + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_add_header(id, "Foo", "Bar").unwrap(); + assert!(api.response_headers(id).is_empty()); + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + api.request_add_header(id, "Foo", "Bar").unwrap(); + api.request_write_body(id, &[], None).unwrap(); + // Note: this test actually sends out the request, and is supposed to test a situation + // where we haven't received any response yet. This test can theoretically fail if the + // HTTP response comes back faster than the kernel schedules our thread, but that is highly + // unlikely. + assert!(api.response_headers(id).is_empty()); + } + + #[test] + fn response_read_body_invalid_call() { + let (mut api, addr) = build_api_server!(); + let mut buf = [0; 512]; + + match api.response_read_body(HttpRequestId(0xdead), &mut buf, None) { + Err(HttpError::Invalid) => {} + _ => panic!() + } + + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + while api.response_read_body(id, &mut buf, None).unwrap() != 0 {} + match api.response_read_body(id, &mut buf, None) { + Err(HttpError::Invalid) => {} + _ => panic!() + } + } + + #[test] + fn fuzzing() { + // Uses the API in random ways to try to trigger panicks. + // Doesn't test some paths, such as waiting for multiple requests. Also doesn't test what + // happens if the server force-closes our socket. + + let (mut api, addr) = build_api_server!(); + + for _ in 0..50 { + let id = api.request_start("GET", &format!("http://{}", addr)).unwrap(); + + for _ in 0..250 { + match rand::random::() % 6 { + 0 => { let _ = api.request_add_header(id, "Foo", "Bar"); } + 1 => { let _ = api.request_write_body(id, &[1, 2, 3, 4], None); } + 2 => { let _ = api.request_write_body(id, &[], None); } + 3 => { let _ = api.response_wait(&[id], None); } + 4 => { let _ = api.response_headers(id); } + 5 => { + let mut buf = [0; 512]; + let _ = api.response_read_body(id, &mut buf, None); + } + 6 ..= 255 => unreachable!() + } + } + } + } +} diff --git a/substrate/core/offchain/src/api/timestamp.rs b/substrate/core/offchain/src/api/timestamp.rs new file mode 100644 index 0000000000..f106ac7273 --- /dev/null +++ b/substrate/core/offchain/src/api/timestamp.rs @@ -0,0 +1,60 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Helper methods dedicated to timestamps. + +use primitives::offchain::Timestamp; +use std::convert::TryInto; +use std::time::{SystemTime, Duration}; + +/// Returns the current time as a `Timestamp`. +pub fn now() -> Timestamp { + let now = SystemTime::now(); + let epoch_duration = now.duration_since(SystemTime::UNIX_EPOCH); + match epoch_duration { + Err(_) => { + // Current time is earlier than UNIX_EPOCH. + Timestamp::from_unix_millis(0) + }, + Ok(d) => { + let duration = d.as_millis(); + // Assuming overflow won't happen for a few hundred years. + Timestamp::from_unix_millis(duration.try_into() + .expect("epoch milliseconds won't overflow u64 for hundreds of years; qed")) + } + } +} + +/// Returns how a `Timestamp` compares to "now". +/// +/// In other words, returns `timestamp - now()`. +pub fn timestamp_from_now(timestamp: Timestamp) -> Duration { + Duration::from_millis(timestamp.diff(&now()).millis()) +} + +/// Converts the deadline into a `Future` that resolves when the deadline is reached. +pub fn deadline_to_future( + deadline: Option, +) -> futures::future::MaybeDone { + use futures::future; + + future::maybe_done(match deadline { + Some(deadline) => future::Either::Left( + futures_timer::Delay::new(timestamp_from_now(deadline)) + ), + None => future::Either::Right(future::pending()) + }) +} diff --git a/substrate/core/offchain/src/testing.rs b/substrate/core/offchain/src/testing.rs index cdf2878c13..8724ca7546 100644 --- a/substrate/core/offchain/src/testing.rs +++ b/substrate/core/offchain/src/testing.rs @@ -248,7 +248,7 @@ impl offchain::Externalities for TestOffchainExt { ids.iter().map(|id| match state.requests.get(id) { Some(req) if req.response.is_empty() => RequestStatus::DeadlineReached, - None => RequestStatus::Unknown, + None => RequestStatus::Invalid, _ => RequestStatus::Finished(200), }).collect() } diff --git a/substrate/core/primitives/src/offchain.rs b/substrate/core/primitives/src/offchain.rs index 52dbf5fbee..8aa097cec0 100644 --- a/substrate/core/primitives/src/offchain.rs +++ b/substrate/core/primitives/src/offchain.rs @@ -61,7 +61,7 @@ impl From for u32 { /// Opaque type for offchain http requests. #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -#[cfg_attr(feature = "std", derive(Debug))] +#[cfg_attr(feature = "std", derive(Debug, Hash))] pub struct HttpRequestId(pub u16); impl From for u32 { @@ -79,6 +79,8 @@ pub enum HttpError { DeadlineReached = 1, /// There was an IO Error while processing the request. IoError = 2, + /// The ID of the request is invalid in this context. + Invalid = 3, } impl TryFrom for HttpError { @@ -88,6 +90,7 @@ impl TryFrom for HttpError { match error { e if e == HttpError::DeadlineReached as u8 as u32 => Ok(HttpError::DeadlineReached), e if e == HttpError::IoError as u8 as u32 => Ok(HttpError::IoError), + e if e == HttpError::Invalid as u8 as u32 => Ok(HttpError::Invalid), _ => Err(()) } } @@ -105,18 +108,17 @@ impl From for u32 { pub enum HttpRequestStatus { /// Deadline was reached while we waited for this request to finish. /// - /// Note the deadline is controlled by the calling part, it not necessarily means - /// that the request has timed out. + /// Note the deadline is controlled by the calling part, it not necessarily + /// means that the request has timed out. DeadlineReached, - /// Request timed out. + /// An error has occured during the request, for example a timeout or the + /// remote has closed our socket. /// - /// This means that the request couldn't be completed by the host environment - /// within a reasonable time (according to the host), has now been terminated - /// and is considered finished. - /// To retry the request you need to construct it again. - Timeout, - /// Request status of this ID is not known. - Unknown, + /// The request is now considered destroyed. To retry the request you need + /// to construct it again. + IoError, + /// The passed ID is invalid in this context. + Invalid, /// The request has finished with given status code. Finished(u16), } @@ -124,9 +126,9 @@ pub enum HttpRequestStatus { impl From for u32 { fn from(status: HttpRequestStatus) -> Self { match status { - HttpRequestStatus::Unknown => 0, + HttpRequestStatus::Invalid => 0, HttpRequestStatus::DeadlineReached => 10, - HttpRequestStatus::Timeout => 20, + HttpRequestStatus::IoError => 20, HttpRequestStatus::Finished(code) => u32::from(code), } } @@ -137,9 +139,9 @@ impl TryFrom for HttpRequestStatus { fn try_from(status: u32) -> Result { match status { - 0 => Ok(HttpRequestStatus::Unknown), + 0 => Ok(HttpRequestStatus::Invalid), 10 => Ok(HttpRequestStatus::DeadlineReached), - 20 => Ok(HttpRequestStatus::Timeout), + 20 => Ok(HttpRequestStatus::IoError), 100..=999 => u16::try_from(status).map(HttpRequestStatus::Finished).map_err(|_| ()), _ => Err(()), } @@ -291,6 +293,11 @@ pub trait Externalities { /// /// Meta is a future-reserved field containing additional, parity-scale-codec encoded parameters. /// Returns the id of newly started request. + /// + /// Returns an error if: + /// - No new request identifier could be allocated. + /// - The method or URI contain invalid characters. + /// fn http_request_start( &mut self, method: &str, @@ -299,6 +306,18 @@ pub trait Externalities { ) -> Result; /// Append header to the request. + /// + /// Calling this function multiple times with the same header name continues appending new + /// headers. In other words, headers are never replaced. + /// + /// Returns an error if: + /// - The request identifier is invalid. + /// - You have called `http_request_write_body` on that request. + /// - The name or value contain invalid characters. + /// + /// An error doesn't poison the request, and you can continue as if the call had never been + /// made. + /// fn http_request_add_header( &mut self, request_id: HttpRequestId, @@ -308,10 +327,19 @@ pub trait Externalities { /// Write a chunk of request body. /// - /// Writing an empty chunks finalises the request. + /// Calling this function with a non-empty slice may or may not start the + /// HTTP request. Calling this function with an empty chunks finalizes the + /// request and always starts it. It is no longer valid to write more data + /// afterwards. /// Passing `None` as deadline blocks forever. /// - /// Returns an error in case deadline is reached or the chunk couldn't be written. + /// Returns an error if: + /// - The request identifier is invalid. + /// - `http_response_wait` has already been called on this request. + /// - The deadline is reached. + /// - An I/O error has happened, for example the remote has closed our + /// request. The request is then considered invalid. + /// fn http_request_write_body( &mut self, request_id: HttpRequestId, @@ -325,6 +353,9 @@ pub trait Externalities { /// Note that if deadline is not provided the method will block indefinitely, /// otherwise unready responses will produce `DeadlineReached` status. /// + /// If a response returns an `IoError`, it is then considered destroyed. + /// Its id is then invalid. + /// /// Passing `None` as deadline blocks forever. fn http_response_wait( &mut self, @@ -335,6 +366,12 @@ pub trait Externalities { /// Read all response headers. /// /// Returns a vector of pairs `(HeaderKey, HeaderValue)`. + /// + /// Dispatches the request if it hasn't been done yet. It is no longer + /// valid to modify the headers or write data to the request. + /// + /// Returns an empty list if the identifier is unknown/invalid, hasn't + /// received a response, or has finished. fn http_response_headers( &mut self, request_id: HttpRequestId @@ -342,9 +379,23 @@ pub trait Externalities { /// Read a chunk of body response to given buffer. /// + /// Dispatches the request if it hasn't been done yet. It is no longer + /// valid to modify the headers or write data to the request. + /// /// Returns the number of bytes written or an error in case a deadline /// is reached or server closed the connection. /// Passing `None` as a deadline blocks forever. + /// + /// If `Ok(0)` or `Err(IoError)` is returned, the request is considered + /// destroyed. Doing another read or getting the response's headers, for + /// example, is then invalid. + /// + /// Returns an error if: + /// - The request identifier is invalid. + /// - The deadline is reached. + /// - An I/O error has happened, for example the remote has closed our + /// request. The request is then considered invalid. + /// fn http_response_read_body( &mut self, request_id: HttpRequestId, diff --git a/substrate/core/sr-io/src/offchain/http.rs b/substrate/core/sr-io/src/offchain/http.rs index 6685dd023f..7aab309f13 100644 --- a/substrate/core/sr-io/src/offchain/http.rs +++ b/substrate/core/sr-io/src/offchain/http.rs @@ -224,7 +224,7 @@ pub enum Error { /// Deadline has been reached. DeadlineReached, /// Request had timed out. - Timeout, + IoError, /// Unknown error has been ecountered. Unknown, } @@ -283,8 +283,8 @@ impl PendingRequest { .zip(requests.into_iter()) .map(|(status, req)| match status { RequestStatus::DeadlineReached => Err(req), - RequestStatus::Timeout => Ok(Err(Error::Timeout)), - RequestStatus::Unknown => Ok(Err(Error::Unknown)), + RequestStatus::IoError => Ok(Err(Error::IoError)), + RequestStatus::Invalid => Ok(Err(Error::Unknown)), RequestStatus::Finished(code) => Ok(Ok(Response::new(req.id, code))), }) .collect() diff --git a/substrate/core/sr-io/without_std.rs b/substrate/core/sr-io/without_std.rs index 954eccc9cf..c5f2ac483f 100644 --- a/substrate/core/sr-io/without_std.rs +++ b/substrate/core/sr-io/without_std.rs @@ -1163,7 +1163,7 @@ impl OffchainApi for () { statuses .into_iter() - .map(|status| status.try_into().unwrap_or(offchain::HttpRequestStatus::Unknown)) + .map(|status| status.try_into().unwrap_or(offchain::HttpRequestStatus::Invalid)) .collect() }