Upgrade to libp2p 0.44.0 (#11009)

* Update libp2p to 0.43.0, lru to 0.7.3

* Fix websoket Incoming::Data

* Rename ProtocolsHandler -> ConnectionHandler, remove inject_dis/connected, minor fixes

* Fix args for inject_connection* callbacks

* Fix DialPeer/DialAddress

* Fix debug fmt

* Add Endpoint to NetworkState

* Fix Kad::get_record by key

* Fix Sha2_256::digest

* Fix IntoConnectionHandler

* Fix borrowchk

* Fix DialError::WrongPeerId

* Remove NodeHandlerWrapperError

* Fix KademliaEvent variants

* Fix impl Add for String

* Fix tabs in network_state

* Apply cargo fmt

* Fix a typo in req/resp

* Fix tests

* Fix peer_info:entry.info_expire

* Fix PeerInfoBehaviour inject_address_change and inject_connection_closed

* Patch libp2p to 0.44.0#6cc3b4e

* Fix inject_connection_closed kad, req/resp

* Apply cargo fmt

* Use libp2p from crates.io

* Fix review notes
This commit is contained in:
Roman
2022-04-29 12:49:05 +04:00
committed by GitHub
parent 51915ebe4a
commit 887acda7d0
24 changed files with 759 additions and 542 deletions
+4 -4
View File
@@ -33,8 +33,8 @@ use libp2p::{
identify::IdentifyInfo,
kad::record,
swarm::{
toggle::Toggle, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess,
PollParameters,
behaviour::toggle::Toggle, NetworkBehaviour, NetworkBehaviourAction,
NetworkBehaviourEventProcess, PollParameters,
},
NetworkBehaviour,
};
@@ -304,7 +304,7 @@ impl<B: BlockT> Behaviour<B> {
/// Start querying a record from the DHT. Will later produce either a `ValueFound` or a
/// `ValueNotFound` event.
pub fn get_value(&mut self, key: &record::Key) {
pub fn get_value(&mut self, key: record::Key) {
self.discovery.get_value(key);
}
@@ -519,7 +519,7 @@ impl<B: BlockT> Behaviour<B> {
&mut self,
_cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<BehaviourOut<B>, <Self as NetworkBehaviour>::ProtocolsHandler>>
) -> Poll<NetworkBehaviourAction<BehaviourOut<B>, <Self as NetworkBehaviour>::ConnectionHandler>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
+3 -7
View File
@@ -194,10 +194,10 @@ impl<B: BlockT> Bitswap<B> {
}
impl<B: BlockT> NetworkBehaviour for Bitswap<B> {
type ProtocolsHandler = OneShotHandler<BitswapConfig, BitswapMessage, HandlerEvent>;
type ConnectionHandler = OneShotHandler<BitswapConfig, BitswapMessage, HandlerEvent>;
type OutEvent = void::Void;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
Default::default()
}
@@ -205,10 +205,6 @@ impl<B: BlockT> NetworkBehaviour for Bitswap<B> {
Vec::new()
}
fn inject_connected(&mut self, _peer: &PeerId) {}
fn inject_disconnected(&mut self, _peer: &PeerId) {}
fn inject_event(&mut self, peer: PeerId, _connection: ConnectionId, message: HandlerEvent) {
let request = match message {
HandlerEvent::ResponseSent => return,
@@ -300,7 +296,7 @@ impl<B: BlockT> NetworkBehaviour for Bitswap<B> {
&mut self,
_ctx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some((peer_id, message)) = self.ready_blocks.pop_front() {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
+33 -48
View File
@@ -67,8 +67,8 @@ use libp2p::{
mdns::{Mdns, MdnsConfig, MdnsEvent},
multiaddr::Protocol,
swarm::{
protocols_handler::multi::IntoMultiHandler, DialError, IntoProtocolsHandler,
NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
handler::multi::IntoMultiHandler, ConnectionHandler, DialError, IntoConnectionHandler,
NetworkBehaviour, NetworkBehaviourAction, PollParameters,
},
};
use log::{debug, error, info, trace, warn};
@@ -355,9 +355,9 @@ impl DiscoveryBehaviour {
/// Start fetching a record from the DHT.
///
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
pub fn get_value(&mut self, key: &record::Key) {
pub fn get_value(&mut self, key: record::Key) {
for k in self.kademlias.values_mut() {
k.get_record(key, Quorum::One);
k.get_record(key.clone(), Quorum::One);
}
}
@@ -433,7 +433,7 @@ impl DiscoveryBehaviour {
&mut self,
pid: ProtocolId,
handler: KademliaHandlerProto<QueryId>,
) -> <DiscoveryBehaviour as NetworkBehaviour>::ProtocolsHandler {
) -> <DiscoveryBehaviour as NetworkBehaviour>::ConnectionHandler {
let mut handlers: HashMap<_, _> = self
.kademlias
.iter_mut()
@@ -498,10 +498,10 @@ pub enum DiscoveryOut {
}
impl NetworkBehaviour for DiscoveryBehaviour {
type ProtocolsHandler = IntoMultiHandler<ProtocolId, KademliaHandlerProto<QueryId>>;
type ConnectionHandler = IntoMultiHandler<ProtocolId, KademliaHandlerProto<QueryId>>;
type OutEvent = DiscoveryOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
let iter = self
.kademlias
.iter_mut()
@@ -568,6 +568,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.num_connections += 1;
for k in self.kademlias.values_mut() {
@@ -577,37 +578,37 @@ impl NetworkBehaviour for DiscoveryBehaviour {
conn,
endpoint,
failed_addresses,
other_established,
)
}
}
fn inject_connected(&mut self, peer_id: &PeerId) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connected(k, peer_id)
}
}
fn inject_connection_closed(
&mut self,
_peer_id: &PeerId,
_conn: &ConnectionId,
_endpoint: &ConnectedPoint,
_handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
self.num_connections -= 1;
// NetworkBehaviour::inject_connection_closed on Kademlia<MemoryStore> does nothing.
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_disconnected(k, peer_id)
for (pid, event) in handler.into_iter() {
if let Some(kad) = self.kademlias.get_mut(&pid) {
kad.inject_connection_closed(peer_id, conn, endpoint, event, remaining_established)
} else {
error!(
target: "sub-libp2p",
"inject_connection_closed: no kademlia instance registered for protocol {:?}",
pid,
)
}
}
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_: Self::ProtocolsHandler,
_: Self::ConnectionHandler,
error: &DialError,
) {
if let Some(peer_id) = peer_id {
@@ -630,7 +631,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
&mut self,
peer_id: PeerId,
connection: ConnectionId,
(pid, event): <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
(pid, event): <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
if let Some(kad) = self.kademlias.get_mut(&pid) {
return kad.inject_event(peer_id, connection, event)
@@ -689,7 +690,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
fn inject_listen_failure(&mut self, _: &Multiaddr, _: &Multiaddr, _: Self::ProtocolsHandler) {
fn inject_listen_failure(&mut self, _: &Multiaddr, _: &Multiaddr, _: Self::ConnectionHandler) {
// NetworkBehaviour::inject_listen_failure on Kademlia<MemoryStore> does nothing.
}
@@ -709,7 +710,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// Immediately process the content of `discovered`.
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
@@ -770,12 +771,8 @@ impl NetworkBehaviour for DiscoveryBehaviour {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::InboundPutRecordRequest { .. } |
KademliaEvent::InboundAddProviderRequest { .. } => {
debug_assert!(false, "We don't use kad filtering at the moment");
},
KademliaEvent::PendingRoutablePeer { .. } |
KademliaEvent::InboundRequestServed { .. } => {
KademliaEvent::InboundRequest { .. } => {
// We are not interested in this event at the moment.
},
KademliaEvent::OutboundQueryCompleted {
@@ -890,19 +887,10 @@ impl NetworkBehaviour for DiscoveryBehaviour {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
},
},
NetworkBehaviourAction::DialAddress { address, handler } => {
NetworkBehaviourAction::Dial { opts, handler } => {
let pid = pid.clone();
let handler = self.new_handler_with_replacement(pid, handler);
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })
},
NetworkBehaviourAction::DialPeer { peer_id, condition, handler } => {
let pid = pid.clone();
let handler = self.new_handler_with_replacement(pid, handler);
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
handler,
})
return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler })
},
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
@@ -941,10 +929,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
},
MdnsEvent::Expired(_) => {},
},
NetworkBehaviourAction::DialAddress { .. } => {
unreachable!("mDNS never dials!");
},
NetworkBehaviourAction::DialPeer { .. } => {
NetworkBehaviourAction::Dial { .. } => {
unreachable!("mDNS never dials!");
},
NetworkBehaviourAction::NotifyHandler { event, .. } => match event {}, /* `event` is an enum with no variant */
@@ -995,7 +980,7 @@ impl MdnsWrapper {
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<MdnsEvent, <Mdns as NetworkBehaviour>::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<MdnsEvent, <Mdns as NetworkBehaviour>::ConnectionHandler>> {
loop {
match self {
Self::Instantiating(fut) =>
+26 -3
View File
@@ -20,7 +20,10 @@
//!
//! **Warning**: These APIs are not stable.
use libp2p::{core::ConnectedPoint, Multiaddr};
use libp2p::{
core::{ConnectedPoint, Endpoint as CoreEndpoint},
Multiaddr,
};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
@@ -80,7 +83,7 @@ pub struct NotConnectedPeer {
#[serde(rename_all = "camelCase")]
pub enum PeerEndpoint {
/// We are dialing the given address.
Dialing(Multiaddr),
Dialing(Multiaddr, Endpoint),
/// We are listening.
Listening {
/// Local address of the connection.
@@ -90,12 +93,32 @@ pub enum PeerEndpoint {
},
}
/// Part of the `NetworkState` struct. Unstable.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Endpoint {
/// The socket comes from a dialer.
Dialer,
/// The socket comes from a listener.
Listener,
}
impl From<ConnectedPoint> for PeerEndpoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { address } => Self::Dialing(address),
ConnectedPoint::Dialer { address, role_override } =>
Self::Dialing(address, role_override.into()),
ConnectedPoint::Listener { local_addr, send_back_addr } =>
Self::Listening { local_addr, send_back_addr },
}
}
}
impl From<CoreEndpoint> for Endpoint {
fn from(endpoint: CoreEndpoint) -> Self {
match endpoint {
CoreEndpoint::Dialer => Self::Dialer,
CoreEndpoint::Listener => Self::Listener,
}
}
}
+62 -58
View File
@@ -28,8 +28,8 @@ use libp2p::{
identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo},
ping::{Ping, PingConfig, PingEvent, PingSuccess},
swarm::{
IntoProtocolsHandler, IntoProtocolsHandlerSelect, NetworkBehaviour, NetworkBehaviourAction,
PollParameters, ProtocolsHandler,
ConnectionHandler, IntoConnectionHandler, IntoConnectionHandlerSelect, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
},
Multiaddr,
};
@@ -170,14 +170,14 @@ pub enum PeerInfoEvent {
}
impl NetworkBehaviour for PeerInfoBehaviour {
type ProtocolsHandler = IntoProtocolsHandlerSelect<
<Ping as NetworkBehaviour>::ProtocolsHandler,
<Identify as NetworkBehaviour>::ProtocolsHandler,
type ConnectionHandler = IntoConnectionHandlerSelect<
<Ping as NetworkBehaviour>::ConnectionHandler,
<Identify as NetworkBehaviour>::ConnectionHandler,
>;
type OutEvent = PeerInfoEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
IntoProtocolsHandler::select(self.ping.new_handler(), self.identify.new_handler())
fn new_handler(&mut self) -> Self::ConnectionHandler {
IntoConnectionHandler::select(self.ping.new_handler(), self.identify.new_handler())
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@@ -195,11 +195,18 @@ impl NetworkBehaviour for PeerInfoBehaviour {
) {
self.ping.inject_address_change(peer_id, conn, old, new);
self.identify.inject_address_change(peer_id, conn, old, new);
}
fn inject_connected(&mut self, peer_id: &PeerId) {
self.ping.inject_connected(peer_id);
self.identify.inject_connected(peer_id);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
*endpoint = new.clone();
} else {
error!(target: "sub-libp2p",
"Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
}
} else {
error!(target: "sub-libp2p",
"Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
}
}
fn inject_connection_established(
@@ -208,11 +215,22 @@ impl NetworkBehaviour for PeerInfoBehaviour {
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.ping
.inject_connection_established(peer_id, conn, endpoint, failed_addresses);
self.identify
.inject_connection_established(peer_id, conn, endpoint, failed_addresses);
self.ping.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
);
self.identify.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
);
match self.nodes_info.entry(*peer_id) {
Entry::Vacant(e) => {
e.insert(NodeInfo::new(endpoint.clone()));
@@ -234,14 +252,29 @@ impl NetworkBehaviour for PeerInfoBehaviour {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify
.inject_connection_closed(peer_id, conn, endpoint, identity_handler);
self.ping.inject_connection_closed(peer_id, conn, endpoint, ping_handler);
self.identify.inject_connection_closed(
peer_id,
conn,
endpoint,
identity_handler,
remaining_established,
);
self.ping.inject_connection_closed(
peer_id,
conn,
endpoint,
ping_handler,
remaining_established,
);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
if remaining_established == 0 {
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
}
entry.endpoints.retain(|ep| ep != endpoint)
} else {
error!(target: "sub-libp2p",
@@ -249,23 +282,11 @@ impl NetworkBehaviour for PeerInfoBehaviour {
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.ping.inject_disconnected(peer_id);
self.identify.inject_disconnected(peer_id);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
} else {
error!(target: "sub-libp2p",
"Disconnected from node we were not connected to {:?}", peer_id);
}
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
match event {
EitherOutput::First(event) => self.ping.inject_event(peer_id, connection, event),
@@ -276,7 +297,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ProtocolsHandler,
handler: Self::ConnectionHandler,
error: &libp2p::swarm::DialError,
) {
let (ping_handler, identity_handler) = handler.into_inner();
@@ -313,7 +334,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
&mut self,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
handler: Self::ProtocolsHandler,
handler: Self::ConnectionHandler,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify
@@ -335,7 +356,7 @@ impl NetworkBehaviour for PeerInfoBehaviour {
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
loop {
match self.ping.poll(cx, params) {
Poll::Pending => break,
@@ -344,19 +365,10 @@ impl NetworkBehaviour for PeerInfoBehaviour {
self.handle_ping_report(&peer, rtt)
}
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => {
Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => {
let handler =
IntoProtocolsHandler::select(handler, self.identify.new_handler());
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, handler }) => {
let handler =
IntoProtocolsHandler::select(handler, self.identify.new_handler());
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
handler,
})
IntoConnectionHandler::select(handler, self.identify.new_handler());
return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler })
},
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
@@ -392,17 +404,9 @@ impl NetworkBehaviour for PeerInfoBehaviour {
IdentifyEvent::Pushed { .. } => {},
IdentifyEvent::Sent { .. } => {},
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => {
let handler = IntoProtocolsHandler::select(self.ping.new_handler(), handler);
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, handler }) => {
let handler = IntoProtocolsHandler::select(self.ping.new_handler(), handler);
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
handler,
})
Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => {
let handler = IntoConnectionHandler::select(self.ping.new_handler(), handler);
return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler })
},
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
+26 -23
View File
@@ -36,8 +36,8 @@ use libp2p::{
},
request_response::OutboundFailure,
swarm::{
IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
ProtocolsHandler,
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
},
Multiaddr, PeerId,
};
@@ -1367,10 +1367,10 @@ pub enum CustomMessageOutcome<B: BlockT> {
}
impl<B: BlockT> NetworkBehaviour for Protocol<B> {
type ProtocolsHandler = <Notifications as NetworkBehaviour>::ProtocolsHandler;
type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
type OutEvent = CustomMessageOutcome<B>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
self.behaviour.new_handler()
}
@@ -1384,9 +1384,15 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.behaviour
.inject_connection_established(peer_id, conn, endpoint, failed_addresses)
self.behaviour.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
)
}
fn inject_connection_closed(
@@ -1394,24 +1400,23 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
self.behaviour.inject_connection_closed(peer_id, conn, endpoint, handler)
}
fn inject_connected(&mut self, peer_id: &PeerId) {
self.behaviour.inject_connected(peer_id)
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.behaviour.inject_disconnected(peer_id)
self.behaviour.inject_connection_closed(
peer_id,
conn,
endpoint,
handler,
remaining_established,
)
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
self.behaviour.inject_event(peer_id, connection, event)
}
@@ -1420,7 +1425,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
&mut self,
cx: &mut std::task::Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
}
@@ -1581,10 +1586,8 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let event = match self.behaviour.poll(cx, params) {
Poll::Pending => return Poll::Pending,
Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev,
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, handler }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, handler }),
Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) =>
return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }),
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
@@ -1800,7 +1803,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ProtocolsHandler,
handler: Self::ConnectionHandler,
error: &libp2p::swarm::DialError,
) {
self.behaviour.inject_dial_failure(peer_id, handler, error);
@@ -26,8 +26,8 @@ use futures::prelude::*;
use libp2p::{
core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId},
swarm::{
DialError, DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour,
NetworkBehaviourAction, NotifyHandler, PollParameters,
DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters,
},
};
use log::{error, trace, warn};
@@ -620,10 +620,8 @@ impl Notifications {
set_id,
);
trace!(target: "sub-libp2p", "Libp2p <= Dial {}", entry.key().0);
// The `DialPeerCondition` ensures that dial attempts are de-duplicated
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: entry.key().0.clone(),
condition: DialPeerCondition::Disconnected,
self.events.push_back(NetworkBehaviourAction::Dial {
opts: entry.key().0.clone().into(),
handler,
});
entry.insert(PeerState::Requested);
@@ -657,10 +655,8 @@ impl Notifications {
set_id,
);
trace!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key());
// The `DialPeerCondition` ensures that dial attempts are de-duplicated
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: occ_entry.key().0.clone(),
condition: DialPeerCondition::Disconnected,
self.events.push_back(NetworkBehaviourAction::Dial {
opts: occ_entry.key().0.clone().into(),
handler,
});
*occ_entry.into_mut() = PeerState::Requested;
@@ -1059,10 +1055,10 @@ impl Notifications {
}
impl NetworkBehaviour for Notifications {
type ProtocolsHandler = NotifsHandlerProto;
type ConnectionHandler = NotifsHandlerProto;
type OutEvent = NotificationsOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
NotifsHandlerProto::new(self.notif_protocols.clone())
}
@@ -1070,14 +1066,13 @@ impl NetworkBehaviour for Notifications {
Vec::new()
}
fn inject_connected(&mut self, _: &PeerId) {}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
_other_established: usize,
) {
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
match self.peers.entry((*peer_id, set_id)).or_insert(PeerState::Poisoned) {
@@ -1136,7 +1131,8 @@ impl NetworkBehaviour for Notifications {
peer_id: &PeerId,
conn: &ConnectionId,
_endpoint: &ConnectedPoint,
_handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
_handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
_remaining_established: usize,
) {
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
@@ -1394,12 +1390,10 @@ impl NetworkBehaviour for Notifications {
}
}
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_: Self::ProtocolsHandler,
_: Self::ConnectionHandler,
error: &DialError,
) {
if let DialError::Transport(errors) = error {
@@ -1989,7 +1983,7 @@ impl NetworkBehaviour for Notifications {
&mut self,
cx: &mut Context,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event)
}
@@ -2038,12 +2032,8 @@ impl NetworkBehaviour for Notifications {
PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
trace!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id);
// The `DialPeerCondition` ensures that dial attempts are de-duplicated
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler,
});
self.events
.push_back(NetworkBehaviourAction::Dial { opts: peer_id.into(), handler });
*peer_state = PeerState::Requested;
},
@@ -16,10 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Implementations of the `IntoProtocolsHandler` and `ProtocolsHandler` traits for both incoming
//! Implementations of the `IntoConnectionHandler` and `ConnectionHandler` traits for both incoming
//! and outgoing substreams for all gossiping protocols.
//!
//! This is the main implementation of `ProtocolsHandler` in this crate, that handles all the
//! This is the main implementation of `ConnectionHandler` in this crate, that handles all the
//! gossiping protocols that are Substrate-related and outside of the scope of libp2p.
//!
//! # Usage
@@ -74,8 +74,8 @@ use libp2p::{
ConnectedPoint, PeerId,
},
swarm::{
IntoProtocolsHandler, KeepAlive, NegotiatedSubstream, ProtocolsHandler,
ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler,
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
},
};
use log::error;
@@ -107,7 +107,7 @@ const OPEN_TIMEOUT: Duration = Duration::from_secs(10);
/// open substreams.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
/// Implements the `IntoProtocolsHandler` trait of libp2p.
/// Implements the `IntoConnectionHandler` trait of libp2p.
///
/// Every time a connection with a remote starts, an instance of this struct is created and
/// sent to a background task dedicated to this connection. Once the connection is established,
@@ -138,7 +138,7 @@ pub struct NotifsHandler {
/// Events to return in priority from `poll`.
events_queue: VecDeque<
ProtocolsHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>,
ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>,
>,
}
@@ -225,7 +225,7 @@ enum State {
},
}
impl IntoProtocolsHandler for NotifsHandlerProto {
impl IntoConnectionHandler for NotifsHandlerProto {
type Handler = NotifsHandler;
fn inbound_protocol(&self) -> UpgradeCollec<NotificationsIn> {
@@ -475,7 +475,7 @@ impl NotifsHandlerProto {
}
}
impl ProtocolsHandler for NotifsHandler {
impl ConnectionHandler for NotifsHandler {
type InEvent = NotifsHandlerIn;
type OutEvent = NotifsHandlerOut;
type Error = NotifsHandlerError;
@@ -505,7 +505,7 @@ impl ProtocolsHandler for NotifsHandler {
let mut protocol_info = &mut self.protocols[protocol_index];
match protocol_info.state {
State::Closed { pending_opening } => {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
self.events_queue.push_back(ConnectionHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote { protocol_index },
));
@@ -573,7 +573,7 @@ impl ProtocolsHandler for NotifsHandler {
in_substream: in_substream.take(),
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
self.events_queue.push_back(ConnectionHandlerEvent::Custom(
NotifsHandlerOut::OpenResultOk {
protocol_index,
negotiated_fallback: new_open.negotiated_fallback,
@@ -601,7 +601,7 @@ impl ProtocolsHandler for NotifsHandler {
);
self.events_queue.push_back(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, protocol_index)
.with_timeout(OPEN_TIMEOUT),
},
@@ -622,7 +622,7 @@ impl ProtocolsHandler for NotifsHandler {
);
self.events_queue.push_back(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, protocol_index)
.with_timeout(OPEN_TIMEOUT),
},
@@ -660,7 +660,7 @@ impl ProtocolsHandler for NotifsHandler {
self.protocols[protocol_index].state =
State::Closed { pending_opening: true };
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
self.events_queue.push_back(ConnectionHandlerEvent::Custom(
NotifsHandlerOut::OpenResultErr { protocol_index },
));
},
@@ -670,7 +670,7 @@ impl ProtocolsHandler for NotifsHandler {
State::Closed { .. } => {},
}
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
self.events_queue.push_back(ConnectionHandlerEvent::Custom(
NotifsHandlerOut::CloseResult { protocol_index },
));
},
@@ -680,7 +680,7 @@ impl ProtocolsHandler for NotifsHandler {
fn inject_dial_upgrade_error(
&mut self,
num: usize,
_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>,
_: ConnectionHandlerUpgrErr<NotificationsHandshakeError>,
) {
match self.protocols[num].state {
State::Closed { ref mut pending_opening } |
@@ -692,7 +692,7 @@ impl ProtocolsHandler for NotifsHandler {
State::Opening { .. } => {
self.protocols[num].state = State::Closed { pending_opening: false };
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
self.events_queue.push_back(ConnectionHandlerEvent::Custom(
NotifsHandlerOut::OpenResultErr { protocol_index: num },
));
},
@@ -717,7 +717,7 @@ impl ProtocolsHandler for NotifsHandler {
&mut self,
cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
@@ -741,7 +741,7 @@ impl ProtocolsHandler for NotifsHandler {
// a substream is ready to send if there isn't actually something to send.
match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) =>
return Poll::Ready(ProtocolsHandlerEvent::Close(
return Poll::Ready(ConnectionHandlerEvent::Close(
NotifsHandlerError::SyncNotificationsClogged,
)),
Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
@@ -789,7 +789,7 @@ impl ProtocolsHandler for NotifsHandler {
Poll::Ready(Err(_)) => {
*out_substream = None;
let event = NotifsHandlerOut::CloseDesired { protocol_index };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event))
return Poll::Ready(ConnectionHandlerEvent::Custom(event))
},
};
},
@@ -815,7 +815,7 @@ impl ProtocolsHandler for NotifsHandler {
Poll::Pending => {},
Poll::Ready(Some(Ok(message))) => {
let event = NotifsHandlerOut::Notification { protocol_index, message };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event))
return Poll::Ready(ConnectionHandlerEvent::Custom(event))
},
Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
},
@@ -827,7 +827,7 @@ impl ProtocolsHandler for NotifsHandler {
Poll::Ready(Err(_)) => {
self.protocols[protocol_index].state =
State::Closed { pending_opening: *pending_opening };
return Poll::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ConnectionHandlerEvent::Custom(
NotifsHandlerOut::CloseDesired { protocol_index },
))
},
@@ -29,8 +29,8 @@ use libp2p::{
},
identity, noise,
swarm::{
DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
ProtocolsHandler, Swarm, SwarmEvent,
ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, Swarm, SwarmEvent,
},
yamux, Multiaddr, PeerId, Transport,
};
@@ -133,10 +133,10 @@ impl std::ops::DerefMut for CustomProtoWithAddr {
}
impl NetworkBehaviour for CustomProtoWithAddr {
type ProtocolsHandler = <Notifications as NetworkBehaviour>::ProtocolsHandler;
type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
type OutEvent = <Notifications as NetworkBehaviour>::OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
self.inner.new_handler()
}
@@ -150,23 +150,21 @@ impl NetworkBehaviour for CustomProtoWithAddr {
list
}
fn inject_connected(&mut self, peer_id: &PeerId) {
self.inner.inject_connected(peer_id)
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.inner.inject_disconnected(peer_id)
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.inner
.inject_connection_established(peer_id, conn, endpoint, failed_addresses)
self.inner.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
)
}
fn inject_connection_closed(
@@ -174,16 +172,18 @@ impl NetworkBehaviour for CustomProtoWithAddr {
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
self.inner.inject_connection_closed(peer_id, conn, endpoint, handler)
self.inner
.inject_connection_closed(peer_id, conn, endpoint, handler, remaining_established)
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
self.inner.inject_event(peer_id, connection, event)
}
@@ -192,14 +192,14 @@ impl NetworkBehaviour for CustomProtoWithAddr {
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
self.inner.poll(cx, params)
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ProtocolsHandler,
handler: Self::ConnectionHandler,
error: &DialError,
) {
self.inner.inject_dial_failure(peer_id, handler, error)
@@ -49,8 +49,8 @@ use libp2p::{
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
},
swarm::{
protocols_handler::multi::MultiHandler, IntoProtocolsHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, ProtocolsHandler,
handler::multi::MultiHandler, ConnectionHandler, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
},
};
use std::{
@@ -381,7 +381,7 @@ impl RequestResponsesBehaviour {
&mut self,
protocol: String,
handler: RequestResponseHandler<GenericCodec>,
) -> <RequestResponsesBehaviour as NetworkBehaviour>::ProtocolsHandler {
) -> <RequestResponsesBehaviour as NetworkBehaviour>::ConnectionHandler {
let mut handlers: HashMap<_, _> = self
.protocols
.iter_mut()
@@ -400,11 +400,13 @@ impl RequestResponsesBehaviour {
}
impl NetworkBehaviour for RequestResponsesBehaviour {
type ProtocolsHandler =
MultiHandler<String, <RequestResponse<GenericCodec> as NetworkBehaviour>::ProtocolsHandler>;
type ConnectionHandler = MultiHandler<
String,
<RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
>;
type OutEvent = Event;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
let iter = self
.protocols
.iter_mut()
@@ -426,6 +428,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_connection_established(
@@ -434,32 +437,35 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
conn,
endpoint,
failed_addresses,
other_established,
)
}
}
fn inject_connected(&mut self, peer_id: &PeerId) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_connected(p, peer_id)
}
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
_handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
for (p, _) in self.protocols.values_mut() {
let handler = p.new_handler();
NetworkBehaviour::inject_connection_closed(p, peer_id, conn, endpoint, handler);
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_disconnected(p, peer_id)
for (p_name, event) in handler.into_iter() {
if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
proto.inject_connection_closed(
peer_id,
conn,
endpoint,
event,
remaining_established,
)
} else {
log::error!(
target: "sub-libp2p",
"inject_connection_closed: no request-response instance registered for protocol {:?}",
p_name,
)
}
}
}
@@ -467,7 +473,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
&mut self,
peer_id: PeerId,
connection: ConnectionId,
(p_name, event): <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
(p_name, event): <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
return proto.inject_event(peer_id, connection, event)
@@ -499,7 +505,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_: Self::ProtocolsHandler,
_: Self::ConnectionHandler,
error: &libp2p::swarm::DialError,
) {
for (p, _) in self.protocols.values_mut() {
@@ -536,7 +542,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
'poll_all: loop {
if let Some(message_request) = self.message_request.take() {
// Now we can can poll `MessageRequest` until we get the reputation
@@ -677,25 +683,15 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Other events generated by the underlying behaviour are transparently
// passed through.
NetworkBehaviourAction::DialAddress { address, handler } => {
log::error!(
"The request-response isn't supposed to start dialing peers"
);
NetworkBehaviourAction::Dial { opts, handler } => {
if opts.get_peer_id().is_none() {
log::error!(
"The request-response isn't supposed to start dialing addresses"
);
}
let protocol = protocol.to_string();
let handler = self.new_handler_with_replacement(protocol, handler);
return Poll::Ready(NetworkBehaviourAction::DialAddress {
address,
handler,
})
},
NetworkBehaviourAction::DialPeer { peer_id, condition, handler } => {
let protocol = protocol.to_string();
let handler = self.new_handler_with_replacement(protocol, handler);
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
handler,
})
return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler })
},
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
@@ -1146,7 +1142,7 @@ mod tests {
// this test, so they wouldn't connect to each other.
{
let dial_addr = swarms[1].1.clone();
Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap();
Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
}
let (mut swarm, _, peerset) = swarms.remove(0);
@@ -1246,7 +1242,7 @@ mod tests {
// this test, so they wouldn't connect to each other.
{
let dial_addr = swarms[1].1.clone();
Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap();
Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
}
// Running `swarm[0]` in the background until a `InboundRequest` event happens,
@@ -1375,7 +1371,7 @@ mod tests {
// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
// so they wouldn't connect to each other.
swarm_1.dial_addr(listen_add_2).unwrap();
swarm_1.dial(listen_add_2).unwrap();
// Run swarm 2 in the background, receiving two requests.
pool.spawner()
+22 -26
View File
@@ -49,16 +49,12 @@ use crate::{
use codec::Encode as _;
use futures::{channel::oneshot, prelude::*};
use libp2p::{
core::{
connection::{ConnectionError, ConnectionLimits, PendingConnectionError},
either::EitherError,
upgrade, ConnectedPoint, Executor,
},
core::{either::EitherError, upgrade, ConnectedPoint, Executor},
multiaddr,
ping::Failure as PingFailure,
swarm::{
protocols_handler::NodeHandlerWrapperError, AddressScore, DialError, NetworkBehaviour,
SwarmBuilder, SwarmEvent,
AddressScore, ConnectionError, ConnectionLimits, DialError, NetworkBehaviour,
PendingConnectionError, SwarmBuilder, SwarmEvent,
},
Multiaddr, PeerId,
};
@@ -1531,7 +1527,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
ServiceToWorkerMsg::PropagateTransactions =>
this.tx_handler_controller.propagate_transactions(),
ServiceToWorkerMsg::GetValue(key) =>
this.network_service.behaviour_mut().get_value(&key),
this.network_service.behaviour_mut().get_value(key),
ServiceToWorkerMsg::PutValue(key, value) =>
this.network_service.behaviour_mut().put_value(key, value),
ServiceToWorkerMsg::SetReservedOnly(reserved_only) => this
@@ -1897,21 +1893,18 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
};
let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(
EitherError::A(EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout),
)))),
))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
Some(ConnectionError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::B(EitherError::A(
PingFailure::Timeout,
))),
)))) => "ping-timeout",
Some(ConnectionError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged,
)))),
))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) =>
"protocol-error",
Some(ConnectionError::Handler(
NodeHandlerWrapperError::KeepAliveTimeout,
)) => "keep-alive-timeout",
)),
)))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(_)) => "protocol-error",
Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
None => "actively-closed",
};
metrics
@@ -1946,10 +1939,12 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
);
if this.boot_node_ids.contains(&peer_id) {
if let DialError::InvalidPeerId = error {
if let DialError::WrongPeerId { obtained, endpoint } = &error {
error!(
"💔 The bootnode you want to connect provided a different peer ID than the one you expect: `{}`.",
"💔 The bootnode you want to connect provided a different peer ID than the one you expect: `{}` with `{}`:`{:?}`.",
peer_id,
obtained,
endpoint,
);
}
}
@@ -1958,13 +1953,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
if let Some(metrics) = this.metrics.as_ref() {
let reason = match error {
DialError::ConnectionLimit(_) => Some("limit-reached"),
DialError::InvalidPeerId => Some("invalid-peer-id"),
DialError::InvalidPeerId(_) => Some("invalid-peer-id"),
DialError::Transport(_) | DialError::ConnectionIo(_) =>
Some("transport-error"),
DialError::Banned |
DialError::LocalPeerId |
DialError::NoAddresses |
DialError::DialPeerConditionFalse(_) |
DialError::WrongPeerId { .. } |
DialError::Aborted => None, // ignore them
};
if let Some(reason) = reason {
@@ -1998,7 +1994,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
if let Some(metrics) = this.metrics.as_ref() {
let reason = match error {
PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"),
PendingConnectionError::InvalidPeerId => Some("invalid-peer-id"),
PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"),
PendingConnectionError::Transport(_) |
PendingConnectionError::IO(_) => Some("transport-error"),
PendingConnectionError::Aborted => None, // ignore it