Grab stream of networking events earlier (#3025)

This commit is contained in:
Pierre Krieger
2021-05-14 17:51:44 +02:00
committed by GitHub
parent 1712dfc9fd
commit 78b87c47a8
+14 -6
View File
@@ -23,6 +23,7 @@
use parity_scale_codec::{Encode, Decode}; use parity_scale_codec::{Encode, Decode};
use parking_lot::Mutex; use parking_lot::Mutex;
use futures::prelude::*; use futures::prelude::*;
use futures::stream::BoxStream;
use sc_network::Event as NetworkEvent; use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle; use sp_consensus::SyncOracle;
@@ -277,10 +278,14 @@ impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
AD: validator_discovery::AuthorityDiscovery, AD: validator_discovery::AuthorityDiscovery,
Context: SubsystemContext<Message=NetworkBridgeMessage>, Context: SubsystemContext<Message=NetworkBridgeMessage>,
{ {
fn start(self, ctx: Context) -> SpawnedSubsystem { fn start(mut self, ctx: Context) -> SpawnedSubsystem {
// The stream of networking events has to be created at initialization, otherwise the
// networking might open connections before the stream of events has been grabbed.
let network_stream = self.network_service.event_stream();
// Swallow error because failure is fatal to the node and we log with more precision // Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`. // within `run_network`.
let future = run_network(self, ctx) let future = run_network(self, ctx, network_stream)
.map_err(|e| { .map_err(|e| {
SubsystemError::with_origin("network-bridge", e) SubsystemError::with_origin("network-bridge", e)
}) })
@@ -535,13 +540,12 @@ where
async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>( async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
mut sender: impl SubsystemSender, mut sender: impl SubsystemSender,
mut network_service: impl Network, mut network_service: impl Network,
mut network_stream: BoxStream<'static, NetworkEvent>,
mut authority_discovery_service: AD, mut authority_discovery_service: AD,
mut request_multiplexer: RequestMultiplexer, mut request_multiplexer: RequestMultiplexer,
metrics: Metrics, metrics: Metrics,
shared: Shared, shared: Shared,
) -> Result<(), UnexpectedAbort> { ) -> Result<(), UnexpectedAbort> {
let mut network_stream = network_service.event_stream();
loop { loop {
futures::select! { futures::select! {
network_event = network_stream.next().fuse() => match network_event { network_event = network_stream.next().fuse() => match network_event {
@@ -798,10 +802,11 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
/// #fn is_send<T: Send>(); /// #fn is_send<T: Send>();
/// #is_send::<parking_lot::MutexGuard<'static, ()>(); /// #is_send::<parking_lot::MutexGuard<'static, ()>();
/// ``` /// ```
#[tracing::instrument(skip(bridge, ctx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(skip(bridge, ctx, network_stream), fields(subsystem = LOG_TARGET))]
async fn run_network<N, AD>( async fn run_network<N, AD>(
bridge: NetworkBridge<N, AD>, bridge: NetworkBridge<N, AD>,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>, mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
network_stream: BoxStream<'static, NetworkEvent>,
) -> SubsystemResult<()> ) -> SubsystemResult<()>
where where
N: Network, N: Network,
@@ -824,6 +829,7 @@ where
let (remote, network_event_handler) = handle_network_messages( let (remote, network_event_handler) = handle_network_messages(
ctx.sender().clone(), ctx.sender().clone(),
network_service.clone(), network_service.clone(),
network_stream,
authority_discovery_service.clone(), authority_discovery_service.clone(),
request_multiplexer, request_multiplexer,
metrics.clone(), metrics.clone(),
@@ -1351,8 +1357,9 @@ mod tests {
) { ) {
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
let (request_multiplexer, req_configs) = RequestMultiplexer::new(); let (request_multiplexer, req_configs) = RequestMultiplexer::new();
let (network, network_handle, discovery) = new_test_network(req_configs); let (mut network, network_handle, discovery) = new_test_network(req_configs);
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let network_stream = network.event_stream();
let bridge = NetworkBridge { let bridge = NetworkBridge {
network_service: network, network_service: network,
@@ -1365,6 +1372,7 @@ mod tests {
let network_bridge = run_network( let network_bridge = run_network(
bridge, bridge,
context, context,
network_stream,
) )
.map_err(|_| panic!("subsystem execution failed")) .map_err(|_| panic!("subsystem execution failed"))
.map(|_| ()); .map(|_| ());