Bump libp2p to 0.40.0 (#10035)

* Bump libp2p to 0.40.0-rc.1

* Fix PingFailure import

* Reduce the number of compilation errors (this is a FIXME commit)

* Bump libp2p to 0.40.0-rc.2

* Fix sc-network::Behaviour to inject events into fields

* Fix some NetworkBehaviourAction types

* More fixes

* More fixes

* More fixes

* Fix DiscoveryBehaviour

* Fix PeerInfoBehaviour

* Fix RequestResponsesBehaviour

* Fix RequestResponsesBehaviour

* Fix Notifications

* Fix NetworkWorker

* Fix Behaviour

* Please borrowchk

* Please borrowchk

* Please borrowchk

* Fix fmt

* Cover all cases in matches

* Fix some clippy warnings

* Fix into_peer_id -> to_peer_id

* Fix some warnings

* Fix some inject_dial_failure FIXMEs

* Fix DiscoveryBehaviour::inject_dial_failure

* Fix RequestResponsesBehaviour::inject_dial_failure

* Fix the order of inject_connection_closed PeerInfoBehaviour events

* Make KademliaEvent with filtering unreachable

* Fix Notifications::inject_dial_failure

* Use concurrent_dial_errors in NetworkWorker

* Remove commented-out RequestResponsesBehaviour::inject_addr_reach_failure

* Fix tests

* Dont report new PendingConnectionError and DialError variants to metrics

* Bump libp2p to 0.40.0

* Add fn inject_listen_failure and inject_address_change

* Review fixes
This commit is contained in:
Roman
2021-11-25 11:33:33 +03:00
committed by GitHub
parent 5e2b93c2ea
commit c5ae5190b2
21 changed files with 564 additions and 492 deletions
+8 -4
View File
@@ -32,7 +32,10 @@ use libp2p::{
core::{Multiaddr, PeerId, PublicKey},
identify::IdentifyInfo,
kad::record,
swarm::{toggle::Toggle, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
swarm::{
toggle::Toggle, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess,
PollParameters,
},
NetworkBehaviour,
};
use log::debug;
@@ -58,7 +61,7 @@ pub use crate::request_responses::{
/// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll")]
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll", event_process = true)]
pub struct Behaviour<B: BlockT> {
/// All the substrate-specific protocols.
substrate: Protocol<B>,
@@ -512,11 +515,12 @@ impl<B: BlockT> NetworkBehaviourEventProcess<DiscoveryOut> for Behaviour<B> {
}
impl<B: BlockT> Behaviour<B> {
fn poll<TEv>(
fn poll(
&mut self,
_cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
) -> Poll<NetworkBehaviourAction<BehaviourOut<B>, <Self as NetworkBehaviour>::ProtocolsHandler>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
+6 -8
View File
@@ -39,8 +39,7 @@ use libp2p::{
UpgradeInfo,
},
swarm::{
IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
OneShotHandler, PollParameters, ProtocolsHandler,
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler, PollParameters,
},
};
use log::{debug, error, trace};
@@ -297,12 +296,11 @@ impl<B: BlockT> NetworkBehaviour for Bitswap<B> {
self.ready_blocks.push_back((peer, response));
}
fn poll(&mut self, _ctx: &mut Context, _: &mut impl PollParameters) -> Poll<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
>{
fn poll(
&mut self,
_ctx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some((peer_id, message)) = self.ready_blocks.pop_front() {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
+95 -40
View File
@@ -67,8 +67,8 @@ use libp2p::{
mdns::{Mdns, MdnsConfig, MdnsEvent},
multiaddr::Protocol,
swarm::{
protocols_handler::multi::IntoMultiHandler, IntoProtocolsHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, ProtocolsHandler,
protocols_handler::multi::IntoMultiHandler, DialError, IntoProtocolsHandler,
NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
},
};
use log::{debug, error, info, trace, warn};
@@ -107,7 +107,7 @@ impl DiscoveryConfig {
/// Create a default configuration with the given public key.
pub fn new(local_public_key: PublicKey) -> Self {
Self {
local_peer_id: local_public_key.into_peer_id(),
local_peer_id: local_public_key.to_peer_id(),
permanent_addresses: Vec::new(),
dht_random_walk: true,
allow_private_ipv4: true,
@@ -428,6 +428,29 @@ impl DiscoveryBehaviour {
};
ip.is_global()
}
fn new_handler_with_replacement(
&mut self,
pid: ProtocolId,
handler: KademliaHandlerProto<QueryId>,
) -> <DiscoveryBehaviour as NetworkBehaviour>::ProtocolsHandler {
let mut handlers: HashMap<_, _> = self
.kademlias
.iter_mut()
.map(|(p, k)| (p.clone(), NetworkBehaviour::new_handler(k)))
.collect();
if let Some(h) = handlers.get_mut(&pid) {
*h = handler
}
IntoMultiHandler::try_from_iter(handlers).expect(
"There can be at most one handler per `ProtocolId` and protocol names contain the \
`ProtocolId` so no two protocol names in `self.kademlias` can be equal which is the \
only error `try_from_iter` can return, therefore this call is guaranteed to succeed; \
qed",
)
}
}
/// Event generated by the `DiscoveryBehaviour`.
@@ -527,15 +550,34 @@ impl NetworkBehaviour for DiscoveryBehaviour {
list
}
fn inject_address_change(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint,
) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_address_change(k, peer_id, connection_id, old, new);
}
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
) {
self.num_connections += 1;
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_established(k, peer_id, conn, endpoint)
NetworkBehaviour::inject_connection_established(
k,
peer_id,
conn,
endpoint,
failed_addresses,
)
}
}
@@ -547,14 +589,13 @@ impl NetworkBehaviour for DiscoveryBehaviour {
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
_peer_id: &PeerId,
_conn: &ConnectionId,
_endpoint: &ConnectedPoint,
_handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
self.num_connections -= 1;
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_closed(k, peer_id, conn, endpoint)
}
// NetworkBehaviour::inject_connection_closed on Kademlia<MemoryStore> does nothing.
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
@@ -563,20 +604,25 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
fn inject_addr_reach_failure(
fn inject_dial_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
peer_id: Option<PeerId>,
_: Self::ProtocolsHandler,
error: &DialError,
) {
if let Some(peer_id) = peer_id {
if let Some(list) = self.ephemeral_addresses.get_mut(peer_id) {
list.retain(|a| a != addr);
if let DialError::Transport(errors) = error {
if let Some(list) = self.ephemeral_addresses.get_mut(&peer_id) {
for (addr, _error) in errors {
list.retain(|a| a != addr);
}
}
}
}
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_addr_reach_failure(k, peer_id, addr, error)
let handler = k.new_handler();
NetworkBehaviour::inject_dial_failure(k, peer_id, handler, error);
}
}
@@ -631,12 +677,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_dial_failure(k, peer_id)
}
}
fn inject_new_listener(&mut self, id: ListenerId) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_new_listener(k, id)
@@ -649,6 +689,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
fn inject_listen_failure(&mut self, _: &Multiaddr, _: &Multiaddr, _: Self::ProtocolsHandler) {
// NetworkBehaviour::inject_listen_failure on Kademlia<MemoryStore> does nothing.
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_listener_error(k, id, err)
@@ -665,12 +709,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
>{
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
// Immediately process the content of `discovered`.
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
@@ -731,6 +770,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::InboundPutRecordRequest { .. } |
KademliaEvent::InboundAddProviderRequest { .. } => {
debug_assert!(false, "We don't use kad filtering at the moment");
},
KademliaEvent::PendingRoutablePeer { .. } |
KademliaEvent::InboundRequestServed { .. } => {
// We are not interested in this event at the moment.
@@ -847,10 +890,20 @@ impl NetworkBehaviour for DiscoveryBehaviour {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
},
},
NetworkBehaviourAction::DialAddress { address } =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
NetworkBehaviourAction::DialAddress { address, handler } => {
let pid = pid.clone();
let handler = self.new_handler_with_replacement(pid, handler);
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })
},
NetworkBehaviourAction::DialPeer { peer_id, condition, handler } => {
let pid = pid.clone();
let handler = self.new_handler_with_replacement(pid, handler);
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
handler,
})
},
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
@@ -888,10 +941,12 @@ impl NetworkBehaviour for DiscoveryBehaviour {
},
MdnsEvent::Expired(_) => {},
},
NetworkBehaviourAction::DialAddress { address } =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
NetworkBehaviourAction::DialAddress { .. } => {
unreachable!("mDNS never dials!");
},
NetworkBehaviourAction::DialPeer { .. } => {
unreachable!("mDNS never dials!");
},
NetworkBehaviourAction::NotifyHandler { event, .. } => match event {}, /* `event` is an enum with no variant */
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
@@ -940,7 +995,7 @@ impl MdnsWrapper {
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<void::Void, MdnsEvent>> {
) -> Poll<NetworkBehaviourAction<MdnsEvent, <Mdns as NetworkBehaviour>::ProtocolsHandler>> {
loop {
match self {
Self::Instantiating(fut) =>
@@ -1007,13 +1062,13 @@ mod tests {
config.finish()
};
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let mut swarm = Swarm::new(transport, behaviour, keypair.public().to_peer_id());
let listen_addr: Multiaddr =
format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
if i == 0 {
first_swarm_peer_id_and_addr =
Some((keypair.public().into_peer_id(), listen_addr.clone()))
Some((keypair.public().to_peer_id(), listen_addr.clone()))
}
swarm.listen_on(listen_addr.clone()).unwrap();
+68 -30
View File
@@ -186,6 +186,17 @@ impl NetworkBehaviour for PeerInfoBehaviour {
list
}
fn inject_address_change(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint,
) {
self.ping.inject_address_change(peer_id, conn, old, new);
self.identify.inject_address_change(peer_id, conn, old, new);
}
fn inject_connected(&mut self, peer_id: &PeerId) {
self.ping.inject_connected(peer_id);
self.identify.inject_connected(peer_id);
@@ -196,9 +207,12 @@ impl NetworkBehaviour for PeerInfoBehaviour {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
) {
self.ping.inject_connection_established(peer_id, conn, endpoint);
self.identify.inject_connection_established(peer_id, conn, endpoint);
self.ping
.inject_connection_established(peer_id, conn, endpoint, failed_addresses);
self.identify
.inject_connection_established(peer_id, conn, endpoint, failed_addresses);
match self.nodes_info.entry(*peer_id) {
Entry::Vacant(e) => {
e.insert(NodeInfo::new(endpoint.clone()));
@@ -220,9 +234,12 @@ impl NetworkBehaviour for PeerInfoBehaviour {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
self.ping.inject_connection_closed(peer_id, conn, endpoint);
self.identify.inject_connection_closed(peer_id, conn, endpoint);
let (ping_handler, identity_handler) = handler.into_inner();
self.identify
.inject_connection_closed(peer_id, conn, endpoint, identity_handler);
self.ping.inject_connection_closed(peer_id, conn, endpoint, ping_handler);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.endpoints.retain(|ep| ep != endpoint)
@@ -256,19 +273,15 @@ impl NetworkBehaviour for PeerInfoBehaviour {
}
}
fn inject_addr_reach_failure(
fn inject_dial_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
peer_id: Option<PeerId>,
handler: Self::ProtocolsHandler,
error: &libp2p::swarm::DialError,
) {
self.ping.inject_addr_reach_failure(peer_id, addr, error);
self.identify.inject_addr_reach_failure(peer_id, addr, error);
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
self.ping.inject_dial_failure(peer_id);
self.identify.inject_dial_failure(peer_id);
let (ping_handler, identity_handler) = handler.into_inner();
self.identify.inject_dial_failure(peer_id, identity_handler, error);
self.ping.inject_dial_failure(peer_id, ping_handler, error);
}
fn inject_new_listener(&mut self, id: ListenerId) {
@@ -296,6 +309,18 @@ impl NetworkBehaviour for PeerInfoBehaviour {
self.identify.inject_expired_external_addr(addr);
}
fn inject_listen_failure(
&mut self,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
handler: Self::ProtocolsHandler,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify
.inject_listen_failure(local_addr, send_back_addr, identity_handler);
self.ping.inject_listen_failure(local_addr, send_back_addr, ping_handler);
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn error::Error + 'static)) {
self.ping.inject_listener_error(id, err);
self.identify.inject_listener_error(id, err);
@@ -309,13 +334,8 @@ impl NetworkBehaviour for PeerInfoBehaviour {
fn poll(
&mut self,
cx: &mut Context,
params: &mut impl PollParameters
) -> Poll<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>
>{
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
loop {
match self.ping.poll(cx, params) {
Poll::Pending => break,
@@ -324,10 +344,20 @@ impl NetworkBehaviour for PeerInfoBehaviour {
self.handle_ping_report(&peer, rtt)
}
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => {
let handler =
IntoProtocolsHandler::select(handler, self.identify.new_handler());
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, handler }) => {
let handler =
IntoProtocolsHandler::select(handler, self.identify.new_handler());
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
handler,
})
},
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
@@ -362,10 +392,18 @@ impl NetworkBehaviour for PeerInfoBehaviour {
IdentifyEvent::Pushed { .. } => {},
IdentifyEvent::Sent { .. } => {},
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => {
let handler = IntoProtocolsHandler::select(self.ping.new_handler(), handler);
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, handler }) => {
let handler = IntoProtocolsHandler::select(self.ping.new_handler(), handler);
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
handler,
})
},
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
+15 -21
View File
@@ -1362,8 +1362,10 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
) {
self.behaviour.inject_connection_established(peer_id, conn, endpoint)
self.behaviour
.inject_connection_established(peer_id, conn, endpoint, failed_addresses)
}
fn inject_connection_closed(
@@ -1371,8 +1373,9 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
self.behaviour.inject_connection_closed(peer_id, conn, endpoint)
self.behaviour.inject_connection_closed(peer_id, conn, endpoint, handler)
}
fn inject_connected(&mut self, peer_id: &PeerId) {
@@ -1396,12 +1399,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
&mut self,
cx: &mut std::task::Context,
params: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>
>{
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
}
@@ -1562,10 +1560,10 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let event = match self.behaviour.poll(cx, params) {
Poll::Pending => return Poll::Pending,
Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev,
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, handler }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, handler }),
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
@@ -1778,17 +1776,13 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
Poll::Pending
}
fn inject_addr_reach_failure(
fn inject_dial_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
peer_id: Option<PeerId>,
handler: Self::ProtocolsHandler,
error: &libp2p::swarm::DialError,
) {
self.behaviour.inject_addr_reach_failure(peer_id, addr, error)
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
self.behaviour.inject_dial_failure(peer_id)
self.behaviour.inject_dial_failure(peer_id, handler, error);
}
fn inject_new_listener(&mut self, id: ListenerId) {
@@ -26,7 +26,8 @@ use futures::prelude::*;
use libp2p::{
core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId},
swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
DialError, DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour,
NetworkBehaviourAction, NotifyHandler, PollParameters,
},
};
use log::{error, trace, warn};
@@ -38,7 +39,7 @@ use std::{
borrow::Cow,
cmp,
collections::{hash_map::Entry, VecDeque},
error, mem,
mem,
pin::Pin,
str,
sync::Arc,
@@ -132,7 +133,7 @@ pub struct Notifications {
next_incoming_index: sc_peerset::IncomingIndex,
/// Events to produce from `poll()`.
events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, NotificationsOut>>,
events: VecDeque<NetworkBehaviourAction<NotificationsOut, NotifsHandlerProto>>,
}
/// Configuration for a notifications protocol.
@@ -628,6 +629,7 @@ impl Notifications {
/// Function that is called when the peerset wants us to connect to a peer.
fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: sc_peerset::SetId) {
// If `PeerId` is unknown to us, insert an entry, start dialing, and return early.
let handler = self.new_handler();
let mut occ_entry = match self.peers.entry((peer_id, set_id)) {
Entry::Occupied(entry) => entry,
Entry::Vacant(entry) => {
@@ -643,6 +645,7 @@ impl Notifications {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: entry.key().0.clone(),
condition: DialPeerCondition::Disconnected,
handler,
});
entry.insert(PeerState::Requested);
return
@@ -679,6 +682,7 @@ impl Notifications {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: occ_entry.key().0.clone(),
condition: DialPeerCondition::Disconnected,
handler,
});
*occ_entry.into_mut() = PeerState::Requested;
},
@@ -1094,6 +1098,7 @@ impl NetworkBehaviour for Notifications {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
) {
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
match self.peers.entry((*peer_id, set_id)).or_insert(PeerState::Poisoned) {
@@ -1152,6 +1157,7 @@ impl NetworkBehaviour for Notifications {
peer_id: &PeerId,
conn: &ConnectionId,
_endpoint: &ConnectedPoint,
_handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
@@ -1411,70 +1417,74 @@ impl NetworkBehaviour for Notifications {
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
fn inject_addr_reach_failure(
fn inject_dial_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn error::Error,
peer_id: Option<PeerId>,
_: Self::ProtocolsHandler,
error: &DialError,
) {
trace!(target: "sub-libp2p", "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
}
if let DialError::Transport(errors) = error {
for (addr, error) in errors.iter() {
trace!(target: "sub-libp2p", "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
}
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);
if let Some(peer_id) = peer_id {
trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
if let Entry::Occupied(mut entry) = self.peers.entry((peer_id.clone(), set_id)) {
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
// The peer is not in our list.
st @ PeerState::Backoff { .. } => {
*entry.into_mut() = st;
},
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
if let Entry::Occupied(mut entry) = self.peers.entry((peer_id.clone(), set_id)) {
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
// The peer is not in our list.
st @ PeerState::Backoff { .. } => {
*entry.into_mut() = st;
},
// "Basic" situation: we failed to reach a peer that the peerset requested.
st @ PeerState::Requested | st @ PeerState::PendingRequest { .. } => {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, *peer_id, DropReason::Unknown);
// "Basic" situation: we failed to reach a peer that the peerset requested.
st @ PeerState::Requested | st @ PeerState::PendingRequest { .. } => {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, peer_id, DropReason::Unknown);
let now = Instant::now();
let ban_duration = match st {
PeerState::PendingRequest { timer_deadline, .. }
if timer_deadline > now =>
cmp::max(timer_deadline - now, Duration::from_secs(5)),
_ => Duration::from_secs(5),
};
let now = Instant::now();
let ban_duration = match st {
PeerState::PendingRequest { timer_deadline, .. }
if timer_deadline > now =>
cmp::max(timer_deadline - now, Duration::from_secs(5)),
_ => Duration::from_secs(5),
};
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(ban_duration);
let peer_id = *peer_id;
self.delays.push(
async move {
delay.await;
(delay_id, peer_id, set_id)
}
.boxed(),
);
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(ban_duration);
let peer_id = peer_id;
self.delays.push(
async move {
delay.await;
(delay_id, peer_id, set_id)
}
.boxed(),
);
*entry.into_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: now + ban_duration,
};
},
*entry.into_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: now + ban_duration,
};
},
// We can still get dial failures even if we are already connected to the peer,
// as an extra diagnostic for an earlier attempt.
st @ PeerState::Disabled { .. } |
st @ PeerState::Enabled { .. } |
st @ PeerState::DisabledPendingEnable { .. } |
st @ PeerState::Incoming { .. } => {
*entry.into_mut() = st;
},
// We can still get dial failures even if we are already connected to the
// peer, as an extra diagnostic for an earlier attempt.
st @ PeerState::Disabled { .. } |
st @ PeerState::Enabled { .. } |
st @ PeerState::DisabledPendingEnable { .. } |
st @ PeerState::Incoming { .. } => {
*entry.into_mut() = st;
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id);
debug_assert!(false);
},
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id);
debug_assert!(false);
},
}
}
}
}
@@ -2000,7 +2010,7 @@ impl NetworkBehaviour for Notifications {
&mut self,
cx: &mut Context,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<NotifsHandlerIn, Self::OutEvent>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event)
}
@@ -2032,6 +2042,8 @@ impl NetworkBehaviour for Notifications {
while let Poll::Ready(Some((delay_id, peer_id, set_id))) =
Pin::new(&mut self.delays).poll_next(cx)
{
let handler = self.new_handler();
let peer_state = match self.peers.get_mut(&(peer_id, set_id)) {
Some(s) => s,
// We intentionally never remove elements from `delays`, and it may
@@ -2051,6 +2063,7 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler,
});
*peer_state = PeerState::Requested;
},
@@ -29,7 +29,7 @@ use libp2p::{
},
identity, noise,
swarm::{
IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
ProtocolsHandler, Swarm, SwarmEvent,
},
yamux, Multiaddr, PeerId, Transport,
@@ -68,7 +68,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
in_peers: 25,
out_peers: 25,
bootnodes: if index == 0 {
keypairs.iter().skip(1).map(|keypair| keypair.public().into_peer_id()).collect()
keypairs.iter().skip(1).map(|keypair| keypair.public().to_peer_id()).collect()
} else {
vec![]
},
@@ -92,7 +92,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
.enumerate()
.filter_map(|(n, a)| {
if n != index {
Some((keypairs[n].public().into_peer_id(), a.clone()))
Some((keypairs[n].public().to_peer_id(), a.clone()))
} else {
None
}
@@ -100,7 +100,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
.collect(),
};
let mut swarm = Swarm::new(transport, behaviour, keypairs[index].public().into_peer_id());
let mut swarm = Swarm::new(transport, behaviour, keypairs[index].public().to_peer_id());
swarm.listen_on(addrs[index].clone()).unwrap();
out.push(swarm);
}
@@ -163,8 +163,10 @@ impl NetworkBehaviour for CustomProtoWithAddr {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
) {
self.inner.inject_connection_established(peer_id, conn, endpoint)
self.inner
.inject_connection_established(peer_id, conn, endpoint, failed_addresses)
}
fn inject_connection_closed(
@@ -172,8 +174,9 @@ impl NetworkBehaviour for CustomProtoWithAddr {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
self.inner.inject_connection_closed(peer_id, conn, endpoint)
self.inner.inject_connection_closed(peer_id, conn, endpoint, handler)
}
fn inject_event(
@@ -188,27 +191,18 @@ impl NetworkBehaviour for CustomProtoWithAddr {
fn poll(
&mut self,
cx: &mut Context,
params: &mut impl PollParameters
) -> Poll<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>
>{
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
self.inner.poll(cx, params)
}
fn inject_addr_reach_failure(
fn inject_dial_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
peer_id: Option<PeerId>,
handler: Self::ProtocolsHandler,
error: &DialError,
) {
self.inner.inject_addr_reach_failure(peer_id, addr, error)
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
self.inner.inject_dial_failure(peer_id)
self.inner.inject_dial_failure(peer_id, handler, error)
}
fn inject_new_listener(&mut self, id: ListenerId) {
@@ -45,12 +45,12 @@ use libp2p::{
ConnectedPoint, Multiaddr, PeerId,
},
request_response::{
ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage, ResponseChannel,
handler::RequestResponseHandler, ProtocolSupport, RequestResponse, RequestResponseCodec,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
},
swarm::{
protocols_handler::multi::MultiHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters, ProtocolsHandler,
protocols_handler::multi::MultiHandler, IntoProtocolsHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, ProtocolsHandler,
},
};
use std::{
@@ -377,6 +377,27 @@ impl RequestResponsesBehaviour {
};
}
}
fn new_handler_with_replacement(
&mut self,
protocol: String,
handler: RequestResponseHandler<GenericCodec>,
) -> <RequestResponsesBehaviour as NetworkBehaviour>::ProtocolsHandler {
let mut handlers: HashMap<_, _> = self
.protocols
.iter_mut()
.map(|(p, (r, _))| (p.to_string(), NetworkBehaviour::new_handler(r)))
.collect();
if let Some(h) = handlers.get_mut(&protocol) {
*h = handler
}
MultiHandler::try_from_iter(handlers).expect(
"Protocols are in a HashMap and there can be at most one handler per protocol name, \
which is the only possible error; qed",
)
}
}
impl NetworkBehaviour for RequestResponsesBehaviour {
@@ -405,9 +426,16 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_connection_established(p, peer_id, conn, endpoint)
NetworkBehaviour::inject_connection_established(
p,
peer_id,
conn,
endpoint,
failed_addresses,
)
}
}
@@ -422,9 +450,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
_handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_connection_closed(p, peer_id, conn, endpoint)
let handler = p.new_handler();
NetworkBehaviour::inject_connection_closed(p, peer_id, conn, endpoint, handler);
}
}
@@ -434,17 +464,6 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
}
}
fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_addr_reach_failure(p, peer_id, addr, error)
}
}
fn inject_event(
&mut self,
peer_id: PeerId,
@@ -478,9 +497,15 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
}
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_: Self::ProtocolsHandler,
error: &libp2p::swarm::DialError,
) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_dial_failure(p, peer_id)
let handler = p.new_handler();
NetworkBehaviour::inject_dial_failure(p, peer_id, handler, error)
}
}
@@ -512,12 +537,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
'poll_all: loop {
if let Some(message_request) = self.message_request.take() {
// Now we can can poll `MessageRequest` until we get the reputation
@@ -658,17 +678,26 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Other events generated by the underlying behaviour are transparently
// passed through.
NetworkBehaviourAction::DialAddress { address } => {
NetworkBehaviourAction::DialAddress { address, handler } => {
log::error!(
"The request-response isn't supposed to start dialing peers"
);
return Poll::Ready(NetworkBehaviourAction::DialAddress { address })
let protocol = protocol.to_string();
let handler = self.new_handler_with_replacement(protocol, handler);
return Poll::Ready(NetworkBehaviourAction::DialAddress {
address,
handler,
})
},
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
NetworkBehaviourAction::DialPeer { peer_id, condition, handler } => {
let protocol = protocol.to_string();
let handler = self.new_handler_with_replacement(protocol, handler);
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
}),
handler,
})
},
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
@@ -1061,7 +1090,7 @@ mod tests {
let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap();
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let mut swarm = Swarm::new(transport, behaviour, keypair.public().to_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
swarm.listen_on(listen_addr.clone()).unwrap();
+49 -40
View File
@@ -56,10 +56,10 @@ use libp2p::{
},
kad::record,
multiaddr,
ping::handler::PingFailure,
ping::Failure as PingFailure,
swarm::{
protocols_handler::NodeHandlerWrapperError, AddressScore, NetworkBehaviour, SwarmBuilder,
SwarmEvent,
protocols_handler::NodeHandlerWrapperError, AddressScore, DialError, NetworkBehaviour,
SwarmBuilder, SwarmEvent,
},
Multiaddr, PeerId,
};
@@ -176,7 +176,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
let local_peer_id = local_public.clone().into_peer_id();
let local_peer_id = local_public.clone().to_peer_id();
info!(
target: "sub-libp2p",
"🏷 Local node identity is: {}",
@@ -1845,8 +1845,13 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
peer_id,
endpoint,
num_established,
concurrent_dial_errors,
}) => {
debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
if let Some(errors) = concurrent_dial_errors {
debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
} else {
debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
}
if let Some(metrics) = this.metrics.as_ref() {
let direction = match endpoint {
@@ -1914,37 +1919,41 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
metrics.listeners_local_addresses.dec();
}
},
Poll::Ready(SwarmEvent::UnreachableAddr { peer_id, address, error, .. }) => {
trace!(
target: "sub-libp2p",
"Libp2p => Failed to reach {:?} through {:?}: {}",
peer_id, address, error,
);
Poll::Ready(SwarmEvent::OutgoingConnectionError { peer_id, error }) => {
if let Some(peer_id) = peer_id {
trace!(
target: "sub-libp2p",
"Libp2p => Failed to reach {:?}: {}",
peer_id, error,
);
if this.boot_node_ids.contains(&peer_id) {
if let PendingConnectionError::InvalidPeerId = error {
error!(
"💔 The bootnode you want to connect to at `{}` provided a different peer ID than the one you expect: `{}`.",
address, peer_id,
);
if this.boot_node_ids.contains(&peer_id) {
if let DialError::InvalidPeerId = error {
error!(
"💔 The bootnode you want to connect provided a different peer ID than the one you expect: `{}`.",
peer_id,
);
}
}
}
if let Some(metrics) = this.metrics.as_ref() {
match error {
PendingConnectionError::ConnectionLimit(_) => metrics
let reason = match error {
DialError::ConnectionLimit(_) => Some("limit-reached"),
DialError::InvalidPeerId => Some("invalid-peer-id"),
DialError::Transport(_) | DialError::ConnectionIo(_) =>
Some("transport-error"),
DialError::Banned |
DialError::LocalPeerId |
DialError::NoAddresses |
DialError::DialPeerConditionFalse(_) |
DialError::Aborted => None, // ignore them
};
if let Some(reason) = reason {
metrics
.pending_connections_errors_total
.with_label_values(&["limit-reached"])
.inc(),
PendingConnectionError::InvalidPeerId => metrics
.pending_connections_errors_total
.with_label_values(&["invalid-peer-id"])
.inc(),
PendingConnectionError::Transport(_) |
PendingConnectionError::IO(_) => metrics
.pending_connections_errors_total
.with_label_values(&["transport-error"])
.inc(),
.with_label_values(&[reason])
.inc();
}
}
},
@@ -1970,16 +1979,19 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
);
if let Some(metrics) = this.metrics.as_ref() {
let reason = match error {
PendingConnectionError::ConnectionLimit(_) => "limit-reached",
PendingConnectionError::InvalidPeerId => "invalid-peer-id",
PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"),
PendingConnectionError::InvalidPeerId => Some("invalid-peer-id"),
PendingConnectionError::Transport(_) |
PendingConnectionError::IO(_) => "transport-error",
PendingConnectionError::IO(_) => Some("transport-error"),
PendingConnectionError::Aborted => None, // ignore it
};
metrics
.incoming_connections_errors_total
.with_label_values(&[reason])
.inc();
if let Some(reason) = reason {
metrics
.incoming_connections_errors_total
.with_label_values(&[reason])
.inc();
}
}
},
Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => {
@@ -1995,9 +2007,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
.inc();
}
},
Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, error }) => {
trace!(target: "sub-libp2p", "Libp2p => UnknownPeerUnreachableAddr({}): {}", address, error)
},
Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses, .. }) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.listeners_local_addresses.sub(addresses.len() as u64);