upgrade libp2p to 0.50.0 (#12734)

* upgrade libp2p to 0.50.0

* on_swarm_event and on_connection_handler_event

* replace `Swarm::new` with `Swarm::with_threadpool_executor`

* on_swarm_event and on_connection_handler_event part 2

* on_swarm_event and on_connection_handler_event part 3

* on_swarm_event and on_connection_handler_event part 4

* update libp2p

* libp2p 0.50.0

* rename OutboundQueryCompleted to OutboundQueryProgressed

refs https://github.com/libp2p/rust-libp2p/pull/2712

* remove unused var

* accumulate outbound_query_records until query is finished

* format code

* use p_handler instead of new_handler

https://github.com/paritytech/substrate/pull/12734#discussion_r1027640610

* pass ListenFailure to kademlia

https://github.com/paritytech/substrate/pull/12734#discussion_r1034716664

* use tokio executor in tests

https://github.com/paritytech/substrate/pull/12734#discussion_r1039291776

* use chrono Local::now

instead of deprecated Local::today

* remove unused vars from request_responses tests

* attempt to fix pallet UI tests

* restart CI

* restart CI

* restart CI

* restart CI

* restart CI

* restart CI

* restart CI

* restart CI
This commit is contained in:
Anton
2023-01-05 16:03:41 +04:00
committed by GitHub
parent 428a42752a
commit f2dcd9520c
31 changed files with 3010 additions and 2042 deletions
+1 -1
View File
@@ -29,7 +29,7 @@ use libp2p::{
core::{Multiaddr, PeerId, PublicKey},
identify::Info as IdentifyInfo,
kad::record,
NetworkBehaviour,
swarm::NetworkBehaviour,
};
use sc_network_common::{
+130 -146
View File
@@ -51,23 +51,23 @@ use futures::prelude::*;
use futures_timer::Delay;
use ip_network::IpNetwork;
use libp2p::{
core::{
connection::ConnectionId, transport::ListenerId, ConnectedPoint, Multiaddr, PeerId,
PublicKey,
},
core::{connection::ConnectionId, Multiaddr, PeerId, PublicKey},
kad::{
handler::KademliaHandlerProto,
record::{
self,
store::{MemoryStore, RecordStore},
},
GetClosestPeersError, Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent,
QueryId, QueryResult, Quorum, Record,
GetClosestPeersError, GetRecordOk, Kademlia, KademliaBucketInserts, KademliaConfig,
KademliaEvent, QueryId, QueryResult, Quorum, Record,
},
mdns::{MdnsConfig, MdnsEvent, TokioMdns},
mdns::{self, tokio::Behaviour as TokioMdns},
multiaddr::Protocol,
swarm::{
behaviour::toggle::{Toggle, ToggleIntoConnectionHandler},
behaviour::{
toggle::{Toggle, ToggleIntoConnectionHandler},
DialFailure, FromSwarm, NewExternalAddr,
},
ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
},
@@ -78,7 +78,6 @@ use sp_core::hexdisplay::HexDisplay;
use std::{
cmp,
collections::{HashMap, HashSet, VecDeque},
io,
num::NonZeroUsize,
task::{Context, Poll},
time::Duration,
@@ -235,7 +234,7 @@ impl DiscoveryConfig {
allow_private_ipv4,
discovery_only_if_under_num,
mdns: if enable_mdns {
match TokioMdns::new(MdnsConfig::default()) {
match TokioMdns::new(mdns::Config::default()) {
Ok(mdns) => Some(mdns),
Err(err) => {
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
@@ -250,6 +249,7 @@ impl DiscoveryConfig {
NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
.expect("value is a constant; constant is non-zero; qed."),
),
outbound_query_records: Vec::new(),
}
}
}
@@ -287,6 +287,8 @@ pub struct DiscoveryBehaviour {
allow_non_globals_in_dht: bool,
/// A cache of discovered external addresses. Only used for logging purposes.
known_external_addresses: LruHashSet<Multiaddr>,
/// A cache of outbound query records.
outbound_query_records: Vec<(record::Key, Vec<u8>)>,
}
impl DiscoveryBehaviour {
@@ -367,7 +369,7 @@ impl DiscoveryBehaviour {
/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
pub fn get_value(&mut self, key: record::Key) {
if let Some(k) = self.kademlia.as_mut() {
k.get_record(key.clone(), Quorum::One);
k.get_record(key.clone());
}
}
@@ -516,127 +518,84 @@ impl NetworkBehaviour for DiscoveryBehaviour {
list
}
fn inject_address_change(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint,
) {
self.kademlia.inject_address_change(peer_id, connection_id, old, new)
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.num_connections += 1;
self.kademlia.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
)
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
self.num_connections -= 1;
self.kademlia.inject_connection_closed(
peer_id,
conn,
endpoint,
handler,
remaining_established,
)
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ConnectionHandler,
error: &DialError,
) {
if let Some(peer_id) = peer_id {
if let DialError::Transport(errors) = error {
if let Some(list) = self.ephemeral_addresses.get_mut(&peer_id) {
for (addr, _error) in errors {
list.retain(|a| a != addr);
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(e) => {
self.num_connections += 1;
self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
},
FromSwarm::ConnectionClosed(e) => {
self.num_connections -= 1;
self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
},
FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
if let Some(peer_id) = peer_id {
if let DialError::Transport(errors) = error {
if let Some(list) = self.ephemeral_addresses.get_mut(&peer_id) {
for (addr, _error) in errors {
list.retain(|a| a != addr);
}
}
}
}
}
}
self.kademlia.inject_dial_failure(peer_id, handler, error)
self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
},
FromSwarm::ListenerClosed(e) => {
self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
},
FromSwarm::ListenFailure(e) => {
self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
},
FromSwarm::ListenerError(e) => {
self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
},
FromSwarm::ExpiredExternalAddr(e) => {
// We intentionally don't remove the element from `known_external_addresses` in
// order to not print the log line again.
self.kademlia.on_swarm_event(FromSwarm::ExpiredExternalAddr(e));
},
FromSwarm::NewListener(e) => {
self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
},
FromSwarm::ExpiredListenAddr(e) => {
self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
},
FromSwarm::NewExternalAddr(e @ NewExternalAddr { addr }) => {
let new_addr = addr.clone().with(Protocol::P2p(self.local_peer_id.into()));
if Self::can_add_to_dht(addr) {
// NOTE: we might re-discover the same address multiple times
// in which case we just want to refrain from logging.
if self.known_external_addresses.insert(new_addr.clone()) {
info!(
target: "sub-libp2p",
"🔍 Discovered new external address for our node: {}",
new_addr,
);
}
}
self.kademlia.on_swarm_event(FromSwarm::NewExternalAddr(e));
},
FromSwarm::AddressChange(e) => {
self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
},
FromSwarm::NewListenAddr(e) => {
self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
},
}
}
fn inject_event(
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as
ConnectionHandler>::OutEvent,
) {
self.kademlia.inject_event(peer_id, connection, event)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
let new_addr = addr.clone().with(Protocol::P2p(self.local_peer_id.into()));
if Self::can_add_to_dht(addr) {
// NOTE: we might re-discover the same address multiple times
// in which case we just want to refrain from logging.
if self.known_external_addresses.insert(new_addr.clone()) {
info!(
target: "sub-libp2p",
"🔍 Discovered new external address for our node: {}",
new_addr,
);
}
}
self.kademlia.inject_new_external_addr(addr)
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
// We intentionally don't remove the element from `known_external_addresses` in order
// to not print the log line again.
self.kademlia.inject_expired_external_addr(addr)
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.kademlia.inject_expired_listen_addr(id, addr)
}
fn inject_new_listener(&mut self, id: ListenerId) {
self.kademlia.inject_new_listener(id)
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.kademlia.inject_new_listen_addr(id, addr)
}
fn inject_listen_failure(&mut self, _: &Multiaddr, _: &Multiaddr, _: Self::ConnectionHandler) {
// NetworkBehaviour::inject_listen_failure on Kademlia<MemoryStore> does nothing.
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
self.kademlia.inject_listener_error(id, err)
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
self.kademlia.inject_listener_closed(id, reason)
self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
}
fn poll(
@@ -705,7 +664,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
KademliaEvent::InboundRequest { .. } => {
// We are not interested in this event at the moment.
},
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(res),
..
} => match res {
@@ -730,24 +689,36 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
},
},
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetRecord(res),
stats,
step,
..
} => {
let ev = match res {
Ok(ok) => {
let results = ok
.records
.into_iter()
.map(|r| (r.record.key, r.record.value))
.collect();
DiscoveryOut::ValueFound(
results,
stats.duration().unwrap_or_default(),
)
},
Ok(ok) =>
if let GetRecordOk::FoundRecord(r) = ok {
self.outbound_query_records
.push((r.record.key, r.record.value));
continue
} else {
debug!(
target: "sub-libp2p",
"Libp2p => Query progressed to {:?} step (last: {:?})",
step.count,
step.last,
);
if step.last {
let records =
self.outbound_query_records.drain(..).collect();
DiscoveryOut::ValueFound(
records,
stats.duration().unwrap_or_default(),
)
} else {
continue
}
},
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(
target: "sub-libp2p",
@@ -773,7 +744,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::PutRecord(res),
stats,
..
@@ -795,7 +766,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::RepublishRecord(res),
..
} => match res {
@@ -811,7 +782,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
),
},
// We never start any other type of query.
KademliaEvent::OutboundQueryCompleted { result: e, .. } => {
KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
},
},
@@ -841,7 +812,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
while let Poll::Ready(ev) = mdns.poll(cx, params) {
match ev {
NetworkBehaviourAction::GenerateEvent(event) => match event {
MdnsEvent::Discovered(list) => {
mdns::Event::Discovered(list) => {
if self.num_connections >= self.discovery_only_if_under_num {
continue
}
@@ -852,7 +823,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
}
},
MdnsEvent::Expired(_) => {},
mdns::Event::Expired(_) => {},
},
NetworkBehaviourAction::Dial { .. } => {
unreachable!("mDNS never dials!");
@@ -907,12 +878,19 @@ mod tests {
},
identity::{ed25519, Keypair},
noise,
swarm::{Swarm, SwarmEvent},
swarm::{Executor, Swarm, SwarmEvent},
yamux, Multiaddr,
};
use sc_network_common::config::ProtocolId;
use sp_core::hash::H256;
use std::{collections::HashSet, task::Poll};
use std::{collections::HashSet, pin::Pin, task::Poll};
struct TokioExecutor(tokio::runtime::Runtime);
impl Executor for TokioExecutor {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.0.spawn(f);
}
}
#[test]
fn discovery_working() {
@@ -949,7 +927,13 @@ mod tests {
config.finish()
};
let mut swarm = Swarm::new(transport, behaviour, keypair.public().to_peer_id());
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut swarm = Swarm::with_executor(
transport,
behaviour,
keypair.public().to_peer_id(),
TokioExecutor(runtime),
);
let listen_addr: Multiaddr =
format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
+137 -158
View File
@@ -19,16 +19,17 @@
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::{
core::{
connection::ConnectionId, either::EitherOutput, transport::ListenerId, ConnectedPoint,
PeerId, PublicKey,
},
core::{connection::ConnectionId, either::EitherOutput, ConnectedPoint, PeerId, PublicKey},
identify::{
Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent,
Info as IdentifyInfo,
},
ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent, Success as PingSuccess},
swarm::{
behaviour::{
AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
ListenFailure,
},
ConnectionHandler, IntoConnectionHandler, IntoConnectionHandlerSelect, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
},
@@ -39,7 +40,6 @@ use sc_network_common::utils::interval;
use smallvec::SmallVec;
use std::{
collections::hash_map::Entry,
error, io,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
@@ -188,172 +188,151 @@ impl NetworkBehaviour for PeerInfoBehaviour {
list
}
fn inject_address_change(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint,
) {
self.ping.inject_address_change(peer_id, conn, old, new);
self.identify.inject_address_change(peer_id, conn, old, new);
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(
e @ ConnectionEstablished { peer_id, endpoint, .. },
) => {
self.ping.on_swarm_event(FromSwarm::ConnectionEstablished(e));
self.identify.on_swarm_event(FromSwarm::ConnectionEstablished(e));
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
*endpoint = new.clone();
} else {
error!(target: "sub-libp2p",
"Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
}
} else {
error!(target: "sub-libp2p",
"Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
}
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.ping.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
);
self.identify.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
);
match self.nodes_info.entry(*peer_id) {
Entry::Vacant(e) => {
e.insert(NodeInfo::new(endpoint.clone()));
},
Entry::Occupied(e) => {
let e = e.into_mut();
if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false) {
e.client_version = None;
e.latest_ping = None;
match self.nodes_info.entry(peer_id) {
Entry::Vacant(e) => {
e.insert(NodeInfo::new(endpoint.clone()));
},
Entry::Occupied(e) => {
let e = e.into_mut();
if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false)
{
e.client_version = None;
e.latest_ping = None;
}
e.info_expire = None;
e.endpoints.push(endpoint.clone());
},
}
e.info_expire = None;
e.endpoints.push(endpoint.clone());
},
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler,
remaining_established,
}) => {
let (ping_handler, identity_handler) = handler.into_inner();
self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler: ping_handler,
remaining_established,
}));
self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler: identity_handler,
remaining_established,
}));
if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
if remaining_established == 0 {
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
}
entry.endpoints.retain(|ep| ep != endpoint)
} else {
error!(target: "sub-libp2p",
"Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
}
},
FromSwarm::DialFailure(DialFailure { peer_id, handler, error }) => {
let (ping_handler, identity_handler) = handler.into_inner();
self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler: ping_handler,
error,
}));
self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler: identity_handler,
error,
}));
},
FromSwarm::ListenerClosed(e) => {
self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
},
FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, handler }) => {
let (ping_handler, identity_handler) = handler.into_inner();
self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
handler: ping_handler,
}));
self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
handler: identity_handler,
}));
},
FromSwarm::ListenerError(e) => {
self.ping.on_swarm_event(FromSwarm::ListenerError(e));
self.identify.on_swarm_event(FromSwarm::ListenerError(e));
},
FromSwarm::ExpiredExternalAddr(e) => {
self.ping.on_swarm_event(FromSwarm::ExpiredExternalAddr(e));
self.identify.on_swarm_event(FromSwarm::ExpiredExternalAddr(e));
},
FromSwarm::NewListener(e) => {
self.ping.on_swarm_event(FromSwarm::NewListener(e));
self.identify.on_swarm_event(FromSwarm::NewListener(e));
},
FromSwarm::ExpiredListenAddr(e) => {
self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
},
FromSwarm::NewExternalAddr(e) => {
self.ping.on_swarm_event(FromSwarm::NewExternalAddr(e));
self.identify.on_swarm_event(FromSwarm::NewExternalAddr(e));
},
FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
self.ping.on_swarm_event(FromSwarm::AddressChange(e));
self.identify.on_swarm_event(FromSwarm::AddressChange(e));
if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
*endpoint = new.clone();
} else {
error!(target: "sub-libp2p",
"Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
}
} else {
error!(target: "sub-libp2p",
"Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
}
},
FromSwarm::NewListenAddr(e) => {
self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
},
}
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify.inject_connection_closed(
peer_id,
conn,
endpoint,
identity_handler,
remaining_established,
);
self.ping.inject_connection_closed(
peer_id,
conn,
endpoint,
ping_handler,
remaining_established,
);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
if remaining_established == 0 {
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
}
entry.endpoints.retain(|ep| ep != endpoint)
} else {
error!(target: "sub-libp2p",
"Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
}
}
fn inject_event(
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as
ConnectionHandler>::OutEvent,
) {
match event {
EitherOutput::First(event) => self.ping.inject_event(peer_id, connection, event),
EitherOutput::Second(event) => self.identify.inject_event(peer_id, connection, event),
EitherOutput::First(event) =>
self.ping.on_connection_handler_event(peer_id, connection_id, event),
EitherOutput::Second(event) =>
self.identify.on_connection_handler_event(peer_id, connection_id, event),
}
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ConnectionHandler,
error: &libp2p::swarm::DialError,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify.inject_dial_failure(peer_id, identity_handler, error);
self.ping.inject_dial_failure(peer_id, ping_handler, error);
}
fn inject_new_listener(&mut self, id: ListenerId) {
self.ping.inject_new_listener(id);
self.identify.inject_new_listener(id);
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.ping.inject_new_listen_addr(id, addr);
self.identify.inject_new_listen_addr(id, addr);
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.ping.inject_expired_listen_addr(id, addr);
self.identify.inject_expired_listen_addr(id, addr);
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.ping.inject_new_external_addr(addr);
self.identify.inject_new_external_addr(addr);
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.ping.inject_expired_external_addr(addr);
self.identify.inject_expired_external_addr(addr);
}
fn inject_listen_failure(
&mut self,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
handler: Self::ConnectionHandler,
) {
let (ping_handler, identity_handler) = handler.into_inner();
self.identify
.inject_listen_failure(local_addr, send_back_addr, identity_handler);
self.ping.inject_listen_failure(local_addr, send_back_addr, ping_handler);
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn error::Error + 'static)) {
self.ping.inject_listener_error(id, err);
self.identify.inject_listener_error(id, err);
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
self.ping.inject_listener_closed(id, reason);
self.identify.inject_listener_closed(id, reason);
}
fn poll(
&mut self,
cx: &mut Context,
+11 -77
View File
@@ -22,10 +22,10 @@ use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
use futures::prelude::*;
use libp2p::{
core::{connection::ConnectionId, transport::ListenerId, ConnectedPoint},
core::connection::ConnectionId,
swarm::{
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
behaviour::FromSwarm, ConnectionHandler, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
},
Multiaddr, PeerId,
};
@@ -49,7 +49,7 @@ use sp_arithmetic::traits::SaturatedConversion;
use sp_runtime::traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, Zero};
use std::{
collections::{HashMap, HashSet, VecDeque},
io, iter,
iter,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
@@ -969,47 +969,18 @@ where
self.behaviour.addresses_of_peer(peer_id)
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.behaviour.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
)
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.behaviour.on_swarm_event(event);
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
self.behaviour.inject_connection_closed(
peer_id,
conn,
endpoint,
handler,
remaining_established,
)
}
fn inject_event(
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as
ConnectionHandler>::OutEvent,
) {
self.behaviour.inject_event(peer_id, connection, event)
self.behaviour.on_connection_handler_event(peer_id, connection_id, event);
}
fn poll(
@@ -1245,41 +1216,4 @@ where
cx.waker().wake_by_ref();
Poll::Pending
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ConnectionHandler,
error: &libp2p::swarm::DialError,
) {
self.behaviour.inject_dial_failure(peer_id, handler, error);
}
fn inject_new_listener(&mut self, id: ListenerId) {
self.behaviour.inject_new_listener(id)
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.behaviour.inject_new_listen_addr(id, addr)
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.behaviour.inject_expired_listen_addr(id, addr)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.behaviour.inject_new_external_addr(addr)
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.behaviour.inject_expired_external_addr(addr)
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
self.behaviour.inject_listener_error(id, err);
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
self.behaviour.inject_listener_closed(id, reason);
}
}
File diff suppressed because it is too large Load Diff
@@ -22,24 +22,28 @@ use crate::protocol::notifications::{Notifications, NotificationsOut, ProtocolCo
use futures::prelude::*;
use libp2p::{
core::{
connection::ConnectionId,
transport::{ListenerId, MemoryTransport},
upgrade, ConnectedPoint,
},
core::{connection::ConnectionId, transport::MemoryTransport, upgrade},
identity, noise,
swarm::{
ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour,
behaviour::FromSwarm, ConnectionHandler, Executor, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, Swarm, SwarmEvent,
},
yamux, Multiaddr, PeerId, Transport,
};
use std::{
error, io, iter,
iter,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
struct TokioExecutor(tokio::runtime::Runtime);
impl Executor for TokioExecutor {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.0.spawn(f);
}
}
/// Builds two nodes that have each other as bootstrap nodes.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
@@ -100,7 +104,13 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
.collect(),
};
let mut swarm = Swarm::new(transport, behaviour, keypairs[index].public().to_peer_id());
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut swarm = Swarm::with_executor(
transport,
behaviour,
keypairs[index].public().to_peer_id(),
TokioExecutor(runtime),
);
swarm.listen_on(addrs[index].clone()).unwrap();
out.push(swarm);
}
@@ -150,42 +160,18 @@ impl NetworkBehaviour for CustomProtoWithAddr {
list
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.inner.inject_connection_established(
peer_id,
conn,
endpoint,
failed_addresses,
other_established,
)
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.inner.on_swarm_event(event);
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
self.inner
.inject_connection_closed(peer_id, conn, endpoint, handler, remaining_established)
}
fn inject_event(
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as
ConnectionHandler>::OutEvent,
) {
self.inner.inject_event(peer_id, connection, event)
self.inner.on_connection_handler_event(peer_id, connection_id, event);
}
fn poll(
@@ -195,43 +181,6 @@ impl NetworkBehaviour for CustomProtoWithAddr {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
self.inner.poll(cx, params)
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ConnectionHandler,
error: &DialError,
) {
self.inner.inject_dial_failure(peer_id, handler, error)
}
fn inject_new_listener(&mut self, id: ListenerId) {
self.inner.inject_new_listener(id)
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_new_listen_addr(id, addr)
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_expired_listen_addr(id, addr)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_new_external_addr(addr)
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_expired_external_addr(addr)
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn error::Error + 'static)) {
self.inner.inject_listener_error(id, err);
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
self.inner.inject_listener_closed(id, reason);
}
}
#[test]
+122 -108
View File
@@ -40,14 +40,16 @@ use futures::{
prelude::*,
};
use libp2p::{
core::{connection::ConnectionId, transport::ListenerId, ConnectedPoint, Multiaddr, PeerId},
core::{connection::ConnectionId, Multiaddr, PeerId},
request_response::{
handler::RequestResponseHandler, ProtocolSupport, RequestResponse, RequestResponseCodec,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
},
swarm::{
handler::multi::MultiHandler, ConnectionHandler, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters,
behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure},
handler::multi::MultiHandler,
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
},
};
use sc_network_common::{
@@ -312,120 +314,119 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
Vec::new()
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_connection_established(
p,
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::ConnectionEstablished(e));
},
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
conn,
connection_id,
endpoint,
failed_addresses,
other_established,
)
handler,
remaining_established,
}) =>
for (p_name, p_handler) in handler.into_iter() {
if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
proto.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler: p_handler,
remaining_established,
}));
} else {
log::error!(
target: "sub-libp2p",
"on_swarm_event/connection_closed: no request-response instance registered for protocol {:?}",
p_name,
)
}
},
FromSwarm::DialFailure(DialFailure { peer_id, error, handler }) =>
for (p_name, p_handler) in handler.into_iter() {
if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
proto.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler: p_handler,
error,
}));
} else {
log::error!(
target: "sub-libp2p",
"on_swarm_event/dial_failure: no request-response instance registered for protocol {:?}",
p_name,
)
}
},
FromSwarm::ListenerClosed(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerClosed(e));
},
FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, handler }) =>
for (p_name, p_handler) in handler.into_iter() {
if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
proto.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
handler: p_handler,
}));
} else {
log::error!(
target: "sub-libp2p",
"on_swarm_event/listen_failure: no request-response instance registered for protocol {:?}",
p_name,
)
}
},
FromSwarm::ListenerError(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerError(e));
},
FromSwarm::ExpiredExternalAddr(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredExternalAddr(e));
},
FromSwarm::NewListener(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListener(e));
},
FromSwarm::ExpiredListenAddr(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredListenAddr(e));
},
FromSwarm::NewExternalAddr(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::NewExternalAddr(e));
},
FromSwarm::AddressChange(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::AddressChange(e));
},
FromSwarm::NewListenAddr(e) =>
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListenAddr(e));
},
}
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
for (p_name, event) in handler.into_iter() {
if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
proto.inject_connection_closed(
peer_id,
conn,
endpoint,
event,
remaining_established,
)
} else {
log::error!(
target: "sub-libp2p",
"inject_connection_closed: no request-response instance registered for protocol {:?}",
p_name,
)
}
}
}
fn inject_event(
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
(p_name, event): <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
connection_id: ConnectionId,
(p_name, event): <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as
ConnectionHandler>::OutEvent,
) {
if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
return proto.inject_event(peer_id, connection, event)
return proto.on_connection_handler_event(peer_id, connection_id, event)
}
log::warn!(target: "sub-libp2p",
"inject_node_event: no request-response instance registered for protocol {:?}",
p_name)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_new_external_addr(p, addr)
}
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_expired_external_addr(p, addr)
}
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_expired_listen_addr(p, id, addr)
}
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_: Self::ConnectionHandler,
error: &libp2p::swarm::DialError,
) {
for (p, _) in self.protocols.values_mut() {
let handler = p.new_handler();
NetworkBehaviour::inject_dial_failure(p, peer_id, handler, error)
}
}
fn inject_new_listener(&mut self, id: ListenerId) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_new_listener(p, id)
}
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_new_listen_addr(p, id, addr)
}
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_listener_error(p, id, err)
}
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
for (p, _) in self.protocols.values_mut() {
NetworkBehaviour::inject_listener_closed(p, id, reason)
}
log::warn!(
target: "sub-libp2p",
"on_connection_handler_event: no request-response instance registered for protocol {:?}",
p_name
);
}
fn poll(
@@ -919,12 +920,19 @@ mod tests {
},
identity::Keypair,
noise,
swarm::{Swarm, SwarmEvent},
swarm::{Executor, Swarm, SwarmEvent},
Multiaddr,
};
use sc_peerset::{Peerset, PeersetConfig, SetConfig};
use std::{iter, time::Duration};
struct TokioExecutor(tokio::runtime::Runtime);
impl Executor for TokioExecutor {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.0.spawn(f);
}
}
fn build_swarm(
list: impl Iterator<Item = ProtocolConfig>,
) -> (Swarm<RequestResponsesBehaviour>, Multiaddr, Peerset) {
@@ -953,7 +961,13 @@ mod tests {
let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap();
let mut swarm = Swarm::new(transport, behaviour, keypair.public().to_peer_id());
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut swarm = Swarm::with_executor(
transport,
behaviour,
keypair.public().to_peer_id(),
TokioExecutor(runtime),
);
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
swarm.listen_on(listen_addr.clone()).unwrap();
+17 -11
View File
@@ -40,13 +40,13 @@ use crate::{
use futures::{channel::oneshot, prelude::*};
use libp2p::{
core::{either::EitherError, upgrade, ConnectedPoint, Executor},
core::{either::EitherError, upgrade, ConnectedPoint},
identify::Info as IdentifyInfo,
kad::record::Key as KademliaKey,
multiaddr,
ping::Failure as PingFailure,
swarm::{
AddressScore, ConnectionError, ConnectionLimits, DialError, NetworkBehaviour,
AddressScore, ConnectionError, ConnectionLimits, DialError, Executor, NetworkBehaviour,
PendingConnectionError, Swarm, SwarmBuilder, SwarmEvent,
},
Multiaddr, PeerId,
@@ -370,7 +370,21 @@ where
}
};
let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id)
let builder = {
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
SwarmBuilder::with_executor(
transport,
behaviour,
local_peer_id,
SpawnImpl(params.executor),
)
};
let builder = builder
.connection_limits(
ConnectionLimits::default()
.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
@@ -383,14 +397,6 @@ where
.connection_event_buffer_size(1024)
.max_negotiating_inbound_streams(2048);
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
builder = builder.executor(Box::new(SpawnImpl(params.executor)));
(builder.build(), bandwidth)
};
+5 -5
View File
@@ -54,17 +54,17 @@ pub fn build_transport(
) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>) {
// Build the base layer of the transport.
let transport = if !memory_only {
let tcp_config = tcp::GenTcpConfig::new().nodelay(true);
let desktop_trans = tcp::TokioTcpTransport::new(tcp_config.clone());
let tcp_config = tcp::Config::new().nodelay(true);
let desktop_trans = tcp::tokio::Transport::new(tcp_config.clone());
let desktop_trans = websocket::WsConfig::new(desktop_trans)
.or_transport(tcp::TokioTcpTransport::new(tcp_config.clone()));
.or_transport(tcp::tokio::Transport::new(tcp_config.clone()));
let dns_init = dns::TokioDnsConfig::system(desktop_trans);
EitherTransport::Left(if let Ok(dns) = dns_init {
EitherTransport::Left(dns)
} else {
let desktop_trans = tcp::TokioTcpTransport::new(tcp_config.clone());
let desktop_trans = tcp::tokio::Transport::new(tcp_config.clone());
let desktop_trans = websocket::WsConfig::new(desktop_trans)
.or_transport(tcp::TokioTcpTransport::new(tcp_config));
.or_transport(tcp::tokio::Transport::new(tcp_config));
EitherTransport::Right(desktop_trans.map_err(dns::DnsErr::Transport))
})
} else {