chore: update libp2p to 0.52.1 (#14429)

* update libp2p to 0.52.0

* proto name now must implement `AsRef<str>`

* update libp2p version everywhere

* ToSwarm, FromBehaviour, ToBehaviour

also LocalProtocolsChange and RemoteProtocolsChange

* new NetworkBehaviour invariants

* replace `Vec<u8>` with `StreamProtocol`

* rename ConnectionHandlerEvent::Custom to NotifyBehaviour

* remove DialError & ListenError invariants

also fix pending_events

* use connection_limits::Behaviour

See https://github.com/libp2p/rust-libp2p/pull/3885

* impl `void::Void` for `BehaviourOut`

also use `Behaviour::with_codec`

* KademliaHandler no longer public

* fix StreamProtocol construction

* update libp2p-identify to 0.2.0

* remove non-existing methods from PollParameters

rename ConnectionHandlerUpgrErr to StreamUpgradeError

* `P2p` now contains `PeerId`, not `Multihash`

* use multihash-codetable crate

* update Cargo.lock

* reformat text

* comment out tests for now

* remove `.into()` from P2p

* confirm observed addr manually

See https://github.com/libp2p/rust-libp2p/blob/master/protocols/identify/CHANGELOG.md#0430

* remove SwarmEvent::Banned

since we're not using `ban_peer_id`, this can be safely removed.
we may want to introduce `libp2p::allow_block_list` module in the future.

* fix imports

* replace `libp2p` with smaller deps in network-gossip

* bring back tests

* finish rewriting tests

* uncomment handler tests

* Revert "uncomment handler tests"

This reverts commit 720a06815887f4e10767c62b58864a7ec3a48e50.

* add a fixme

* update Cargo.lock

* remove extra From

* make void uninhabited

* fix discovery test

* use autonat protocols

confirming external addresses manually is unsafe in open networks

* fix SyncNotificationsClogged invariant

* only set server mode manually in tests

doubt that we need to set it on node since we're adding public addresses

* address @dmitry-markin comments

* remove autonat

* removed unused var

* fix EOL

* update smallvec and sha2

in attempt to compile polkadot

* bump k256

in attempt to build cumulus

---------

Co-authored-by: parity-processbot <>
This commit is contained in:
Anton
2023-07-25 15:12:24 +04:00
committed by GitHub
parent ae018a01a4
commit 59d8b86450
44 changed files with 1308 additions and 2198 deletions
@@ -999,7 +999,7 @@ impl Notifications {
impl NetworkBehaviour for Notifications {
type ConnectionHandler = NotifsHandler;
type OutEvent = NotificationsOut;
type ToSwarm = NotificationsOut;
fn handle_pending_inbound_connection(
&mut self,
@@ -1468,10 +1468,11 @@ impl NetworkBehaviour for Notifications {
FromSwarm::ListenerClosed(_) => {},
FromSwarm::ListenFailure(_) => {},
FromSwarm::ListenerError(_) => {},
FromSwarm::ExpiredExternalAddr(_) => {},
FromSwarm::ExternalAddrExpired(_) => {},
FromSwarm::NewListener(_) => {},
FromSwarm::ExpiredListenAddr(_) => {},
FromSwarm::NewExternalAddr(_) => {},
FromSwarm::NewExternalAddrCandidate(_) => {},
FromSwarm::ExternalAddrConfirmed(_) => {},
FromSwarm::AddressChange(_) => {},
FromSwarm::NewListenAddr(_) => {},
}
@@ -2008,7 +2009,7 @@ impl NetworkBehaviour for Notifications {
&mut self,
cx: &mut Context,
_params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event)
}
@@ -2108,7 +2109,6 @@ impl NetworkBehaviour for Notifications {
mod tests {
use super::*;
use crate::{peerset::IncomingIndex, protocol::notifications::handler::tests::*};
use libp2p::swarm::AddressRecord;
use std::{collections::HashSet, iter};
impl PartialEq for ConnectionState {
@@ -2127,31 +2127,14 @@ mod tests {
}
#[derive(Clone)]
struct MockPollParams {
peer_id: PeerId,
addr: Multiaddr,
}
struct MockPollParams {}
impl PollParameters for MockPollParams {
type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
type ExternalAddressesIter = std::vec::IntoIter<AddressRecord>;
fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
vec![].into_iter()
}
fn listened_addresses(&self) -> Self::ListenedAddressesIter {
vec![self.addr.clone()].into_iter()
}
fn external_addresses(&self) -> Self::ExternalAddressesIter {
vec![].into_iter()
}
fn local_peer_id(&self) -> &PeerId {
&self.peer_id
}
}
fn development_notifs() -> (Notifications, crate::peerset::PeersetHandle) {
@@ -3015,7 +2998,7 @@ mod tests {
notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
peer_id: Some(peer),
error: &libp2p::swarm::DialError::Banned,
error: &libp2p::swarm::DialError::Aborted,
connection_id: ConnectionId::new_unchecked(1337),
}));
@@ -3552,7 +3535,7 @@ mod tests {
let now = Instant::now();
notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure {
peer_id: Some(peer),
error: &libp2p::swarm::DialError::Banned,
error: &libp2p::swarm::DialError::Aborted,
connection_id: ConnectionId::new_unchecked(0),
}));
@@ -3672,7 +3655,7 @@ mod tests {
assert!(notif.peers.get(&(peer, set_id)).is_some());
if tokio::time::timeout(Duration::from_secs(5), async {
let mut params = MockPollParams { peer_id: PeerId::random(), addr: Multiaddr::empty() };
let mut params = MockPollParams {};
loop {
futures::future::poll_fn(|cx| {
@@ -3781,7 +3764,7 @@ mod tests {
// verify that the code continues to keep the peer disabled by resetting the timer
// after the first one expired.
if tokio::time::timeout(Duration::from_secs(5), async {
let mut params = MockPollParams { peer_id: PeerId::random(), addr: Multiaddr::empty() };
let mut params = MockPollParams {};
loop {
futures::future::poll_fn(|cx| {
File diff suppressed because it is too large Load Diff
@@ -151,7 +151,7 @@ impl std::ops::DerefMut for CustomProtoWithAddr {
impl NetworkBehaviour for CustomProtoWithAddr {
type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
type OutEvent = <Notifications as NetworkBehaviour>::OutEvent;
type ToSwarm = <Notifications as NetworkBehaviour>::ToSwarm;
fn handle_pending_inbound_connection(
&mut self,
@@ -229,7 +229,7 @@ impl NetworkBehaviour for CustomProtoWithAddr {
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
self.inner.poll(cx, params)
}
}
@@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::prelude::*;
use libp2p::core::upgrade::{InboundUpgrade, ProtocolName, UpgradeInfo};
use libp2p::core::upgrade::{InboundUpgrade, UpgradeInfo};
use std::{
iter::FromIterator,
pin::Pin,
@@ -76,9 +76,9 @@ where
#[derive(Debug, Clone, PartialEq)]
pub struct ProtoNameWithUsize<T>(T, usize);
impl<T: ProtocolName> ProtocolName for ProtoNameWithUsize<T> {
fn protocol_name(&self) -> &[u8] {
self.0.protocol_name()
impl<T: AsRef<str>> AsRef<str> for ProtoNameWithUsize<T> {
fn as_ref(&self) -> &str {
self.0.as_ref()
}
}
@@ -104,13 +104,13 @@ impl<T: Future<Output = Result<O, E>>, O, E> Future for FutWithUsize<T> {
mod tests {
use super::*;
use crate::types::ProtocolName as ProtoName;
use libp2p::core::upgrade::{ProtocolName, UpgradeInfo};
use libp2p::core::upgrade::UpgradeInfo;
// TODO: move to mocks
mockall::mock! {
pub ProtocolUpgrade<T> {}
impl<T: Clone + ProtocolName> UpgradeInfo for ProtocolUpgrade<T> {
impl<T: Clone + AsRef<str>> UpgradeInfo for ProtocolUpgrade<T> {
type Info = T;
type InfoIter = vec::IntoIter<T>;
fn protocol_info(&self) -> vec::IntoIter<T>;
@@ -114,13 +114,6 @@ pub struct NotificationsOutSubstream<TSubstream> {
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
}
#[cfg(test)]
impl<TSubstream> NotificationsOutSubstream<TSubstream> {
pub fn new(socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>) -> Self {
Self { socket }
}
}
impl NotificationsIn {
/// Builds a new potential upgrade.
pub fn new(
@@ -203,13 +196,13 @@ impl<TSubstream> NotificationsInSubstream<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
#[cfg(test)]
pub fn new(
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
handshake: NotificationsInSubstreamHandshake,
) -> Self {
Self { socket, handshake }
}
// #[cfg(test)]
// pub fn new(
// socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
// handshake: NotificationsInSubstreamHandshake,
// ) -> Self {
// Self { socket, handshake }
// }
/// Sends the handshake in order to inform the remote that we accept the substream.
pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
@@ -498,41 +491,92 @@ pub enum NotificationsOutError {
#[cfg(test)]
mod tests {
use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen};
use futures::{channel::oneshot, prelude::*};
use libp2p::core::upgrade;
use super::*;
use futures::channel::oneshot;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::TokioAsyncReadCompatExt;
/// Opens a substream to the given address, negotiates the protocol, and returns the substream
/// along with the handshake message.
async fn dial(
addr: std::net::SocketAddr,
handshake: impl Into<Vec<u8>>,
) -> Result<
(
Vec<u8>,
NotificationsOutSubstream<
multistream_select::Negotiated<tokio_util::compat::Compat<TcpStream>>,
>,
),
NotificationsHandshakeError,
> {
let socket = TcpStream::connect(addr).await.unwrap();
let notifs_out = NotificationsOut::new("/test/proto/1", Vec::new(), handshake, 1024 * 1024);
let (_, substream) = multistream_select::dialer_select_proto(
socket.compat(),
notifs_out.protocol_info().into_iter(),
upgrade::Version::V1,
)
.await
.unwrap();
let NotificationsOutOpen { handshake, substream, .. } =
<NotificationsOut as OutboundUpgrade<_>>::upgrade_outbound(
notifs_out,
substream,
"/test/proto/1".into(),
)
.await?;
Ok((handshake, substream))
}
/// Listens on a localhost, negotiates the protocol, and returns the substream along with the
/// handshake message.
///
/// Also sends the listener address through the given channel.
async fn listen_on_localhost(
listener_addr_tx: oneshot::Sender<std::net::SocketAddr>,
) -> Result<
(
Vec<u8>,
NotificationsInSubstream<
multistream_select::Negotiated<tokio_util::compat::Compat<TcpStream>>,
>,
),
NotificationsHandshakeError,
> {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let notifs_in = NotificationsIn::new("/test/proto/1", Vec::new(), 1024 * 1024);
let (_, substream) =
multistream_select::listener_select_proto(socket.compat(), notifs_in.protocol_info())
.await
.unwrap();
let NotificationsInOpen { handshake, substream, .. } =
<NotificationsIn as InboundUpgrade<_>>::upgrade_inbound(
notifs_in,
substream,
"/test/proto/1".into(),
)
.await?;
Ok((handshake, substream))
}
#[tokio::test]
async fn basic_works() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1,
)
.await
.unwrap();
let (handshake, mut substream) =
dial(listener_addr_rx.await.unwrap(), &b"initial message"[..]).await.unwrap();
assert_eq!(handshake, b"hello world");
substream.send(b"test message".to_vec()).await.unwrap();
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
assert_eq!(handshake, b"initial message");
substream.send_handshake(&b"hello world"[..]);
@@ -547,33 +591,17 @@ mod tests {
async fn empty_handshake() {
// Check that everything still works when the handshake messages are empty.
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), vec![], 1024 * 1024),
upgrade::Version::V1,
)
.await
.unwrap();
let (handshake, mut substream) =
dial(listener_addr_rx.await.unwrap(), vec![]).await.unwrap();
assert!(handshake.is_empty());
substream.send(Default::default()).await.unwrap();
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
assert!(handshake.is_empty());
substream.send_handshake(vec![]);
@@ -586,17 +614,10 @@ mod tests {
#[tokio::test]
async fn refused() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let outcome = upgrade::apply_outbound(
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"hello"[..], 1024 * 1024),
upgrade::Version::V1,
)
.await;
let outcome = dial(listener_addr_rx.await.unwrap(), &b"hello"[..]).await;
// Despite the protocol negotiation being successfully conducted on the listener
// side, we have to receive an error here because the listener didn't send the
@@ -604,16 +625,7 @@ mod tests {
assert!(outcome.is_err());
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
let (handshake, substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
assert_eq!(handshake, b"hello");
@@ -625,35 +637,16 @@ mod tests {
#[tokio::test]
async fn large_initial_message_refused() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket.compat(),
// We check that an initial message that is too large gets refused.
NotificationsOut::new(
PROTO_NAME,
Vec::new(),
(0..32768).map(|_| 0).collect::<Vec<_>>(),
1024 * 1024,
),
upgrade::Version::V1,
)
.await;
let ret =
dial(listener_addr_rx.await.unwrap(), (0..32768).map(|_| 0).collect::<Vec<_>>())
.await;
assert!(ret.is_err());
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let ret = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await;
let ret = listen_on_localhost(listener_addr_tx).await;
assert!(ret.is_err());
client.await.unwrap();
@@ -661,30 +654,14 @@ mod tests {
#[tokio::test]
async fn large_handshake_refused() {
const PROTO_NAME: &str = "/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket.compat(),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1,
)
.await;
let ret = dial(listener_addr_rx.await.unwrap(), &b"initial message"[..]).await;
assert!(ret.is_err());
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket.compat(),
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap();
assert_eq!(handshake, b"initial message");
// We check that a handshake that is too large gets refused.