mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 20:27:58 +00:00
*: Register network event stream for authority discovery (#4344)
Previously one would create a sender and receiver channel pair, pass the sender to the `build_network_future` through the service builder and funnel network events returned from polling the network service into the sender to be consumed by the authority discovery module owning the receiver. With recent changes it is now possible to register an `event_stream` with the network service directly, thus one does not need to make the detour through the `build_network_future`.
This commit is contained in:
@@ -113,13 +113,13 @@ macro_rules! new_full_start {
|
||||
/// concrete types instead.
|
||||
macro_rules! new_full {
|
||||
($config:expr, $with_startup_data: expr) => {{
|
||||
use futures01::sync::mpsc;
|
||||
use sc_network::DhtEvent;
|
||||
use futures01::Stream;
|
||||
use futures::{
|
||||
compat::Stream01CompatExt,
|
||||
stream::StreamExt,
|
||||
future::{FutureExt, TryFutureExt},
|
||||
};
|
||||
use sc_network::Event;
|
||||
|
||||
let (
|
||||
is_authority,
|
||||
@@ -142,18 +142,10 @@ macro_rules! new_full {
|
||||
|
||||
let (builder, mut import_setup, inherent_data_providers) = new_full_start!($config);
|
||||
|
||||
// Dht event channel from the network to the authority discovery module. Use bounded channel to ensure
|
||||
// back-pressure. Authority discovery is triggering one event per authority within the current authority set.
|
||||
// This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to
|
||||
// 10 000.
|
||||
let (dht_event_tx, dht_event_rx) =
|
||||
mpsc::channel::<DhtEvent>(10_000);
|
||||
|
||||
let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))?
|
||||
.with_finality_proof_provider(|client, backend|
|
||||
Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _)
|
||||
)?
|
||||
.with_dht_event_tx(dht_event_tx)?
|
||||
.build()?;
|
||||
|
||||
let (block_import, grandpa_link, babe_link) = import_setup.take()
|
||||
@@ -190,15 +182,20 @@ macro_rules! new_full {
|
||||
let babe = sc_consensus_babe::start_babe(babe_config)?;
|
||||
service.spawn_essential_task(babe);
|
||||
|
||||
let future03_dht_event_rx = dht_event_rx.compat()
|
||||
let network = service.network();
|
||||
let dht_event_stream = network.event_stream().filter_map(|e| match e {
|
||||
Event::Dht(e) => Some(e),
|
||||
_ => None,
|
||||
});
|
||||
let future03_dht_event_stream = dht_event_stream.compat()
|
||||
.map(|x| x.expect("<mpsc::channel::Receiver as Stream> never returns an error; qed"))
|
||||
.boxed();
|
||||
let authority_discovery = sc_authority_discovery::AuthorityDiscovery::new(
|
||||
service.client(),
|
||||
service.network(),
|
||||
network,
|
||||
sentry_nodes,
|
||||
service.keystore(),
|
||||
future03_dht_event_rx,
|
||||
future03_dht_event_stream,
|
||||
);
|
||||
let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user