Upgrade to libp2p-0.31. (#7606)

* Upgrade to libp2p-0.31.

* Address line width.

* Add generous incoming connection limit.

* Remove old noise configuration.
This commit is contained in:
Roman Borschel
2020-11-27 15:29:18 +01:00
committed by GitHub
parent 4f97481da7
commit b4ee48ee18
23 changed files with 167 additions and 127 deletions
+4 -4
View File
@@ -736,8 +736,8 @@ impl NetworkBehaviour for DiscoveryBehaviour {
handler,
event: (pid.clone(), event)
}),
NetworkBehaviourAction::ReportObservedAddr { address } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }),
}
}
}
@@ -767,8 +767,8 @@ impl NetworkBehaviour for DiscoveryBehaviour {
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
NetworkBehaviourAction::NotifyHandler { event, .. } =>
match event {}, // `event` is an enum with no variant
NetworkBehaviourAction::ReportObservedAddr { address } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }),
}
}
+3
View File
@@ -283,6 +283,9 @@ use sp_runtime::traits::{Block as BlockT, NumberFor};
/// two peers, the per-peer connection limit is not set to 1 but 2.
const MAX_CONNECTIONS_PER_PEER: usize = 2;
/// The maximum number of concurrent established connections that were incoming.
const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000;
/// Minimum Requirements for a Hash within Networking
pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {}
@@ -44,6 +44,7 @@ use libp2p::{
upgrade::{OutboundUpgrade, read_one, write_one}
},
swarm::{
AddressRecord,
NegotiatedSubstream,
NetworkBehaviour,
NetworkBehaviourAction,
@@ -1463,7 +1464,7 @@ mod tests {
impl PollParameters for EmptyPollParams {
type SupportedProtocolsIter = iter::Empty<Vec<u8>>;
type ListenedAddressesIter = iter::Empty<Multiaddr>;
type ExternalAddressesIter = iter::Empty<Multiaddr>;
type ExternalAddressesIter = iter::Empty<AddressRecord>;
fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
iter::empty()
+4 -4
View File
@@ -304,8 +304,8 @@ impl NetworkBehaviour for PeerInfoBehaviour {
handler,
event: EitherOutput::First(event)
}),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }),
}
}
@@ -334,8 +334,8 @@ impl NetworkBehaviour for PeerInfoBehaviour {
handler,
event: EitherOutput::Second(event)
}),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }),
}
}
+2 -2
View File
@@ -1489,8 +1489,8 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }),
};
let outcome = match event {
@@ -398,9 +398,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
event: ((*protocol).to_string(), event),
})
}
NetworkBehaviourAction::ReportObservedAddr { address } => {
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
address, score,
})
}
};
+51 -10
View File
@@ -39,15 +39,40 @@ use crate::{
},
on_demand_layer::AlwaysBadChecker,
light_client_handler, block_requests,
protocol::{self, event::Event, NotifsHandlerError, NotificationsSink, Ready, sync::SyncState, PeerInfo, Protocol},
protocol::{
self,
NotifsHandlerError,
NotificationsSink,
PeerInfo,
Protocol,
Ready,
event::Event,
sync::SyncState,
},
transport, ReputationChange,
};
use futures::{channel::oneshot, prelude::*};
use libp2p::{PeerId, multiaddr, Multiaddr};
use libp2p::core::{ConnectedPoint, Executor, connection::{ConnectionError, PendingConnectionError}, either::EitherError};
use libp2p::core::{
ConnectedPoint,
Executor,
connection::{
ConnectionLimits,
ConnectionError,
PendingConnectionError
},
either::EitherError,
upgrade
};
use libp2p::kad::record;
use libp2p::ping::handler::PingFailure;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, protocols_handler::NodeHandlerWrapperError};
use libp2p::swarm::{
AddressScore,
NetworkBehaviour,
SwarmBuilder,
SwarmEvent,
protocols_handler::NodeHandlerWrapperError
};
use log::{error, info, trace, warn};
use metrics::{Metrics, MetricSources, Histogram, HistogramVec};
use parking_lot::Mutex;
@@ -332,7 +357,11 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
transport::build_transport(local_identity, config_mem, config_wasm)
};
let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.peer_connection_limit(crate::MAX_CONNECTIONS_PER_PEER)
.connection_limits(ConnectionLimits::default()
.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
.with_max_established_incoming(Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING))
)
.substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
.connection_event_buffer_size(1024);
if let Some(spawner) = params.executor {
@@ -368,7 +397,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
// Add external addresses.
for addr in &params.network_config.public_addresses {
Swarm::<B, H>::add_external_address(&mut swarm, addr.clone());
Swarm::<B, H>::add_external_address(&mut swarm, addr.clone(), AddressScore::Infinite);
}
let external_addresses = Arc::new(Mutex::new(Vec::new()));
@@ -551,10 +580,17 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
.collect()
};
let peer_id = Swarm::<B, H>::local_peer_id(&swarm).to_base58();
let listened_addresses = Swarm::<B, H>::listeners(&swarm).cloned().collect();
let external_addresses = Swarm::<B, H>::external_addresses(&swarm)
.map(|r| &r.addr)
.cloned()
.collect();
NetworkState {
peer_id: Swarm::<B, H>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B, H>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B, H>::external_addresses(&swarm).cloned().collect(),
peer_id,
listened_addresses,
external_addresses,
connected_peers,
not_connected_peers,
peerset: swarm.user_protocol_mut().peerset_debug_info(),
@@ -1660,7 +1696,10 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
// Update the variables shared with the `NetworkService`.
this.num_connected.store(num_connected_peers, Ordering::Relaxed);
{
let external_addresses = Swarm::<B, H>::external_addresses(&this.network_service).cloned().collect();
let external_addresses = Swarm::<B, H>::external_addresses(&this.network_service)
.map(|r| &r.addr)
.cloned()
.collect();
*this.external_addresses.lock() = external_addresses;
}
@@ -1687,7 +1726,9 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
}
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
metrics.pending_connections.set(Swarm::network_info(&this.network_service).num_connections_pending as u64);
metrics.pending_connections.set(
Swarm::network_info(&this.network_service).connection_counters().num_pending() as u64
);
}
Poll::Pending
+6 -20
View File
@@ -17,9 +17,9 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use libp2p::{
InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport,
PeerId, Transport,
core::{
self, either::{EitherOutput, EitherTransport}, muxing::StreamMuxerBox,
self, either::EitherTransport, muxing::StreamMuxerBox,
transport::{Boxed, OptionalTransport}, upgrade
},
mplex, identity, bandwidth, wasm_ext, noise
@@ -74,11 +74,7 @@ pub fn build_transport(
// For more information about these two panics, see in "On the Importance of
// Checking Cryptographic Protocols for Faults" by Dan Boneh, Richard A. DeMillo,
// and Richard J. Lipton.
let noise_keypair_legacy = noise::Keypair::<noise::X25519>::new().into_authentic(&keypair)
.expect("can only fail in case of a hardware bug; since this signing is performed only \
once and at initialization, we're taking the bet that the inconvenience of a very \
rare panic here is basically zero");
let noise_keypair_spec = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&keypair)
let noise_keypair = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&keypair)
.expect("can only fail in case of a hardware bug; since this signing is performed only \
once and at initialization, we're taking the bet that the inconvenience of a very \
rare panic here is basically zero");
@@ -87,19 +83,9 @@ pub fn build_transport(
let mut noise_legacy = noise::LegacyConfig::default();
noise_legacy.recv_legacy_handshake = true;
let mut xx_config = noise::NoiseConfig::xx(noise_keypair_spec);
let mut xx_config = noise::NoiseConfig::xx(noise_keypair);
xx_config.set_legacy_config(noise_legacy.clone());
let mut ix_config = noise::NoiseConfig::ix(noise_keypair_legacy);
ix_config.set_legacy_config(noise_legacy);
let extract_peer_id = |result| match result {
EitherOutput::First((peer_id, o)) => (peer_id, EitherOutput::First(o)),
EitherOutput::Second((peer_id, o)) => (peer_id, EitherOutput::Second(o)),
};
core::upgrade::SelectUpgrade::new(xx_config.into_authenticated(), ix_config.into_authenticated())
.map_inbound(extract_peer_id)
.map_outbound(extract_peer_id)
xx_config.into_authenticated()
};
let multiplexing_config = {
@@ -115,7 +101,7 @@ pub fn build_transport(
core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
};
let transport = transport.upgrade(upgrade::Version::V1)
let transport = transport.upgrade(upgrade::Version::V1Lazy)
.authenticate(authentication_config)
.multiplex(multiplexing_config)
.timeout(Duration::from_secs(20))