From 818c05bbc26777cfa8b6639647bf300051d0e5ec Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Wed, 12 Feb 2020 11:24:25 +0100 Subject: [PATCH] Add command-line flag to enable yamux flow control. (#4892) * Add command-line flag to enable yamux flow control. We never enabled proper flow-control for yamux streams which may cause stream buffers to exceed their configured limit when the stream producer outpaces the stream consumer. By switching the window update mode to on-read, producers will only receive more sending credit when all data has been consumed from the stream buffer. Using this option creates backpressure on producers. However depending on the protocol there is a risk of deadlock, if both endpoints concurrently attempt to send more data than they have credit for and neither side reads before finishing their writes. To facilitate proper testing, this PR adds a command-line flag `use-yamux-flow-control`. * Replace comment with generic message. --- substrate/Cargo.lock | 1 + substrate/client/cli/src/lib.rs | 1 + substrate/client/cli/src/params.rs | 4 ++++ substrate/client/network/Cargo.toml | 1 + substrate/client/network/src/config.rs | 3 +++ substrate/client/network/src/service.rs | 10 +++++----- substrate/client/network/src/transport.rs | 18 +++++++++++++++--- substrate/client/service/test/src/lib.rs | 1 + substrate/utils/browser/src/lib.rs | 1 + 9 files changed, 32 insertions(+), 8 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 0f214fe87a..8ac3012df7 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6078,6 +6078,7 @@ dependencies = [ "unsigned-varint", "void", "wasm-timer", + "yamux", "zeroize 1.1.0", ] diff --git a/substrate/client/cli/src/lib.rs b/substrate/client/cli/src/lib.rs index a2e9fa96da..7495ad8e75 100644 --- a/substrate/client/cli/src/lib.rs +++ b/substrate/client/cli/src/lib.rs @@ -424,6 +424,7 @@ fn fill_network_configuration( enable_mdns: !is_dev && !cli.no_mdns, allow_private_ipv4: !cli.no_private_ipv4, wasm_external_transport: None, + use_yamux_flow_control: cli.use_yamux_flow_control }; config.max_parallel_downloads = cli.max_parallel_downloads; diff --git a/substrate/client/cli/src/params.rs b/substrate/client/cli/src/params.rs index 0881247b07..2ffa8bd61b 100644 --- a/substrate/client/cli/src/params.rs +++ b/substrate/client/cli/src/params.rs @@ -238,6 +238,10 @@ pub struct NetworkConfigurationParams { #[allow(missing_docs)] #[structopt(flatten)] pub node_key_params: NodeKeyParams, + + /// Experimental feature flag. + #[structopt(long = "use-yamux-flow-control")] + pub use_yamux_flow_control: bool, } arg_enum! { diff --git a/substrate/client/network/Cargo.toml b/substrate/client/network/Cargo.toml index 10d5f1ce9b..db3e32393d 100644 --- a/substrate/client/network/Cargo.toml +++ b/substrate/client/network/Cargo.toml @@ -48,6 +48,7 @@ substrate-test-runtime-client = { version = "2.0.0", optional = true, path = ".. unsigned-varint = { version = "0.3.0", features = ["codec"] } void = "1.0.2" zeroize = "1.0.0" +yamux = "0.4.2" [dev-dependencies] env_logger = "0.7.0" diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index 6cf2587fe4..87c77fee9f 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -292,6 +292,7 @@ impl Default for NetworkConfiguration { enable_mdns: false, allow_private_ipv4: true, wasm_external_transport: None, + use_yamux_flow_control: false, }, max_parallel_downloads: 5, } @@ -348,6 +349,8 @@ pub enum TransportConfig { /// This parameter exists whatever the target platform is, but it is expected to be set to /// `Some` only when compiling for WASM. wasm_external_transport: Option, + /// Use flow control for yamux streams if set to true. + use_yamux_flow_control: bool, }, /// Only allow connections within the same process. diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index b4281112f6..3dc8a49764 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -235,12 +235,12 @@ impl, H: ExHashT> NetworkWorker u64::from(params.network_config.out_peers) + 15, )); let (transport, bandwidth) = { - let (config_mem, config_wasm) = match params.network_config.transport { - TransportConfig::MemoryOnly => (true, None), - TransportConfig::Normal { wasm_external_transport, .. } => - (false, wasm_external_transport) + let (config_mem, config_wasm, flowctrl) = match params.network_config.transport { + TransportConfig::MemoryOnly => (true, None, false), + TransportConfig::Normal { wasm_external_transport, use_yamux_flow_control, .. } => + (false, wasm_external_transport, use_yamux_flow_control) }; - transport::build_transport(local_identity, config_mem, config_wasm) + transport::build_transport(local_identity, config_mem, config_wasm, flowctrl) }; let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone()); if let Some(spawner) = params.executor { diff --git a/substrate/client/network/src/transport.rs b/substrate/client/network/src/transport.rs index 6b5c18cf33..e2c95824f8 100644 --- a/substrate/client/network/src/transport.rs +++ b/substrate/client/network/src/transport.rs @@ -17,7 +17,7 @@ use futures::prelude::*; use libp2p::{ InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport, - mplex, identity, yamux, bandwidth, wasm_ext + mplex, identity, bandwidth, wasm_ext }; #[cfg(not(target_os = "unknown"))] use libp2p::{tcp, dns, websocket, noise}; @@ -36,7 +36,8 @@ pub use self::bandwidth::BandwidthSinks; pub fn build_transport( keypair: identity::Keypair, memory_only: bool, - wasm_external_transport: Option + wasm_external_transport: Option, + use_yamux_flow_control: bool ) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc) { // Build configuration objects for encryption mechanisms. #[cfg(not(target_os = "unknown"))] @@ -55,7 +56,18 @@ pub fn build_transport( let mut mplex_config = mplex::MplexConfig::new(); mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block); mplex_config.max_buffer_len(usize::MAX); - let yamux_config = yamux::Config::default(); + + let yamux_config = { + let mut c = yamux::Config::default(); + // Only set SYN flag on first data frame sent to the remote. + c.set_lazy_open(true); + if use_yamux_flow_control { + // Enable proper flow-control: window updates are only sent when + // buffered data has been consumed. + c.set_window_update_mode(yamux::WindowUpdateMode::OnRead); + } + libp2p::yamux::Config::new(c) + }; // Build the base layer of the transport. let transport = if let Some(t) = wasm_external_transport { diff --git a/substrate/client/service/test/src/lib.rs b/substrate/client/service/test/src/lib.rs index 2976e66a29..723c13ec82 100644 --- a/substrate/client/service/test/src/lib.rs +++ b/substrate/client/service/test/src/lib.rs @@ -166,6 +166,7 @@ fn node_config ( enable_mdns: false, allow_private_ipv4: true, wasm_external_transport: None, + use_yamux_flow_control: true, }, max_parallel_downloads: NetworkConfiguration::default().max_parallel_downloads, }; diff --git a/substrate/utils/browser/src/lib.rs b/substrate/utils/browser/src/lib.rs index b054d73ac9..d82f982e14 100644 --- a/substrate/utils/browser/src/lib.rs +++ b/substrate/utils/browser/src/lib.rs @@ -53,6 +53,7 @@ where wasm_external_transport: Some(transport.clone()), allow_private_ipv4: true, enable_mdns: false, + use_yamux_flow_control: true, }; config.task_executor = Some(Arc::new(move |fut| { wasm_bindgen_futures::spawn_local(fut)