Files
pezkuwi-subxt/templates/minimal/node/src/service.rs
T
Aaro Altonen 80616f6d03 Integrate litep2p into Polkadot SDK (#2944)
[litep2p](https://github.com/altonen/litep2p) is a libp2p-compatible P2P
networking library. It supports all of the features of `rust-libp2p`
that are currently being utilized by Polkadot SDK.

Compared to `rust-libp2p`, `litep2p` has a quite different architecture
which is why the new `litep2p` network backend is only able to use a
little of the existing code in `sc-network`. The design has been mainly
influenced by how we'd wish to structure our networking-related code in
Polkadot SDK: independent higher-levels protocols directly communicating
with the network over links that support bidirectional backpressure. A
good example would be `NotificationHandle`/`RequestResponseHandle`
abstractions which allow, e.g., `SyncingEngine` to directly communicate
with peers to announce/request blocks.

I've tried running `polkadot --network-backend litep2p` with a few
different peer configurations and there is a noticeable reduction in
networking CPU usage. For high load (`--out-peers 200`), networking CPU
usage goes down from ~110% to ~30% (80 pp) and for normal load
(`--out-peers 40`), the usage goes down from ~55% to ~18% (37 pp).

These should not be taken as final numbers because:

a) there are still some low-hanging optimization fruits, such as
enabling [receive window
auto-tuning](https://github.com/libp2p/rust-yamux/pull/176), integrating
`Peerset` more closely with `litep2p` or improving memory usage of the
WebSocket transport
b) fixing bugs/instabilities that incorrectly cause `litep2p` to do less
work will increase the networking CPU usage
c) verification in a more diverse set of tests/conditions is needed

Nevertheless, these numbers should give an early estimate for CPU usage
of the new networking backend.

This PR consists of three separate changes:
* introduce a generic `PeerId` (wrapper around `Multihash`) so that we
don't have use `NetworkService::PeerId` in every part of the code that
uses a `PeerId`
* introduce `NetworkBackend` trait, implement it for the libp2p network
stack and make Polkadot SDK generic over `NetworkBackend`
  * implement `NetworkBackend` for litep2p

The new library should be considered experimental which is why
`rust-libp2p` will remain as the default option for the time being. This
PR currently depends on the master branch of `litep2p` but I'll cut a
new release for the library once all review comments have been
addresses.

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Co-authored-by: Alexandru Vasile <alexandru.vasile@parity.io>
2024-04-08 16:44:13 +00:00

265 lines
7.7 KiB
Rust

// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::FutureExt;
use runtime::{self, interface::OpaqueBlock as Block, RuntimeApi};
use sc_client_api::backend::Backend;
use sc_executor::WasmExecutor;
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;
use crate::cli::Consensus;
#[cfg(feature = "runtime-benchmarks")]
type HostFunctions =
(sp_io::SubstrateHostFunctions, frame_benchmarking::benchmarking::HostFunctions);
#[cfg(not(feature = "runtime-benchmarks"))]
type HostFunctions = sp_io::SubstrateHostFunctions;
pub(crate) type FullClient =
sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
type FullBackend = sc_service::TFullBackend<Block>;
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
/// Assembly of PartialComponents (enough to run chain ops subcommands)
pub type Service = sc_service::PartialComponents<
FullClient,
FullBackend,
FullSelectChain,
sc_consensus::DefaultImportQueue<Block>,
sc_transaction_pool::FullPool<Block, FullClient>,
Option<Telemetry>,
>;
pub fn new_partial(config: &Configuration) -> Result<Service, ServiceError> {
let telemetry = config
.telemetry_endpoints
.clone()
.filter(|x| !x.is_empty())
.map(|endpoints| -> Result<_, sc_telemetry::Error> {
let worker = TelemetryWorker::new(16)?;
let telemetry = worker.handle().new_telemetry(endpoints);
Ok((worker, telemetry))
})
.transpose()?;
let executor = sc_service::new_wasm_executor(&config);
let (client, backend, keystore_container, task_manager) =
sc_service::new_full_parts::<Block, RuntimeApi, _>(
config,
telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
executor,
)?;
let client = Arc::new(client);
let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager.spawn_handle().spawn("telemetry", None, worker.run());
telemetry
});
let select_chain = sc_consensus::LongestChain::new(backend.clone());
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_essential_handle(),
client.clone(),
);
let import_queue = sc_consensus_manual_seal::import_queue(
Box::new(client.clone()),
&task_manager.spawn_essential_handle(),
config.prometheus_registry(),
);
Ok(sc_service::PartialComponents {
client,
backend,
task_manager,
import_queue,
keystore_container,
select_chain,
transaction_pool,
other: (telemetry),
})
}
/// Builds a new service for a full client.
pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>>(
config: Configuration,
consensus: Consensus,
) -> Result<TaskManager, ServiceError> {
let sc_service::PartialComponents {
client,
backend,
mut task_manager,
import_queue,
keystore_container,
select_chain,
transaction_pool,
other: mut telemetry,
} = new_partial(&config)?;
let net_config = sc_network::config::FullNetworkConfiguration::<
Block,
<Block as BlockT>::Hash,
Network,
>::new(&config.network);
let metrics = Network::register_notification_metrics(
config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
);
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
net_config,
block_announce_validator_builder: None,
warp_sync_params: None,
block_relay: None,
metrics,
})?;
if config.offchain_worker.enabled {
task_manager.spawn_handle().spawn(
"offchain-workers-runner",
"offchain-worker",
sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
runtime_api_provider: client.clone(),
is_validator: config.role.is_authority(),
keystore: Some(keystore_container.keystore()),
offchain_db: backend.offchain_storage(),
transaction_pool: Some(OffchainTransactionPoolFactory::new(
transaction_pool.clone(),
)),
network_provider: Arc::new(network.clone()),
enable_http_requests: true,
custom_extensions: |_| vec![],
})
.run(client.clone(), task_manager.spawn_handle())
.boxed(),
);
}
let rpc_extensions_builder = {
let client = client.clone();
let pool = transaction_pool.clone();
Box::new(move |deny_unsafe, _| {
let deps =
crate::rpc::FullDeps { client: client.clone(), pool: pool.clone(), deny_unsafe };
crate::rpc::create_full(deps).map_err(Into::into)
})
};
let prometheus_registry = config.prometheus_registry().cloned();
let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
network,
client: client.clone(),
keystore: keystore_container.keystore(),
task_manager: &mut task_manager,
transaction_pool: transaction_pool.clone(),
rpc_builder: rpc_extensions_builder,
backend,
system_rpc_tx,
tx_handler_controller,
sync_service,
config,
telemetry: telemetry.as_mut(),
})?;
let proposer = sc_basic_authorship::ProposerFactory::new(
task_manager.spawn_handle(),
client.clone(),
transaction_pool.clone(),
prometheus_registry.as_ref(),
telemetry.as_ref().map(|x| x.handle()),
);
match consensus {
Consensus::InstantSeal => {
let params = sc_consensus_manual_seal::InstantSealParams {
block_import: client.clone(),
env: proposer,
client,
pool: transaction_pool,
select_chain,
consensus_data_provider: None,
create_inherent_data_providers: move |_, ()| async move {
Ok(sp_timestamp::InherentDataProvider::from_system_time())
},
};
let authorship_future = sc_consensus_manual_seal::run_instant_seal(params);
task_manager.spawn_essential_handle().spawn_blocking(
"instant-seal",
None,
authorship_future,
);
},
Consensus::ManualSeal(block_time) => {
let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);
task_manager.spawn_handle().spawn("block_authoring", None, async move {
loop {
futures_timer::Delay::new(std::time::Duration::from_millis(block_time)).await;
sink.try_send(sc_consensus_manual_seal::EngineCommand::SealNewBlock {
create_empty: true,
finalize: true,
parent_hash: None,
sender: None,
})
.unwrap();
}
});
let params = sc_consensus_manual_seal::ManualSealParams {
block_import: client.clone(),
env: proposer,
client,
pool: transaction_pool,
select_chain,
commands_stream: Box::pin(commands_stream),
consensus_data_provider: None,
create_inherent_data_providers: move |_, ()| async move {
Ok(sp_timestamp::InherentDataProvider::from_system_time())
},
};
let authorship_future = sc_consensus_manual_seal::run_manual_seal(params);
task_manager.spawn_essential_handle().spawn_blocking(
"manual-seal",
None,
authorship_future,
);
},
}
network_starter.start_network();
Ok(task_manager)
}