Upgrade tokio to 1.22.0 and replace async-std with tokio (#12646)

* Replace deprecated libp2p feature specs with correct ones

* Bump tokio to 1.21.2

* Replace async-std libp2p primitives with tokio ones

* minor: rustfmt

* Fix TestNet to run initialization in the tokio context

* Convert telemetry test from async-std to tokio

* Convert notifications tests from async-std to tokio

* Convert chain sync tests from async-std to tokio

* Ditch async-std completely

* Make executor mandatory

* Bump tokio to 1.22.0

* minor: rustfmt

* Explicitly use tokio runtime in tests

* Move more tests to explicit tokio runtime

* Explicitly set multithreaded runtime in tokio test

* minor: rustfmt

* minor: fix comment

* Replace async-std with tokio in MMR tests
This commit is contained in:
Dmitry Markin
2022-12-05 11:18:46 +03:00
committed by GitHub
parent 1943e25cb9
commit 5eb84f9cc6
60 changed files with 747 additions and 634 deletions
@@ -481,20 +481,25 @@ pub enum NotificationsOutError {
#[cfg(test)]
mod tests {
use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen};
use async_std::net::{TcpListener, TcpStream};
use futures::{channel::oneshot, prelude::*};
use libp2p::core::upgrade;
use tokio::{
net::{TcpListener, TcpStream},
runtime::Runtime,
};
use tokio_util::compat::TokioAsyncReadCompatExt;
#[test]
fn basic_works() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket,
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1,
)
@@ -505,13 +510,13 @@ mod tests {
substream.send(b"test message".to_vec()).await.unwrap();
});
async_std::task::block_on(async move {
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
@@ -524,7 +529,7 @@ mod tests {
assert_eq!(msg.as_ref(), b"test message");
});
async_std::task::block_on(client);
runtime.block_on(client).unwrap();
}
#[test]
@@ -534,10 +539,12 @@ mod tests {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket,
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), vec![], 1024 * 1024),
upgrade::Version::V1,
)
@@ -548,13 +555,13 @@ mod tests {
substream.send(Default::default()).await.unwrap();
});
async_std::task::block_on(async move {
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
@@ -567,7 +574,7 @@ mod tests {
assert!(msg.as_ref().is_empty());
});
async_std::task::block_on(client);
runtime.block_on(client).unwrap();
}
#[test]
@@ -575,10 +582,12 @@ mod tests {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let outcome = upgrade::apply_outbound(
socket,
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"hello"[..], 1024 * 1024),
upgrade::Version::V1,
)
@@ -590,13 +599,13 @@ mod tests {
assert!(outcome.is_err());
});
async_std::task::block_on(async move {
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, substream, .. } = upgrade::apply_inbound(
socket,
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
@@ -608,7 +617,7 @@ mod tests {
drop(substream);
});
async_std::task::block_on(client);
runtime.block_on(client).unwrap();
}
#[test]
@@ -616,10 +625,12 @@ mod tests {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket,
socket.compat(),
// We check that an initial message that is too large gets refused.
NotificationsOut::new(
PROTO_NAME,
@@ -633,20 +644,20 @@ mod tests {
assert!(ret.is_err());
});
async_std::task::block_on(async move {
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let ret = upgrade::apply_inbound(
socket,
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await;
assert!(ret.is_err());
});
async_std::task::block_on(client);
runtime.block_on(client).unwrap();
}
#[test]
@@ -654,10 +665,12 @@ mod tests {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
let runtime = Runtime::new().unwrap();
let client = runtime.spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket,
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1,
)
@@ -665,13 +678,13 @@ mod tests {
assert!(ret.is_err());
});
async_std::task::block_on(async move {
runtime.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
@@ -683,6 +696,6 @@ mod tests {
let _ = substream.next().await;
});
async_std::task::block_on(client);
runtime.block_on(client).unwrap();
}
}