Clean up tests from runtime.block_on() and moving around Runtime and Handle (#12941)

This commit is contained in:
Dmitry Markin
2022-12-16 16:29:39 +03:00
committed by GitHub
parent a5cab922a5
commit e14d6a5c4e
13 changed files with 996 additions and 1148 deletions
@@ -483,20 +483,15 @@ mod tests {
use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen};
use futures::{channel::oneshot, prelude::*};
use libp2p::core::upgrade;
use tokio::{
net::{TcpListener, TcpStream},
runtime::Runtime,
};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::TokioAsyncReadCompatExt;
#[test]
fn basic_works() {
#[tokio::test]
async fn basic_works() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket.compat(),
@@ -510,38 +505,34 @@ mod tests {
substream.send(b"test message".to_vec()).await.unwrap();
});
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert_eq!(handshake, b"initial message");
substream.send_handshake(&b"hello world"[..]);
assert_eq!(handshake, b"initial message");
substream.send_handshake(&b"hello world"[..]);
let msg = substream.next().await.unwrap().unwrap();
assert_eq!(msg.as_ref(), b"test message");
});
let msg = substream.next().await.unwrap().unwrap();
assert_eq!(msg.as_ref(), b"test message");
runtime.block_on(client).unwrap();
client.await.unwrap();
}
#[test]
fn empty_handshake() {
#[tokio::test]
async fn empty_handshake() {
// Check that everything still works when the handshake messages are empty.
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket.compat(),
@@ -555,36 +546,32 @@ mod tests {
substream.send(Default::default()).await.unwrap();
});
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert!(handshake.is_empty());
substream.send_handshake(vec![]);
assert!(handshake.is_empty());
substream.send_handshake(vec![]);
let msg = substream.next().await.unwrap().unwrap();
assert!(msg.as_ref().is_empty());
});
let msg = substream.next().await.unwrap().unwrap();
assert!(msg.as_ref().is_empty());
runtime.block_on(client).unwrap();
client.await.unwrap();
}
#[test]
fn refused() {
#[tokio::test]
async fn refused() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let outcome = upgrade::apply_outbound(
socket.compat(),
@@ -599,35 +586,31 @@ mod tests {
assert!(outcome.is_err());
});
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert_eq!(handshake, b"hello");
assert_eq!(handshake, b"hello");
// We successfully upgrade to the protocol, but then close the substream.
drop(substream);
});
// We successfully upgrade to the protocol, but then close the substream.
drop(substream);
runtime.block_on(client).unwrap();
client.await.unwrap();
}
#[test]
fn large_initial_message_refused() {
#[tokio::test]
async fn large_initial_message_refused() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket.compat(),
@@ -644,30 +627,26 @@ mod tests {
assert!(ret.is_err());
});
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let ret = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await;
assert!(ret.is_err());
});
let (socket, _) = listener.accept().await.unwrap();
let ret = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await;
assert!(ret.is_err());
runtime.block_on(client).unwrap();
client.await.unwrap();
}
#[test]
fn large_handshake_refused() {
#[tokio::test]
async fn large_handshake_refused() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket.compat(),
@@ -678,24 +657,22 @@ mod tests {
assert!(ret.is_err());
});
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert_eq!(handshake, b"initial message");
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert_eq!(handshake, b"initial message");
// We check that a handshake that is too large gets refused.
substream.send_handshake((0..32768).map(|_| 0).collect::<Vec<_>>());
let _ = substream.next().await;
});
// We check that a handshake that is too large gets refused.
substream.send_handshake((0..32768).map(|_| 0).collect::<Vec<_>>());
let _ = substream.next().await;
runtime.block_on(client).unwrap();
client.await.unwrap();
}
}