Split the service initialisation up into seperate functions (#6332)

* Seperate out the complexity in ServiceBuilder::build_common into seperate functions

* Fix line widths

* Move some functions to their respective crates
This commit is contained in:
Ashley
2020-06-16 15:50:21 +02:00
committed by GitHub
parent c44947bbcb
commit 761dbd7dcc
19 changed files with 452 additions and 330 deletions
+347 -305
View File
@@ -24,11 +24,11 @@ use crate::{
config::{Configuration, KeystoreConfig, PrometheusConfig, OffchainWorkerConfig},
};
use sc_client_api::{
self, BlockchainEvents, light::RemoteBlockchain, execution_extensions::ExtensionsFactory,
self, light::RemoteBlockchain, execution_extensions::ExtensionsFactory,
ExecutorProvider, CallExecutor, ForkBlocks, BadBlocks, CloneableSpawn, UsageProvider,
backend::RemoteBackend,
};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
use sc_chain_spec::get_extension;
use sp_consensus::{
block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator},
@@ -42,7 +42,7 @@ use jsonrpc_pubsub::manager::SubscriptionManager;
use sc_keystore::Store as Keystore;
use log::{info, warn, error};
use sc_network::config::{Role, FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
use sc_network::{NetworkService, NetworkStateInfo};
use sc_network::NetworkService;
use parking_lot::{Mutex, RwLock};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
@@ -1002,11 +1002,8 @@ ServiceBuilder<
// A side-channel for essential tasks to communicate shutdown.
let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks");
let import_queue = Box::new(import_queue);
let chain_info = client.chain_info();
let chain_spec = &config.chain_spec;
let version = config.impl_version;
info!("📦 Highest known block at #{}", chain_info.best_number);
telemetry!(
SUBSTRATE_INFO;
@@ -1015,55 +1012,26 @@ ServiceBuilder<
"best" => ?chain_info.best_hash
);
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !matches!(config.role, Role::Light),
pool: transaction_pool.clone(),
client: client.clone(),
});
let spawn_handle = task_manager.spawn_handle();
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
let protocol_id = {
let protocol_id_full = match chain_spec.protocol_id() {
Some(pid) => pid,
None => {
warn!("Using default protocol ID {:?} because none is configured in the \
chain specs", DEFAULT_PROTOCOL_ID
);
DEFAULT_PROTOCOL_ID
}
}.as_bytes();
sc_network::config::ProtocolId::from(protocol_id_full)
};
let (network, network_status_sinks, network_future) = build_network(
&config, client.clone(), transaction_pool.clone(), Clone::clone(&spawn_handle), on_demand.clone(),
block_announce_validator_builder, finality_proof_request_builder, finality_proof_provider,
system_rpc_rx, import_queue
)?;
let block_announce_validator = if let Some(f) = block_announce_validator_builder {
f(client.clone())
} else {
Box::new(DefaultBlockAnnounceValidator::new(client.clone()))
};
let network_params = sc_network::config::Params {
role: config.role.clone(),
executor: {
let spawn_handle = task_manager.spawn_handle();
Some(Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", fut);
}))
},
network_config: config.network.clone(),
chain: client.clone(),
finality_proof_provider,
finality_proof_request_builder,
on_demand: on_demand.clone(),
transaction_pool: transaction_pool_adapter.clone() as _,
import_queue,
protocol_id,
block_announce_validator,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone())
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new()));
// The network worker is responsible for gathering all network messages and processing
// them. This is quite a heavy task, and at the time of the writing of this comment it
// frequently happens that this future takes several seconds or in some situations
// even more than a minute until it has processed its entire queue. This is clearly an
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking(
"network-worker",
network_future
);
let offchain_storage = backend.offchain_storage();
let offchain_workers = match (config.offchain_worker.clone(), offchain_storage.clone()) {
@@ -1077,114 +1045,39 @@ ServiceBuilder<
_ => None,
};
let spawn_handle = task_manager.spawn_handle();
// Inform the tx pool about imported and finalized blocks.
{
let txpool = Arc::downgrade(&transaction_pool);
let mut import_stream = client.import_notification_stream().map(Into::into).fuse();
let mut finality_stream = client.finality_notification_stream()
.map(Into::into)
.fuse();
let events = async move {
loop {
let evt = futures::select! {
evt = import_stream.next() => evt,
evt = finality_stream.next() => evt,
complete => return,
};
let txpool = txpool.upgrade();
if let Some((txpool, evt)) = txpool.and_then(|tp| evt.map(|evt| (tp, evt))) {
txpool.maintain(evt).await;
}
}
};
spawn_handle.spawn(
"txpool-notifications",
events,
);
}
spawn_handle.spawn(
"txpool-notifications",
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);
// Inform the offchain worker about new imported blocks
{
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let notifications_spawn_handle = task_manager.spawn_handle();
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
let is_validator = config.role.is_authority();
let events = client.import_notification_stream().for_each(move |n| {
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
match offchain {
Some(offchain) if n.is_new_best => {
notifications_spawn_handle.spawn(
"offchain-on-block",
offchain.on_block_imported(
&n.header,
network_state_info.clone(),
is_validator,
),
);
},
Some(_) => log::debug!(
target: "sc_offchain",
"Skipping offchain workers for non-canon block: {:?}",
n.header,
),
_ => {},
}
ready(())
});
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
events,
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
offchain,
task_manager.spawn_handle(),
network.clone()
)
);
}
{
// extrinsic notifications
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.for_each(move |hash| {
if let Some(network) = network.upgrade() {
network.propagate_extrinsic(hash);
}
let status = transaction_pool_.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
"future" => status.future
);
ready(())
});
spawn_handle.spawn(
"on-transaction-imported",
events,
);
}
spawn_handle.spawn(
"on-transaction-imported",
extrinsic_notifications(transaction_pool.clone(), network.clone()),
);
// Prometheus metrics.
let mut metrics_service = if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
let metrics_service = if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
// Set static metrics.
let role_bits = match config.role {
Role::Full => 1u64,
Role::Light => 2u64,
Role::Sentry { .. } => 3u64,
Role::Authority { .. } => 4u64,
};
let metrics = MetricsService::with_prometheus(
&registry,
&config.network.node_name,
&config.impl_version,
role_bits,
&config.role,
)?;
spawn_handle.spawn(
"prometheus-endpoint",
@@ -1197,171 +1090,33 @@ ServiceBuilder<
};
// Periodically notify the telemetry.
let transaction_pool_ = transaction_pool.clone();
let client_ = client.clone();
let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1");
network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
let tel_task = state_rx.for_each(move |(net_status, _)| {
let info = client_.usage_info();
metrics_service.tick(
&info,
&transaction_pool_.status(),
&net_status,
);
ready(())
});
spawn_handle.spawn(
"telemetry-periodic-send",
tel_task,
);
spawn_handle.spawn("telemetry-periodic-send", telemetry_periodic_send(
client.clone(), transaction_pool.clone(), metrics_service, network_status_sinks.clone()
));
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2");
network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| {
telemetry!(
SUBSTRATE_INFO;
"system.network_state";
"state" => network_state,
);
ready(())
});
spawn_handle.spawn(
"telemetry-periodic-network-state",
tel_task_2,
telemetry_periodic_network_state(network_status_sinks.clone()),
);
// RPC
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe| {
use sc_rpc::{chain, state, author, system, offchain};
let system_info = sc_rpc::system::SystemInfo {
chain_name: chain_spec.name().into(),
impl_name: config.impl_name.into(),
impl_version: config.impl_version.into(),
properties: chain_spec.properties().clone(),
chain_type: chain_spec.chain_type().clone(),
};
let subscriptions = SubscriptionManager::new(Arc::new(task_manager.spawn_handle()));
let (chain, state, child_state) = if let (Some(remote_backend), Some(on_demand)) =
(remote_backend.as_ref(), on_demand.as_ref()) {
// Light clients
let chain = sc_rpc::chain::new_light(
client.clone(),
subscriptions.clone(),
remote_backend.clone(),
on_demand.clone()
);
let (state, child_state) = sc_rpc::state::new_light(
client.clone(),
subscriptions.clone(),
remote_backend.clone(),
on_demand.clone()
);
(chain, state, child_state)
} else {
// Full nodes
let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone());
let (state, child_state) = sc_rpc::state::new_full(client.clone(), subscriptions.clone());
(chain, state, child_state)
};
let author = sc_rpc::author::Author::new(
client.clone(),
transaction_pool.clone(),
subscriptions,
keystore.clone(),
deny_unsafe,
);
let system = system::System::new(system_info, system_rpc_tx.clone(), deny_unsafe);
let maybe_offchain_rpc = offchain_storage.clone()
.map(|storage| {
let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe);
// FIXME: Use plain Option (don't collect into HashMap) when we upgrade to jsonrpc 14.1
// https://github.com/paritytech/jsonrpc/commit/20485387ed06a48f1a70bf4d609a7cde6cf0accf
let delegate = offchain::OffchainApi::to_delegate(offchain);
delegate.into_iter().collect::<HashMap<_, _>>()
}).unwrap_or_default();
sc_rpc_server::rpc_handler((
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe),
))
};
let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe| gen_handler(
deny_unsafe, &config, &task_manager, client.clone(), transaction_pool.clone(),
keystore.clone(), on_demand.clone(), remote_backend.clone(), &*rpc_extensions_builder,
offchain_storage.clone(), system_rpc_tx.clone()
);
let rpc = start_rpc_servers(&config, gen_handler)?;
// This is used internally, so don't restrict access to unsafe RPC
let rpc_handlers = gen_handler(sc_rpc::DenyUnsafe::No);
// The network worker is responsible for gathering all network messages and processing
// them. This is quite a heavy task, and at the time of the writing of this comment it
// frequently happens that this future takes several seconds or in some situations
// even more than a minute until it has processed its entire queue. This is clearly an
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking(
"network-worker",
build_network_future(
config.role.clone(),
network_mut,
client.clone(),
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
config.announce_block,
),
);
let telemetry_connection_sinks: Arc<Mutex<Vec<TracingUnboundedSender<()>>>> = Default::default();
// Telemetry
let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
let is_authority = config.role.is_authority();
let network_id = network.local_peer_id().to_base58();
let name = config.network.node_name.clone();
let impl_name = config.impl_name.to_owned();
let version = version.clone();
let chain_name = config.chain_spec.name().to_owned();
let telemetry_connection_sinks_ = telemetry_connection_sinks.clone();
let telemetry = sc_telemetry::init_telemetry(sc_telemetry::TelemetryConfig {
endpoints,
wasm_external_transport: config.telemetry_external_transport.take(),
});
let startup_time = SystemTime::UNIX_EPOCH.elapsed()
.map(|dur| dur.as_millis())
.unwrap_or(0);
let future = telemetry.clone()
.for_each(move |event| {
// Safe-guard in case we add more events in the future.
let sc_telemetry::TelemetryEvent::Connected = event;
telemetry!(SUBSTRATE_INFO; "system.connected";
"name" => name.clone(),
"implementation" => impl_name.clone(),
"version" => version.clone(),
"config" => "",
"chain" => chain_name.clone(),
"authority" => is_authority,
"startup_time" => startup_time,
"network_id" => network_id.clone()
);
telemetry_connection_sinks_.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
ready(())
});
let (telemetry, future) = build_telemetry(
&mut config, endpoints, telemetry_connection_sinks.clone(), network.clone()
);
spawn_handle.spawn(
"telemetry-worker",
@@ -1383,18 +1138,12 @@ ServiceBuilder<
}
// Spawn informant task
let network_status_sinks_1 = network_status_sinks.clone();
let informant_future = sc_informant::build(
spawn_handle.spawn("informant", sc_informant::build(
client.clone(),
move |interval| {
let (sink, stream) = tracing_unbounded("mpsc_network_status");
network_status_sinks_1.lock().push(interval, sink);
stream
},
network_status_sinks.clone(),
transaction_pool.clone(),
sc_informant::OutputFormat { enable_color: true, prefix: informant_prefix },
);
spawn_handle.spawn("informant", informant_future);
));
Ok(Service {
client,
@@ -1495,3 +1244,296 @@ ServiceBuilder<
self.build_common()
}
}
async fn extrinsic_notifications<TBl, TExPool>(
transaction_pool: Arc<TExPool>,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>
)
where
TBl: BlockT,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>,
{
// extrinsic notifications
transaction_pool.import_notification_stream()
.for_each(move |hash| {
network.propagate_extrinsic(hash);
let status = transaction_pool.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
"future" => status.future
);
ready(())
})
.await;
}
// Periodically notify the telemetry.
async fn telemetry_periodic_send<TBl, TBackend, TExec, TRtApi, TExPool>(
client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
transaction_pool: Arc<TExPool>,
mut metrics_service: MetricsService,
network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>>
)
where
TBl: BlockT,
TExec: CallExecutor<TBl>,
Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi<TBl>,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>,
TBackend: sc_client_api::backend::Backend<TBl>,
{
let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1");
network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
state_rx.for_each(move |(net_status, _)| {
let info = client.usage_info();
metrics_service.tick(
&info,
&transaction_pool.status(),
&net_status,
);
ready(())
}).await;
}
async fn telemetry_periodic_network_state<TBl: BlockT>(
network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>>
) {
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2");
network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
netstat_rx.for_each(move |(_, network_state)| {
telemetry!(
SUBSTRATE_INFO;
"system.network_state";
"state" => network_state,
);
ready(())
}).await;
}
fn build_telemetry<TBl: BlockT>(
config: &mut Configuration,
endpoints: sc_telemetry::TelemetryEndpoints,
telemetry_connection_sinks: Arc<Mutex<Vec<TracingUnboundedSender<()>>>>,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>
) -> (sc_telemetry::Telemetry, Pin<Box<dyn Future<Output = ()> + Send>>) {
let is_authority = config.role.is_authority();
let network_id = network.local_peer_id().to_base58();
let name = config.network.node_name.clone();
let impl_name = config.impl_name.to_owned();
let version = config.impl_version;
let chain_name = config.chain_spec.name().to_owned();
let telemetry = sc_telemetry::init_telemetry(sc_telemetry::TelemetryConfig {
endpoints,
wasm_external_transport: config.telemetry_external_transport.take(),
});
let startup_time = SystemTime::UNIX_EPOCH.elapsed()
.map(|dur| dur.as_millis())
.unwrap_or(0);
let future = telemetry.clone()
.for_each(move |event| {
// Safe-guard in case we add more events in the future.
let sc_telemetry::TelemetryEvent::Connected = event;
telemetry!(SUBSTRATE_INFO; "system.connected";
"name" => name.clone(),
"implementation" => impl_name.clone(),
"version" => version,
"config" => "",
"chain" => chain_name.clone(),
"authority" => is_authority,
"startup_time" => startup_time,
"network_id" => network_id.clone()
);
telemetry_connection_sinks.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
ready(())
})
.boxed();
(telemetry, future)
}
fn gen_handler<TBl, TBackend, TExec, TRtApi, TExPool, TRpc>(
deny_unsafe: sc_rpc::DenyUnsafe,
config: &Configuration,
task_manager: &TaskManager,
client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
transaction_pool: Arc<TExPool>,
keystore: Arc<RwLock<Keystore>>,
on_demand: Option<Arc<OnDemand<TBl>>>,
remote_backend: Option<Arc<dyn RemoteBlockchain<TBl>>>,
rpc_extensions_builder: &(dyn RpcExtensionBuilder<Output = TRpc> + Send),
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>
) -> jsonrpc_pubsub::PubSubHandler<sc_rpc::Metadata>
where
TBl: BlockT,
TExec: CallExecutor<TBl, Backend = TBackend> + Send + Sync + 'static,
TRtApi: Send + Sync + 'static,
Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi<TBl>,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TBackend: sc_client_api::backend::Backend<TBl> + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>,
<Client<TBackend, TExec, TBl, TRtApi> as ProvideRuntimeApi<TBl>>::Api:
sp_session::SessionKeys<TBl> +
sp_api::Metadata<TBl, Error = sp_blockchain::Error>,
{
use sc_rpc::{chain, state, author, system, offchain};
let system_info = sc_rpc::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.into(),
impl_version: config.impl_version.into(),
properties: config.chain_spec.properties(),
chain_type: config.chain_spec.chain_type(),
};
let subscriptions = SubscriptionManager::new(Arc::new(task_manager.spawn_handle()));
let (chain, state, child_state) = if let (Some(remote_backend), Some(on_demand)) =
(remote_backend, on_demand) {
// Light clients
let chain = sc_rpc::chain::new_light(
client.clone(),
subscriptions.clone(),
remote_backend.clone(),
on_demand.clone()
);
let (state, child_state) = sc_rpc::state::new_light(
client.clone(),
subscriptions.clone(),
remote_backend.clone(),
on_demand.clone()
);
(chain, state, child_state)
} else {
// Full nodes
let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone());
let (state, child_state) = sc_rpc::state::new_full(client.clone(), subscriptions.clone());
(chain, state, child_state)
};
let author = sc_rpc::author::Author::new(
client.clone(),
transaction_pool.clone(),
subscriptions,
keystore.clone(),
deny_unsafe,
);
let system = system::System::new(system_info, system_rpc_tx.clone(), deny_unsafe);
let maybe_offchain_rpc = offchain_storage.clone()
.map(|storage| {
let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe);
// FIXME: Use plain Option (don't collect into HashMap) when we upgrade to jsonrpc 14.1
// https://github.com/paritytech/jsonrpc/commit/20485387ed06a48f1a70bf4d609a7cde6cf0accf
let delegate = offchain::OffchainApi::to_delegate(offchain);
delegate.into_iter().collect::<HashMap<_, _>>()
}).unwrap_or_default();
sc_rpc_server::rpc_handler((
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe),
))
}
fn build_network<TBl, TBackend, TExec, TRtApi, TExPool, TImpQu>(
config: &Configuration,
client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
transaction_pool: Arc<TExPool>,
spawn_handle: SpawnTaskHandle,
on_demand: Option<Arc<OnDemand<TBl>>>,
block_announce_validator_builder: Option<Box<
dyn FnOnce(Arc<Client<TBackend, TExec, TBl, TRtApi>>) ->
Box<dyn BlockAnnounceValidator<TBl> + Send> + Send
>>,
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<TBl>>,
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<TBl>>>,
system_rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<TBl>>,
import_queue: TImpQu
) -> Result<
(
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>>,
Pin<Box<dyn Future<Output = ()> + Send>>
),
Error
>
where
TBl: BlockT,
TExec: CallExecutor<TBl> + Send + Sync + 'static,
TRtApi: Send + Sync + 'static,
Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi<TBl>,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TBackend: sc_client_api::backend::Backend<TBl> + 'static,
TImpQu: ImportQueue<TBl> + 'static,
{
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !matches!(config.role, Role::Light),
pool: transaction_pool.clone(),
client: client.clone(),
});
let protocol_id = {
let protocol_id_full = match config.chain_spec.protocol_id() {
Some(pid) => pid,
None => {
warn!("Using default protocol ID {:?} because none is configured in the \
chain specs", DEFAULT_PROTOCOL_ID
);
DEFAULT_PROTOCOL_ID
}
}.as_bytes();
sc_network::config::ProtocolId::from(protocol_id_full)
};
let block_announce_validator = if let Some(f) = block_announce_validator_builder {
f(client.clone())
} else {
Box::new(DefaultBlockAnnounceValidator::new(client.clone()))
};
let network_params = sc_network::config::Params {
role: config.role.clone(),
executor: {
Some(Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", fut);
}))
},
network_config: config.network.clone(),
chain: client.clone(),
finality_proof_provider,
finality_proof_request_builder,
on_demand: on_demand.clone(),
transaction_pool: transaction_pool_adapter.clone() as _,
import_queue: Box::new(import_queue),
protocol_id,
block_announce_validator,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone())
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new()));
let future = build_network_future(
config.role.clone(),
network_mut,
client.clone(),
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
config.announce_block,
).boxed();
Ok((network, network_status_sinks, future))
}