From c5fe4295f80749f87f9f8a645732c14f48c42f99 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 3 Jan 2020 21:47:12 +0100 Subject: [PATCH] *: Register network event stream for authority discovery (#4344) Previously one would create a sender and receiver channel pair, pass the sender to the `build_network_future` through the service builder and funnel network events returned from polling the network service into the sender to be consumed by the authority discovery module owning the receiver. With recent changes it is now possible to register an `event_stream` with the network service directly, thus one does not need to make the detour through the `build_network_future`. --- substrate/bin/node/cli/src/service.rs | 23 +++++++------- substrate/client/service/src/builder.rs | 40 +------------------------ substrate/client/service/src/lib.rs | 26 +--------------- 3 files changed, 12 insertions(+), 77 deletions(-) diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 408f2653ba..063963f7b3 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -113,13 +113,13 @@ macro_rules! new_full_start { /// concrete types instead. macro_rules! new_full { ($config:expr, $with_startup_data: expr) => {{ - use futures01::sync::mpsc; - use sc_network::DhtEvent; + use futures01::Stream; use futures::{ compat::Stream01CompatExt, stream::StreamExt, future::{FutureExt, TryFutureExt}, }; + use sc_network::Event; let ( is_authority, @@ -142,18 +142,10 @@ macro_rules! new_full { let (builder, mut import_setup, inherent_data_providers) = new_full_start!($config); - // Dht event channel from the network to the authority discovery module. Use bounded channel to ensure - // back-pressure. Authority discovery is triggering one event per authority within the current authority set. - // This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to - // 10 000. - let (dht_event_tx, dht_event_rx) = - mpsc::channel::(10_000); - let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _) )? - .with_dht_event_tx(dht_event_tx)? .build()?; let (block_import, grandpa_link, babe_link) = import_setup.take() @@ -190,15 +182,20 @@ macro_rules! new_full { let babe = sc_consensus_babe::start_babe(babe_config)?; service.spawn_essential_task(babe); - let future03_dht_event_rx = dht_event_rx.compat() + let network = service.network(); + let dht_event_stream = network.event_stream().filter_map(|e| match e { + Event::Dht(e) => Some(e), + _ => None, + }); + let future03_dht_event_stream = dht_event_stream.compat() .map(|x| x.expect(" never returns an error; qed")) .boxed(); let authority_discovery = sc_authority_discovery::AuthorityDiscovery::new( service.client(), - service.network(), + network, sentry_nodes, service.keystore(), - future03_dht_event_rx, + future03_dht_event_stream, ); let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat(); diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 0c50ae3969..e95717ebfa 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -35,7 +35,7 @@ use futures03::{ }; use sc_keystore::{Store as Keystore}; use log::{info, warn, error}; -use sc_network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent}; +use sc_network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo}; use sc_network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization}; use parking_lot::{Mutex, RwLock}; use sp_core::{Blake2Hasher, H256, Hasher}; @@ -90,7 +90,6 @@ pub struct ServiceBuilder, rpc_extensions: TRpc, remote_backend: Option>>, - dht_event_tx: Option>, marker: PhantomData<(TBl, TRtApi)>, } @@ -225,7 +224,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension { transaction_pool: Arc::new(()), rpc_extensions: Default::default(), remote_backend: None, - dht_event_tx: None, marker: PhantomData, }) } @@ -303,7 +301,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension { transaction_pool: Arc::new(()), rpc_extensions: Default::default(), remote_backend: Some(remote_blockchain), - dht_event_tx: None, marker: PhantomData, }) } @@ -352,7 +349,6 @@ impl, - ) -> Result, Error> { - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - network_protocol: self.network_protocol, - transaction_pool: self.transaction_pool, - rpc_extensions: self.rpc_extensions, - remote_backend: self.remote_backend, - dht_event_tx: Some(dht_event_tx), marker: self.marker, }) } @@ -761,7 +725,6 @@ ServiceBuilder< transaction_pool, rpc_extensions, remote_backend, - dht_event_tx, } = self; sp_session::generate_initial_session_keys( @@ -1051,7 +1014,6 @@ ServiceBuilder< network_status_sinks.clone(), system_rpc_rx, has_bootnodes, - dht_event_tx, ) .map_err(|_| ()) .select(exit.clone().map(Ok).compat()) diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index e383703da7..a22a578f2f 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -44,7 +44,7 @@ use futures03::{ }; use sc_network::{ NetworkService, NetworkState, specialization::NetworkSpecialization, - Event, DhtEvent, PeerId, ReportHandle, + PeerId, ReportHandle, }; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; @@ -375,7 +375,6 @@ fn build_network_future< status_sinks: Arc, NetworkState)>>>, rpc_rx: futures03::channel::mpsc::UnboundedReceiver>, should_have_peers: bool, - dht_event_tx: Option>, ) -> impl Future { // Compatibility shim while we're transitioning to stable Futures. // See https://github.com/paritytech/substrate/issues/3099 @@ -386,9 +385,6 @@ fn build_network_future< let mut finality_notification_stream = client.finality_notification_stream().fuse() .map(|v| Ok::<_, ()>(v)).compat(); - // Initializing a stream in order to obtain DHT events from the network. - let mut event_stream = network.service().event_stream(); - futures::future::poll_fn(move || { let before_polling = Instant::now(); @@ -481,26 +477,6 @@ fn build_network_future< (status, state) }); - // Processing DHT events. - while let Ok(Async::Ready(Some(event))) = event_stream.poll() { - match event { - Event::Dht(event) => { - // Given that client/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht - // events are being passed on to the authority-discovery module. In the future there might be multiple - // consumers of these events. In that case this would need to be refactored to properly dispatch the events, - // e.g. via a subscriber model. - if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) { - if e.is_full() { - warn!(target: "service", "Dht event channel to authority discovery is full, dropping event."); - } else if e.is_disconnected() { - warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event."); - } - } - } - _ => {} - } - } - // Main network polling. if let Ok(Async::Ready(())) = network.poll().map_err(|err| { warn!(target: "service", "Error in network: {:?}", err);