mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-19 22:51:03 +00:00
Fix some flaky offchain HTTP tests (#6038)
* http: Use assert_eq in tests for better debuggability * http: Use matches! macro instead of if let ... * http: Simplify some bits and pieces * http: Don't answer immediately in HTTP test server Sometimes it can happen that we receive the response immediately when testing the HTTP api due to kernel scheduling. Because of it, we add a marginal 10ms async-friendly delay to minimize the risk. * http: Use the same async runtime when testing HTTP API/worker * http: Return a future Delay only for non-zero Duration This allows to short-circuit in the response_wait logic and only send/not wait for response.
This commit is contained in:
@@ -31,7 +31,7 @@ use fnv::FnvHashMap;
|
|||||||
use futures::{prelude::*, future, channel::mpsc};
|
use futures::{prelude::*, future, channel::mpsc};
|
||||||
use log::error;
|
use log::error;
|
||||||
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
|
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
|
||||||
use std::{fmt, io::Read as _, mem, pin::Pin, task::Context, task::Poll};
|
use std::{convert::TryFrom, fmt, io::Read as _, pin::Pin, task::{Context, Poll}};
|
||||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||||
|
|
||||||
/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
|
/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
|
||||||
@@ -151,8 +151,8 @@ impl HttpApi {
|
|||||||
_ => return Err(())
|
_ => return Err(())
|
||||||
};
|
};
|
||||||
|
|
||||||
let name = hyper::header::HeaderName::from_bytes(name.as_bytes()).map_err(|_| ())?;
|
let name = hyper::header::HeaderName::try_from(name).map_err(drop)?;
|
||||||
let value = hyper::header::HeaderValue::from_str(value).map_err(|_| ())?;
|
let value = hyper::header::HeaderValue::try_from(value).map_err(drop)?;
|
||||||
// Note that we're always appending headers and never replacing old values.
|
// Note that we're always appending headers and never replacing old values.
|
||||||
// We assume here that the user knows what they're doing.
|
// We assume here that the user knows what they're doing.
|
||||||
request.headers_mut().append(name, value);
|
request.headers_mut().append(name, value);
|
||||||
@@ -185,7 +185,7 @@ impl HttpApi {
|
|||||||
future::MaybeDone::Done(Err(_)) => return Err(HttpError::IoError),
|
future::MaybeDone::Done(Err(_)) => return Err(HttpError::IoError),
|
||||||
future::MaybeDone::Future(_) |
|
future::MaybeDone::Future(_) |
|
||||||
future::MaybeDone::Gone => {
|
future::MaybeDone::Gone => {
|
||||||
debug_assert!(if let future::MaybeDone::Done(_) = deadline { true } else { false });
|
debug_assert!(matches!(deadline, future::MaybeDone::Done(..)));
|
||||||
return Err(HttpError::DeadlineReached)
|
return Err(HttpError::DeadlineReached)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -347,7 +347,7 @@ impl HttpApi {
|
|||||||
if let future::MaybeDone::Done(msg) = next_msg {
|
if let future::MaybeDone::Done(msg) = next_msg {
|
||||||
msg
|
msg
|
||||||
} else {
|
} else {
|
||||||
debug_assert!(if let future::MaybeDone::Done(_) = deadline { true } else { false });
|
debug_assert!(matches!(deadline, future::MaybeDone::Done(..)));
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -585,25 +585,21 @@ impl Future for HttpWorker {
|
|||||||
match request {
|
match request {
|
||||||
HttpWorkerRequest::Dispatched(mut future) => {
|
HttpWorkerRequest::Dispatched(mut future) => {
|
||||||
// Check for an HTTP response from the Internet.
|
// Check for an HTTP response from the Internet.
|
||||||
let mut response = match Future::poll(Pin::new(&mut future), cx) {
|
let response = match Future::poll(Pin::new(&mut future), cx) {
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
|
me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
|
||||||
continue
|
continue
|
||||||
},
|
},
|
||||||
Poll::Ready(Ok(response)) => response,
|
Poll::Ready(Ok(response)) => response,
|
||||||
Poll::Ready(Err(err)) => {
|
Poll::Ready(Err(error)) => {
|
||||||
let _ = me.to_api.unbounded_send(WorkerToApi::Fail {
|
let _ = me.to_api.unbounded_send(WorkerToApi::Fail { id, error });
|
||||||
id,
|
|
||||||
error: err,
|
|
||||||
});
|
|
||||||
continue; // don't insert the request back
|
continue; // don't insert the request back
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// We received a response! Decompose it into its parts.
|
// We received a response! Decompose it into its parts.
|
||||||
let status_code = response.status();
|
let (head, body) = response.into_parts();
|
||||||
let headers = mem::replace(response.headers_mut(), hyper::HeaderMap::new());
|
let (status_code, headers) = (head.status, head.headers);
|
||||||
let body = response.into_body();
|
|
||||||
|
|
||||||
let (body_tx, body_rx) = mpsc::channel(3);
|
let (body_tx, body_rx) = mpsc::channel(3);
|
||||||
let _ = me.to_api.unbounded_send(WorkerToApi::Response {
|
let _ = me.to_api.unbounded_send(WorkerToApi::Response {
|
||||||
@@ -691,15 +687,12 @@ mod tests {
|
|||||||
use crate::api::timestamp;
|
use crate::api::timestamp;
|
||||||
use super::http;
|
use super::http;
|
||||||
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration};
|
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration};
|
||||||
|
use futures::future;
|
||||||
|
|
||||||
// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
|
// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
|
||||||
// server that runs in the background as well.
|
// server that runs in the background as well.
|
||||||
macro_rules! build_api_server {
|
macro_rules! build_api_server {
|
||||||
() => {{
|
() => {{
|
||||||
fn tokio_run<T>(future: impl std::future::Future<Output = T>) {
|
|
||||||
let _ = tokio::runtime::Runtime::new().unwrap().block_on(future);
|
|
||||||
}
|
|
||||||
|
|
||||||
// We spawn quite a bit of HTTP servers here due to how async API
|
// We spawn quite a bit of HTTP servers here due to how async API
|
||||||
// works for offchain workers, so be sure to raise the FD limit
|
// works for offchain workers, so be sure to raise the FD limit
|
||||||
// (particularly useful for macOS where the default soft limit may
|
// (particularly useful for macOS where the default soft limit may
|
||||||
@@ -707,11 +700,12 @@ mod tests {
|
|||||||
fdlimit::raise_fd_limit();
|
fdlimit::raise_fd_limit();
|
||||||
|
|
||||||
let (api, worker) = http();
|
let (api, worker) = http();
|
||||||
std::thread::spawn(move || tokio_run(worker));
|
|
||||||
|
|
||||||
let (addr_tx, addr_rx) = std::sync::mpsc::channel();
|
let (addr_tx, addr_rx) = std::sync::mpsc::channel();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
tokio_run(async move {
|
let mut rt = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
let worker = rt.spawn(worker);
|
||||||
|
let server = rt.spawn(async move {
|
||||||
let server = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap())
|
let server = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap())
|
||||||
.serve(hyper::service::make_service_fn(|_| { async move {
|
.serve(hyper::service::make_service_fn(|_| { async move {
|
||||||
Ok::<_, Infallible>(hyper::service::service_fn(move |_req| async move {
|
Ok::<_, Infallible>(hyper::service::service_fn(move |_req| async move {
|
||||||
@@ -721,8 +715,9 @@ mod tests {
|
|||||||
}))
|
}))
|
||||||
}}));
|
}}));
|
||||||
let _ = addr_tx.send(server.local_addr());
|
let _ = addr_tx.send(server.local_addr());
|
||||||
server.await
|
server.await.map_err(drop)
|
||||||
});
|
});
|
||||||
|
let _ = rt.block_on(future::join(worker, server));
|
||||||
});
|
});
|
||||||
(api, addr_rx.recv().unwrap())
|
(api, addr_rx.recv().unwrap())
|
||||||
}};
|
}};
|
||||||
@@ -891,10 +886,10 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn response_headers_invalid_call() {
|
fn response_headers_invalid_call() {
|
||||||
let (mut api, addr) = build_api_server!();
|
let (mut api, addr) = build_api_server!();
|
||||||
assert!(api.response_headers(HttpRequestId(0xdead)).is_empty());
|
assert_eq!(api.response_headers(HttpRequestId(0xdead)), &[]);
|
||||||
|
|
||||||
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
|
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
|
||||||
assert!(api.response_headers(id).is_empty());
|
assert_eq!(api.response_headers(id), &[]);
|
||||||
|
|
||||||
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
|
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
|
||||||
api.request_write_body(id, &[], None).unwrap();
|
api.request_write_body(id, &[], None).unwrap();
|
||||||
@@ -904,12 +899,12 @@ mod tests {
|
|||||||
|
|
||||||
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
|
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
|
||||||
api.response_wait(&[id], None);
|
api.response_wait(&[id], None);
|
||||||
assert!(!api.response_headers(id).is_empty());
|
assert_ne!(api.response_headers(id), &[]);
|
||||||
|
|
||||||
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
|
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
|
||||||
let mut buf = [0; 128];
|
let mut buf = [0; 128];
|
||||||
while api.response_read_body(id, &mut buf, None).unwrap() != 0 {}
|
while api.response_read_body(id, &mut buf, None).unwrap() != 0 {}
|
||||||
assert!(api.response_headers(id).is_empty());
|
assert_eq!(api.response_headers(id), &[]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -917,11 +912,11 @@ mod tests {
|
|||||||
let (mut api, addr) = build_api_server!();
|
let (mut api, addr) = build_api_server!();
|
||||||
|
|
||||||
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
|
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
|
||||||
assert!(api.response_headers(id).is_empty());
|
assert_eq!(api.response_headers(id), &[]);
|
||||||
|
|
||||||
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
|
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
|
||||||
api.request_add_header(id, "Foo", "Bar").unwrap();
|
api.request_add_header(id, "Foo", "Bar").unwrap();
|
||||||
assert!(api.response_headers(id).is_empty());
|
assert_eq!(api.response_headers(id), &[]);
|
||||||
|
|
||||||
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
|
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
|
||||||
api.request_add_header(id, "Foo", "Bar").unwrap();
|
api.request_add_header(id, "Foo", "Bar").unwrap();
|
||||||
@@ -930,7 +925,7 @@ mod tests {
|
|||||||
// where we haven't received any response yet. This test can theoretically fail if the
|
// where we haven't received any response yet. This test can theoretically fail if the
|
||||||
// HTTP response comes back faster than the kernel schedules our thread, but that is highly
|
// HTTP response comes back faster than the kernel schedules our thread, but that is highly
|
||||||
// unlikely.
|
// unlikely.
|
||||||
assert!(api.response_headers(id).is_empty());
|
assert_eq!(api.response_headers(id), &[]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -51,12 +51,14 @@ pub fn timestamp_from_now(timestamp: Timestamp) -> Duration {
|
|||||||
pub fn deadline_to_future(
|
pub fn deadline_to_future(
|
||||||
deadline: Option<Timestamp>,
|
deadline: Option<Timestamp>,
|
||||||
) -> futures::future::MaybeDone<impl futures::Future> {
|
) -> futures::future::MaybeDone<impl futures::Future> {
|
||||||
use futures::future;
|
use futures::future::{self, Either};
|
||||||
|
|
||||||
future::maybe_done(match deadline {
|
future::maybe_done(match deadline.map(timestamp_from_now) {
|
||||||
Some(deadline) => future::Either::Left(
|
None => Either::Left(future::pending()),
|
||||||
futures_timer::Delay::new(timestamp_from_now(deadline))
|
// Only apply delay if we need to wait a non-zero duration
|
||||||
),
|
Some(duration) if duration <= Duration::from_secs(0) =>
|
||||||
None => future::Either::Right(future::pending())
|
Either::Right(Either::Left(future::ready(()))),
|
||||||
|
Some(duration) =>
|
||||||
|
Either::Right(Either::Right(futures_timer::Delay::new(duration))),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user