Add command-line flag to enable yamux flow control. (#4892)

* Add command-line flag to enable yamux flow control.

We never enabled proper flow-control for yamux streams which may cause
stream buffers to exceed their configured limit when the stream
producer outpaces the stream consumer. By switching the window update
mode to on-read, producers will only receive more sending credit when
all data has been consumed from the stream buffer. Using this option
creates backpressure on producers. However depending on the protocol
there is a risk of deadlock, if both endpoints concurrently attempt to
send more data than they have credit for and neither side reads before
finishing their writes. To facilitate proper testing, this PR adds a
command-line flag `use-yamux-flow-control`.

* Replace comment with generic message.
This commit is contained in:
Toralf Wittner
2020-02-12 11:24:25 +01:00
committed by GitHub
parent ae70b10326
commit 818c05bbc2
9 changed files with 32 additions and 8 deletions
+1
View File
@@ -6078,6 +6078,7 @@ dependencies = [
"unsigned-varint",
"void",
"wasm-timer",
"yamux",
"zeroize 1.1.0",
]
+1
View File
@@ -424,6 +424,7 @@ fn fill_network_configuration(
enable_mdns: !is_dev && !cli.no_mdns,
allow_private_ipv4: !cli.no_private_ipv4,
wasm_external_transport: None,
use_yamux_flow_control: cli.use_yamux_flow_control
};
config.max_parallel_downloads = cli.max_parallel_downloads;
+4
View File
@@ -238,6 +238,10 @@ pub struct NetworkConfigurationParams {
#[allow(missing_docs)]
#[structopt(flatten)]
pub node_key_params: NodeKeyParams,
/// Experimental feature flag.
#[structopt(long = "use-yamux-flow-control")]
pub use_yamux_flow_control: bool,
}
arg_enum! {
+1
View File
@@ -48,6 +48,7 @@ substrate-test-runtime-client = { version = "2.0.0", optional = true, path = "..
unsigned-varint = { version = "0.3.0", features = ["codec"] }
void = "1.0.2"
zeroize = "1.0.0"
yamux = "0.4.2"
[dev-dependencies]
env_logger = "0.7.0"
+3
View File
@@ -292,6 +292,7 @@ impl Default for NetworkConfiguration {
enable_mdns: false,
allow_private_ipv4: true,
wasm_external_transport: None,
use_yamux_flow_control: false,
},
max_parallel_downloads: 5,
}
@@ -348,6 +349,8 @@ pub enum TransportConfig {
/// This parameter exists whatever the target platform is, but it is expected to be set to
/// `Some` only when compiling for WASM.
wasm_external_transport: Option<wasm_ext::ExtTransport>,
/// Use flow control for yamux streams if set to true.
use_yamux_flow_control: bool,
},
/// Only allow connections within the same process.
+5 -5
View File
@@ -235,12 +235,12 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
u64::from(params.network_config.out_peers) + 15,
));
let (transport, bandwidth) = {
let (config_mem, config_wasm) = match params.network_config.transport {
TransportConfig::MemoryOnly => (true, None),
TransportConfig::Normal { wasm_external_transport, .. } =>
(false, wasm_external_transport)
let (config_mem, config_wasm, flowctrl) = match params.network_config.transport {
TransportConfig::MemoryOnly => (true, None, false),
TransportConfig::Normal { wasm_external_transport, use_yamux_flow_control, .. } =>
(false, wasm_external_transport, use_yamux_flow_control)
};
transport::build_transport(local_identity, config_mem, config_wasm)
transport::build_transport(local_identity, config_mem, config_wasm, flowctrl)
};
let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone());
if let Some(spawner) = params.executor {
+15 -3
View File
@@ -17,7 +17,7 @@
use futures::prelude::*;
use libp2p::{
InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport,
mplex, identity, yamux, bandwidth, wasm_ext
mplex, identity, bandwidth, wasm_ext
};
#[cfg(not(target_os = "unknown"))]
use libp2p::{tcp, dns, websocket, noise};
@@ -36,7 +36,8 @@ pub use self::bandwidth::BandwidthSinks;
pub fn build_transport(
keypair: identity::Keypair,
memory_only: bool,
wasm_external_transport: Option<wasm_ext::ExtTransport>
wasm_external_transport: Option<wasm_ext::ExtTransport>,
use_yamux_flow_control: bool
) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<bandwidth::BandwidthSinks>) {
// Build configuration objects for encryption mechanisms.
#[cfg(not(target_os = "unknown"))]
@@ -55,7 +56,18 @@ pub fn build_transport(
let mut mplex_config = mplex::MplexConfig::new();
mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block);
mplex_config.max_buffer_len(usize::MAX);
let yamux_config = yamux::Config::default();
let yamux_config = {
let mut c = yamux::Config::default();
// Only set SYN flag on first data frame sent to the remote.
c.set_lazy_open(true);
if use_yamux_flow_control {
// Enable proper flow-control: window updates are only sent when
// buffered data has been consumed.
c.set_window_update_mode(yamux::WindowUpdateMode::OnRead);
}
libp2p::yamux::Config::new(c)
};
// Build the base layer of the transport.
let transport = if let Some(t) = wasm_external_transport {
+1
View File
@@ -166,6 +166,7 @@ fn node_config<G, E: Clone> (
enable_mdns: false,
allow_private_ipv4: true,
wasm_external_transport: None,
use_yamux_flow_control: true,
},
max_parallel_downloads: NetworkConfiguration::default().max_parallel_downloads,
};
+1
View File
@@ -53,6 +53,7 @@ where
wasm_external_transport: Some(transport.clone()),
allow_private_ipv4: true,
enable_mdns: false,
use_yamux_flow_control: true,
};
config.task_executor = Some(Arc::new(move |fut| {
wasm_bindgen_futures::spawn_local(fut)