diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 6f9cf67766..fcaf7f0f7e 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -23,6 +23,7 @@ use parity_scale_codec::{Encode, Decode}; use parking_lot::Mutex; use futures::prelude::*; +use futures::stream::BoxStream; use sc_network::Event as NetworkEvent; use sp_consensus::SyncOracle; @@ -277,10 +278,14 @@ impl Subsystem for NetworkBridge AD: validator_discovery::AuthorityDiscovery, Context: SubsystemContext, { - fn start(self, ctx: Context) -> SpawnedSubsystem { + fn start(mut self, ctx: Context) -> SpawnedSubsystem { + // The stream of networking events has to be created at initialization, otherwise the + // networking might open connections before the stream of events has been grabbed. + let network_stream = self.network_service.event_stream(); + // Swallow error because failure is fatal to the node and we log with more precision // within `run_network`. - let future = run_network(self, ctx) + let future = run_network(self, ctx, network_stream) .map_err(|e| { SubsystemError::with_origin("network-bridge", e) }) @@ -535,13 +540,12 @@ where async fn handle_network_messages( mut sender: impl SubsystemSender, mut network_service: impl Network, + mut network_stream: BoxStream<'static, NetworkEvent>, mut authority_discovery_service: AD, mut request_multiplexer: RequestMultiplexer, metrics: Metrics, shared: Shared, ) -> Result<(), UnexpectedAbort> { - let mut network_stream = network_service.event_stream(); - loop { futures::select! { network_event = network_stream.next().fuse() => match network_event { @@ -798,10 +802,11 @@ async fn handle_network_messages( /// #fn is_send(); /// #is_send::(); /// ``` -#[tracing::instrument(skip(bridge, ctx), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(skip(bridge, ctx, network_stream), fields(subsystem = LOG_TARGET))] async fn run_network( bridge: NetworkBridge, mut ctx: impl SubsystemContext, + network_stream: BoxStream<'static, NetworkEvent>, ) -> SubsystemResult<()> where N: Network, @@ -824,6 +829,7 @@ where let (remote, network_event_handler) = handle_network_messages( ctx.sender().clone(), network_service.clone(), + network_stream, authority_discovery_service.clone(), request_multiplexer, metrics.clone(), @@ -1351,8 +1357,9 @@ mod tests { ) { let pool = sp_core::testing::TaskExecutor::new(); let (request_multiplexer, req_configs) = RequestMultiplexer::new(); - let (network, network_handle, discovery) = new_test_network(req_configs); + let (mut network, network_handle, discovery) = new_test_network(req_configs); let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + let network_stream = network.event_stream(); let bridge = NetworkBridge { network_service: network, @@ -1365,6 +1372,7 @@ mod tests { let network_bridge = run_network( bridge, context, + network_stream, ) .map_err(|_| panic!("subsystem execution failed")) .map(|_| ());