diff --git a/substrate/client/network/src/transport.rs b/substrate/client/network/src/transport.rs index c9226a10a3..626f84b6b5 100644 --- a/substrate/client/network/src/transport.rs +++ b/substrate/client/network/src/transport.rs @@ -16,11 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use futures::prelude::*; use libp2p::{ InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport, core::{ - self, either::{EitherError, EitherOutput}, muxing::StreamMuxerBox, + self, either::EitherOutput, muxing::StreamMuxerBox, transport::{boxed::Boxed, OptionalTransport}, upgrade }, mplex, identity, bandwidth, wasm_ext, noise @@ -44,46 +43,6 @@ pub fn build_transport( wasm_external_transport: Option, use_yamux_flow_control: bool ) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc) { - // Legacy noise configurations for backward compatibility. - let mut noise_legacy = noise::LegacyConfig::default(); - noise_legacy.send_legacy_handshake = true; - noise_legacy.recv_legacy_handshake = true; - - // Build configuration objects for encryption mechanisms. - let noise_config = { - // For more information about these two panics, see in "On the Importance of - // Checking Cryptographic Protocols for Faults" by Dan Boneh, Richard A. DeMillo, - // and Richard J. Lipton. - let noise_keypair_legacy = noise::Keypair::::new().into_authentic(&keypair) - .expect("can only fail in case of a hardware bug; since this signing is performed only \ - once and at initialization, we're taking the bet that the inconvenience of a very \ - rare panic here is basically zero"); - let noise_keypair_spec = noise::Keypair::::new().into_authentic(&keypair) - .expect("can only fail in case of a hardware bug; since this signing is performed only \ - once and at initialization, we're taking the bet that the inconvenience of a very \ - rare panic here is basically zero"); - - let mut xx_config = noise::NoiseConfig::xx(noise_keypair_spec); - xx_config.set_legacy_config(noise_legacy.clone()); - let mut ix_config = noise::NoiseConfig::ix(noise_keypair_legacy); - ix_config.set_legacy_config(noise_legacy); - - core::upgrade::SelectUpgrade::new(xx_config, ix_config) - }; - - // Build configuration objects for multiplexing mechanisms. - let mut mplex_config = mplex::MplexConfig::new(); - mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block); - mplex_config.max_buffer_len(usize::MAX); - - let mut yamux_config = libp2p::yamux::Config::default(); - - if use_yamux_flow_control { - // Enable proper flow-control: window updates are only sent when - // buffered data has been consumed. - yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::OnRead); - } - // Build the base layer of the transport. let transport = if let Some(t) = wasm_external_transport { OptionalTransport::some(t) @@ -112,45 +71,63 @@ pub fn build_transport( let (transport, bandwidth) = bandwidth::BandwidthLogging::new(transport); - // Encryption - let transport = transport.and_then(move |stream, endpoint| { - core::upgrade::apply(stream, noise_config, endpoint, upgrade::Version::V1) - .map_err(|err| - err.map_err(|err| match err { - EitherError::A(err) => err, - EitherError::B(err) => err, - }) - ) - .and_then(|result| async move { - let remote_key = match &result { - EitherOutput::First((noise::RemoteIdentity::IdentityKey(key), _)) => key.clone(), - EitherOutput::Second((noise::RemoteIdentity::IdentityKey(key), _)) => key.clone(), - _ => return Err(upgrade::UpgradeError::Apply(noise::NoiseError::InvalidKey)) - }; - let out = match result { - EitherOutput::First((_, o)) => o, - EitherOutput::Second((_, o)) => o, - }; - Ok((out, remote_key.into_peer_id())) - }) - }); + let authentication_config = { + // For more information about these two panics, see in "On the Importance of + // Checking Cryptographic Protocols for Faults" by Dan Boneh, Richard A. DeMillo, + // and Richard J. Lipton. + let noise_keypair_legacy = noise::Keypair::::new().into_authentic(&keypair) + .expect("can only fail in case of a hardware bug; since this signing is performed only \ + once and at initialization, we're taking the bet that the inconvenience of a very \ + rare panic here is basically zero"); + let noise_keypair_spec = noise::Keypair::::new().into_authentic(&keypair) + .expect("can only fail in case of a hardware bug; since this signing is performed only \ + once and at initialization, we're taking the bet that the inconvenience of a very \ + rare panic here is basically zero"); - // Multiplexing - let transport = transport.and_then(move |(stream, peer_id), endpoint| { - let peer_id2 = peer_id.clone(); - let upgrade = core::upgrade::SelectUpgrade::new(yamux_config, mplex_config) - .map_inbound(move |muxer| (peer_id, muxer)) - .map_outbound(move |muxer| (peer_id2, muxer)); + // Legacy noise configurations for backward compatibility. + let mut noise_legacy = noise::LegacyConfig::default(); + noise_legacy.send_legacy_handshake = true; + noise_legacy.recv_legacy_handshake = true; - core::upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1) - .map_ok(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer))) - }); + let mut xx_config = noise::NoiseConfig::xx(noise_keypair_spec); + xx_config.set_legacy_config(noise_legacy.clone()); + let mut ix_config = noise::NoiseConfig::ix(noise_keypair_legacy); + ix_config.set_legacy_config(noise_legacy); - let transport = transport - .timeout(Duration::from_secs(20)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - .boxed(); + let extract_peer_id = |result| match result { + EitherOutput::First((peer_id, o)) => (peer_id, EitherOutput::First(o)), + EitherOutput::Second((peer_id, o)) => (peer_id, EitherOutput::Second(o)), + }; + + core::upgrade::SelectUpgrade::new(xx_config.into_authenticated(), ix_config.into_authenticated()) + .map_inbound(extract_peer_id) + .map_outbound(extract_peer_id) + }; + + let multiplexing_config = { + let mut mplex_config = mplex::MplexConfig::new(); + mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block); + mplex_config.max_buffer_len(usize::MAX); + + let mut yamux_config = libp2p::yamux::Config::default(); + + if use_yamux_flow_control { + // Enable proper flow-control: window updates are only sent when + // buffered data has been consumed. + yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::OnRead); + } + + core::upgrade::SelectUpgrade::new(yamux_config, mplex_config) + .map_inbound(move |muxer| core::muxing::StreamMuxerBox::new(muxer)) + .map_outbound(move |muxer| core::muxing::StreamMuxerBox::new(muxer)) + }; + + let transport = transport.upgrade(upgrade::Version::V1) + .authenticate(authentication_config) + .multiplex(multiplexing_config) + .timeout(Duration::from_secs(20)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .boxed(); (transport, bandwidth) } -