mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 21:21:11 +00:00
Delay network startup to after complete initialization (#6833)
* Delay network startup to after complete initialization * Update client/service/src/builder.rs Co-authored-by: Ashley <ashley.ruglys@gmail.com> Co-authored-by: Ashley <ashley.ruglys@gmail.com>
This commit is contained in:
@@ -83,7 +83,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
let finality_proof_provider =
|
||||
GrandpaFinalityProofProvider::new_for_service(backend.clone(), client.clone());
|
||||
|
||||
let (network, network_status_sinks, system_rpc_tx) =
|
||||
let (network, network_status_sinks, system_rpc_tx, network_starter) =
|
||||
sc_service::build_network(sc_service::BuildNetworkParams {
|
||||
config: &config,
|
||||
client: client.clone(),
|
||||
@@ -215,6 +215,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
)?;
|
||||
}
|
||||
|
||||
network_starter.start_network();
|
||||
Ok(task_manager)
|
||||
}
|
||||
|
||||
@@ -253,7 +254,7 @@ pub fn new_light(config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
let finality_proof_provider =
|
||||
GrandpaFinalityProofProvider::new_for_service(backend.clone(), client.clone());
|
||||
|
||||
let (network, network_status_sinks, system_rpc_tx) =
|
||||
let (network, network_status_sinks, system_rpc_tx, network_starter) =
|
||||
sc_service::build_network(sc_service::BuildNetworkParams {
|
||||
config: &config,
|
||||
client: client.clone(),
|
||||
@@ -288,5 +289,7 @@ pub fn new_light(config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
system_rpc_tx,
|
||||
})?;
|
||||
|
||||
network_starter.start_network();
|
||||
|
||||
Ok(task_manager)
|
||||
}
|
||||
|
||||
@@ -164,8 +164,8 @@ pub fn new_full_base(
|
||||
|
||||
let finality_proof_provider =
|
||||
GrandpaFinalityProofProvider::new_for_service(backend.clone(), client.clone());
|
||||
|
||||
let (network, network_status_sinks, system_rpc_tx) =
|
||||
|
||||
let (network, network_status_sinks, system_rpc_tx, network_starter) =
|
||||
sc_service::build_network(sc_service::BuildNetworkParams {
|
||||
config: &config,
|
||||
client: client.clone(),
|
||||
@@ -206,7 +206,7 @@ pub fn new_full_base(
|
||||
network_status_sinks,
|
||||
system_rpc_tx,
|
||||
})?;
|
||||
|
||||
|
||||
let (block_import, grandpa_link, babe_link) = import_setup;
|
||||
let shared_voter_state = rpc_setup;
|
||||
|
||||
@@ -322,6 +322,7 @@ pub fn new_full_base(
|
||||
)?;
|
||||
}
|
||||
|
||||
network_starter.start_network();
|
||||
Ok((task_manager, inherent_data_providers, client, network, transaction_pool))
|
||||
}
|
||||
|
||||
@@ -383,7 +384,7 @@ pub fn new_light_base(config: Configuration) -> Result<(
|
||||
let finality_proof_provider =
|
||||
GrandpaFinalityProofProvider::new_for_service(backend.clone(), client.clone());
|
||||
|
||||
let (network, network_status_sinks, system_rpc_tx) =
|
||||
let (network, network_status_sinks, system_rpc_tx, network_starter) =
|
||||
sc_service::build_network(sc_service::BuildNetworkParams {
|
||||
config: &config,
|
||||
client: client.clone(),
|
||||
@@ -395,7 +396,8 @@ pub fn new_light_base(config: Configuration) -> Result<(
|
||||
finality_proof_request_builder: Some(finality_proof_request_builder),
|
||||
finality_proof_provider: Some(finality_proof_provider),
|
||||
})?;
|
||||
|
||||
network_starter.start_network();
|
||||
|
||||
if config.offchain_worker.enabled {
|
||||
sc_service::build_offchain_workers(
|
||||
&config, backend.clone(), task_manager.spawn_handle(), client.clone(), network.clone(),
|
||||
@@ -412,7 +414,7 @@ pub fn new_light_base(config: Configuration) -> Result<(
|
||||
let rpc_extensions = node_rpc::create_light(light_deps);
|
||||
|
||||
let rpc_handlers =
|
||||
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
|
||||
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
|
||||
on_demand: Some(on_demand),
|
||||
remote_blockchain: Some(backend.remote_blockchain()),
|
||||
rpc_extensions_builder: Box::new(sc_service::NoopRpcExtensionBuilder(rpc_extensions)),
|
||||
@@ -423,7 +425,7 @@ pub fn new_light_base(config: Configuration) -> Result<(
|
||||
telemetry_connection_sinks: sc_service::TelemetryConnectionSinks::default(),
|
||||
task_manager: &mut task_manager,
|
||||
})?;
|
||||
|
||||
|
||||
Ok((task_manager, rpc_handlers, client, network, transaction_pool))
|
||||
}
|
||||
|
||||
@@ -498,7 +500,7 @@ mod tests {
|
||||
setup_handles = Some((block_import.clone(), babe_link.clone()));
|
||||
}
|
||||
)?;
|
||||
|
||||
|
||||
let node = sc_service_test::TestNetComponents::new(
|
||||
keep_alive, client, network, transaction_pool
|
||||
);
|
||||
|
||||
@@ -33,7 +33,7 @@ use sp_consensus::{
|
||||
block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator, Chain},
|
||||
import_queue::ImportQueue,
|
||||
};
|
||||
use futures::{FutureExt, StreamExt, future::ready};
|
||||
use futures::{FutureExt, StreamExt, future::ready, channel::oneshot};
|
||||
use jsonrpc_pubsub::manager::SubscriptionManager;
|
||||
use sc_keystore::Store as Keystore;
|
||||
use log::{info, warn, error};
|
||||
@@ -668,7 +668,7 @@ fn build_telemetry<TBl: BlockT>(
|
||||
let startup_time = SystemTime::UNIX_EPOCH.elapsed()
|
||||
.map(|dur| dur.as_millis())
|
||||
.unwrap_or(0);
|
||||
|
||||
|
||||
spawn_handle.spawn(
|
||||
"telemetry-worker",
|
||||
telemetry.clone()
|
||||
@@ -822,6 +822,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
|
||||
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
|
||||
NetworkStatusSinks<TBl>,
|
||||
TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
|
||||
NetworkStarter,
|
||||
),
|
||||
Error
|
||||
>
|
||||
@@ -900,6 +901,22 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
|
||||
config.announce_block,
|
||||
);
|
||||
|
||||
// TODO: Normally, one is supposed to pass a list of notifications protocols supported by the
|
||||
// node through the `NetworkConfiguration` struct. But because this function doesn't know in
|
||||
// advance which components, such as GrandPa or Polkadot, will be plugged on top of the
|
||||
// service, it is unfortunately not possible to do so without some deep refactoring. To bypass
|
||||
// this problem, the `NetworkService` provides a `register_notifications_protocol` method that
|
||||
// can be called even after the network has been initialized. However, we want to avoid the
|
||||
// situation where `register_notifications_protocol` is called *after* the network actually
|
||||
// connects to other peers. For this reason, we delay the process of the network future until
|
||||
// the user calls `NetworkStarter::start_network`.
|
||||
//
|
||||
// This entire hack should eventually be removed in favour of passing the list of protocols
|
||||
// through the configuration.
|
||||
//
|
||||
// See also https://github.com/paritytech/substrate/issues/6827
|
||||
let (network_start_tx, network_start_rx) = oneshot::channel();
|
||||
|
||||
// The network worker is responsible for gathering all network messages and processing
|
||||
// them. This is quite a heavy task, and at the time of the writing of this comment it
|
||||
// frequently happens that this future takes several seconds or in some situations
|
||||
@@ -907,7 +924,32 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
|
||||
// issue, and ideally we would like to fix the network future to take as little time as
|
||||
// possible, but we also take the extra harm-prevention measure to execute the networking
|
||||
// future using `spawn_blocking`.
|
||||
spawn_handle.spawn_blocking("network-worker", future);
|
||||
spawn_handle.spawn_blocking("network-worker", async move {
|
||||
if network_start_rx.await.is_err() {
|
||||
debug_assert!(false);
|
||||
log::warn!(
|
||||
"The NetworkStart returned as part of `build_network` has been silently dropped"
|
||||
);
|
||||
// This `return` might seem unnecessary, but we don't want to make it look like
|
||||
// everything is working as normal even though the user is clearly misusing the API.
|
||||
return;
|
||||
}
|
||||
|
||||
Ok((network, network_status_sinks, system_rpc_tx))
|
||||
future.await
|
||||
});
|
||||
|
||||
Ok((network, network_status_sinks, system_rpc_tx, NetworkStarter(network_start_tx)))
|
||||
}
|
||||
|
||||
/// Object used to start the network.
|
||||
#[must_use]
|
||||
pub struct NetworkStarter(oneshot::Sender<()>);
|
||||
|
||||
impl NetworkStarter {
|
||||
/// Start the network. Call this after all sub-components have been initialized.
|
||||
///
|
||||
/// > **Note**: If you don't call this function, the networking will not work.
|
||||
pub fn start_network(self) {
|
||||
let _ = self.0.send(());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver,
|
||||
pub use self::error::Error;
|
||||
pub use self::builder::{
|
||||
new_full_client, new_client, new_full_parts, new_light_parts,
|
||||
spawn_tasks, build_network, BuildNetworkParams, build_offchain_workers,
|
||||
spawn_tasks, build_network, BuildNetworkParams, NetworkStarter, build_offchain_workers,
|
||||
SpawnTasksParams, TFullClient, TLightClient, TFullBackend, TLightBackend,
|
||||
TLightBackendWithHash, TLightClientWithBackend,
|
||||
TFullCallExecutor, TLightCallExecutor, RpcExtensionBuilder, NoopRpcExtensionBuilder,
|
||||
|
||||
Reference in New Issue
Block a user