diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 408f2653ba..063963f7b3 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -113,13 +113,13 @@ macro_rules! new_full_start { /// concrete types instead. macro_rules! new_full { ($config:expr, $with_startup_data: expr) => {{ - use futures01::sync::mpsc; - use sc_network::DhtEvent; + use futures01::Stream; use futures::{ compat::Stream01CompatExt, stream::StreamExt, future::{FutureExt, TryFutureExt}, }; + use sc_network::Event; let ( is_authority, @@ -142,18 +142,10 @@ macro_rules! new_full { let (builder, mut import_setup, inherent_data_providers) = new_full_start!($config); - // Dht event channel from the network to the authority discovery module. Use bounded channel to ensure - // back-pressure. Authority discovery is triggering one event per authority within the current authority set. - // This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to - // 10 000. - let (dht_event_tx, dht_event_rx) = - mpsc::channel::(10_000); - let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _) )? - .with_dht_event_tx(dht_event_tx)? .build()?; let (block_import, grandpa_link, babe_link) = import_setup.take() @@ -190,15 +182,20 @@ macro_rules! new_full { let babe = sc_consensus_babe::start_babe(babe_config)?; service.spawn_essential_task(babe); - let future03_dht_event_rx = dht_event_rx.compat() + let network = service.network(); + let dht_event_stream = network.event_stream().filter_map(|e| match e { + Event::Dht(e) => Some(e), + _ => None, + }); + let future03_dht_event_stream = dht_event_stream.compat() .map(|x| x.expect(" never returns an error; qed")) .boxed(); let authority_discovery = sc_authority_discovery::AuthorityDiscovery::new( service.client(), - service.network(), + network, sentry_nodes, service.keystore(), - future03_dht_event_rx, + future03_dht_event_stream, ); let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat(); diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 0c50ae3969..e95717ebfa 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -35,7 +35,7 @@ use futures03::{ }; use sc_keystore::{Store as Keystore}; use log::{info, warn, error}; -use sc_network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent}; +use sc_network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo}; use sc_network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization}; use parking_lot::{Mutex, RwLock}; use sp_core::{Blake2Hasher, H256, Hasher}; @@ -90,7 +90,6 @@ pub struct ServiceBuilder, rpc_extensions: TRpc, remote_backend: Option>>, - dht_event_tx: Option>, marker: PhantomData<(TBl, TRtApi)>, } @@ -225,7 +224,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension { transaction_pool: Arc::new(()), rpc_extensions: Default::default(), remote_backend: None, - dht_event_tx: None, marker: PhantomData, }) } @@ -303,7 +301,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension { transaction_pool: Arc::new(()), rpc_extensions: Default::default(), remote_backend: Some(remote_blockchain), - dht_event_tx: None, marker: PhantomData, }) } @@ -352,7 +349,6 @@ impl, - ) -> Result, Error> { - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - network_protocol: self.network_protocol, - transaction_pool: self.transaction_pool, - rpc_extensions: self.rpc_extensions, - remote_backend: self.remote_backend, - dht_event_tx: Some(dht_event_tx), marker: self.marker, }) } @@ -761,7 +725,6 @@ ServiceBuilder< transaction_pool, rpc_extensions, remote_backend, - dht_event_tx, } = self; sp_session::generate_initial_session_keys( @@ -1051,7 +1014,6 @@ ServiceBuilder< network_status_sinks.clone(), system_rpc_rx, has_bootnodes, - dht_event_tx, ) .map_err(|_| ()) .select(exit.clone().map(Ok).compat()) diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index e383703da7..a22a578f2f 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -44,7 +44,7 @@ use futures03::{ }; use sc_network::{ NetworkService, NetworkState, specialization::NetworkSpecialization, - Event, DhtEvent, PeerId, ReportHandle, + PeerId, ReportHandle, }; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; @@ -375,7 +375,6 @@ fn build_network_future< status_sinks: Arc, NetworkState)>>>, rpc_rx: futures03::channel::mpsc::UnboundedReceiver>, should_have_peers: bool, - dht_event_tx: Option>, ) -> impl Future { // Compatibility shim while we're transitioning to stable Futures. // See https://github.com/paritytech/substrate/issues/3099 @@ -386,9 +385,6 @@ fn build_network_future< let mut finality_notification_stream = client.finality_notification_stream().fuse() .map(|v| Ok::<_, ()>(v)).compat(); - // Initializing a stream in order to obtain DHT events from the network. - let mut event_stream = network.service().event_stream(); - futures::future::poll_fn(move || { let before_polling = Instant::now(); @@ -481,26 +477,6 @@ fn build_network_future< (status, state) }); - // Processing DHT events. - while let Ok(Async::Ready(Some(event))) = event_stream.poll() { - match event { - Event::Dht(event) => { - // Given that client/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht - // events are being passed on to the authority-discovery module. In the future there might be multiple - // consumers of these events. In that case this would need to be refactored to properly dispatch the events, - // e.g. via a subscriber model. - if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) { - if e.is_full() { - warn!(target: "service", "Dht event channel to authority discovery is full, dropping event."); - } else if e.is_disconnected() { - warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event."); - } - } - } - _ => {} - } - } - // Main network polling. if let Ok(Async::Ready(())) = network.poll().map_err(|err| { warn!(target: "service", "Error in network: {:?}", err);