// This file is part of Substrate.
// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
error::Error, MallocSizeOfWasm, RpcHandlers,
start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle,
metrics::MetricsService,
client::{light, Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig, TransactionStorageMode},
};
use sc_client_api::{
light::RemoteBlockchain, ForkBlocks, BadBlocks, UsageProvider, ExecutorProvider,
};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sc_chain_spec::get_extension;
use sp_consensus::{
block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator, Chain},
import_queue::ImportQueue,
};
use jsonrpc_pubsub::manager::SubscriptionManager;
use futures::{
FutureExt, StreamExt,
future::ready,
channel::oneshot,
};
use sc_keystore::LocalKeystore;
use log::info;
use sc_network::config::{Role, OnDemand, SyncMode};
use sc_network::NetworkService;
use sc_network::block_request_handler::{self, BlockRequestHandler};
use sc_network::state_request_handler::{self, StateRequestHandler};
use sc_network::light_client_requests::{self, handler::LightClientRequestHandler};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
Block as BlockT, HashFor, Zero, BlockIdTo,
};
use sp_api::{ProvideRuntimeApi, CallApiAt};
use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo};
use std::{sync::Arc, str::FromStr};
use wasm_timer::SystemTime;
use sc_telemetry::{
telemetry,
ConnectionMessage,
Telemetry,
TelemetryHandle,
SUBSTRATE_INFO,
};
use sc_transaction_pool_api::MaintainedTransactionPool;
use prometheus_endpoint::Registry;
use sc_client_db::{Backend, DatabaseSettings};
use sp_core::traits::{
CodeExecutor,
SpawnNamed,
};
use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::BuildStorage;
use sc_client_api::{
BlockBackend, BlockchainEvents,
StorageProvider,
proof_provider::ProofProvider,
execution_extensions::ExecutionExtensions
};
use sp_blockchain::{HeaderMetadata, HeaderBackend};
/// A utility trait for building an RPC extension given a `DenyUnsafe` instance.
/// This is useful since at service definition time we don't know whether the
/// specific interface where the RPC extension will be exposed is safe or not.
/// This trait allows us to lazily build the RPC extension whenever we bind the
/// service to an interface.
pub trait RpcExtensionBuilder {
	/// The type of the RPC extension that will be built.
	type Output: sc_rpc::RpcExtension<sc_rpc::Metadata>;

	/// Returns an instance of the RPC extension for a particular `DenyUnsafe`
	/// value, e.g. the RPC extension might not expose some unsafe methods.
	fn build(
		&self,
		deny: sc_rpc::DenyUnsafe,
		subscription_executor: sc_rpc::SubscriptionTaskExecutor,
	) -> Self::Output;
}
/// Blanket implementation: any closure taking a `DenyUnsafe` and a subscription
/// executor and returning an RPC extension is itself an `RpcExtensionBuilder`.
impl<F, R> RpcExtensionBuilder for F where
	F: Fn(sc_rpc::DenyUnsafe, sc_rpc::SubscriptionTaskExecutor) -> R,
	R: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
	type Output = R;

	fn build(
		&self,
		deny: sc_rpc::DenyUnsafe,
		subscription_executor: sc_rpc::SubscriptionTaskExecutor,
	) -> Self::Output {
		// Simply delegate to the wrapped closure.
		(*self)(deny, subscription_executor)
	}
}
/// A utility struct for implementing an `RpcExtensionBuilder` given a cloneable
/// `RpcExtension`, the resulting builder will simply ignore the provided
/// `DenyUnsafe` instance and return a static `RpcExtension` instance.
pub struct NoopRpcExtensionBuilder<R>(pub R);
impl<R> RpcExtensionBuilder for NoopRpcExtensionBuilder<R> where
	R: Clone + sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
	type Output = R;

	fn build(
		&self,
		_deny: sc_rpc::DenyUnsafe,
		_subscription_executor: sc_rpc::SubscriptionTaskExecutor,
	) -> Self::Output {
		// The `DenyUnsafe` value is intentionally ignored; a clone of the
		// wrapped static extension is always returned.
		self.0.clone()
	}
}
impl From for NoopRpcExtensionBuilder where
R: sc_rpc::RpcExtension,
{
fn from(e: R) -> NoopRpcExtensionBuilder {
NoopRpcExtensionBuilder(e)
}
}
/// Full client type.
pub type TFullClient<TBl, TRtApi, TExecDisp> = Client<
	TFullBackend<TBl>,
	TFullCallExecutor<TBl, TExecDisp>,
	TBl,
	TRtApi,
>;

/// Full client backend type.
pub type TFullBackend<TBl> = sc_client_db::Backend<TBl>;

/// Full client call executor type.
pub type TFullCallExecutor<TBl, TExecDisp> = crate::client::LocalCallExecutor<
	TBl,
	sc_client_db::Backend<TBl>,
	NativeExecutor<TExecDisp>,
>;

/// Light client type.
pub type TLightClient<TBl, TRtApi, TExecDisp> = TLightClientWithBackend<
	TBl, TRtApi, TExecDisp, TLightBackend<TBl>
>;
/// Light client backend type.
pub type TLightBackend<TBl> = sc_light::Backend<
	sc_client_db::light::LightStorage<TBl>,
	HashFor<TBl>,
>;

/// Light call executor type.
pub type TLightCallExecutor<TBl, TExecDisp> = sc_light::GenesisCallExecutor<
	sc_light::Backend<
		sc_client_db::light::LightStorage<TBl>,
		HashFor<TBl>
	>,
	crate::client::LocalCallExecutor<
		TBl,
		sc_light::Backend<
			sc_client_db::light::LightStorage<TBl>,
			HashFor<TBl>
		>,
		NativeExecutor<TExecDisp>
	>,
>;
type TFullParts = (
TFullClient,
Arc>,
KeystoreContainer,
TaskManager,
);
type TLightParts = (
Arc>,
Arc>,
KeystoreContainer,
TaskManager,
Arc>,
);
/// Light client backend type with a specific hash type.
pub type TLightBackendWithHash = sc_light::Backend<
sc_client_db::light::LightStorage,
THash,
>;
/// Light client type with a specific backend.
pub type TLightClientWithBackend = Client<
TBackend,
sc_light::GenesisCallExecutor<
TBackend,
crate::client::LocalCallExecutor>,
>,
TBl,
TRtApi,
>;
trait AsCryptoStoreRef {
fn keystore_ref(&self) -> Arc;
fn sync_keystore_ref(&self) -> Arc;
}
impl AsCryptoStoreRef for Arc where T: CryptoStore + SyncCryptoStore + 'static {
fn keystore_ref(&self) -> Arc {
self.clone()
}
fn sync_keystore_ref(&self) -> Arc {
self.clone()
}
}
/// Construct and hold different layers of Keystore wrappers
pub struct KeystoreContainer {
remote: Option>,
local: Arc,
}
impl KeystoreContainer {
/// Construct KeystoreContainer
pub fn new(config: &KeystoreConfig) -> Result {
let keystore = Arc::new(match config {
KeystoreConfig::Path { path, password } => LocalKeystore::open(
path.clone(),
password.clone(),
)?,
KeystoreConfig::InMemory => LocalKeystore::in_memory(),
});
Ok(Self{remote: Default::default(), local: keystore})
}
/// Set the remote keystore.
/// Should be called right away at startup and not at runtime:
/// even though this overrides any previously set remote store, it
/// does not reset any references previously handed out - they will
/// stick around.
pub fn set_remote_keystore(&mut self, remote: Arc)
where T: CryptoStore + SyncCryptoStore + 'static
{
self.remote = Some(Box::new(remote))
}
/// Returns an adapter to the asynchronous keystore that implements `CryptoStore`
pub fn keystore(&self) -> Arc {
if let Some(c) = self.remote.as_ref() {
c.keystore_ref()
} else {
self.local.clone()
}
}
/// Returns the synchronous keystore wrapper
pub fn sync_keystore(&self) -> SyncCryptoStorePtr {
if let Some(c) = self.remote.as_ref() {
c.sync_keystore_ref()
} else {
self.local.clone() as SyncCryptoStorePtr
}
}
/// Returns the local keystore if available
///
/// The function will return None if the available keystore is not a local keystore.
///
/// # Note
///
/// Using the [`LocalKeystore`] will result in loosing the ability to use any other keystore implementation, like
/// a remote keystore for example. Only use this if you a certain that you require it!
pub fn local_keystore(&self) -> Option> {
Some(self.local.clone())
}
}
/// Creates a new full client for the given config.
pub fn new_full_client(
config: &Configuration,
telemetry: Option,
) -> Result, Error> where
TBl: BlockT,
TExecDisp: NativeExecutionDispatch + 'static,
TBl::Hash: FromStr,
{
new_full_parts(config, telemetry).map(|parts| parts.0)
}
/// Create the initial parts of a full node.
pub fn new_full_parts(
config: &Configuration,
telemetry: Option,
) -> Result, Error> where
TBl: BlockT,
TExecDisp: NativeExecutionDispatch + 'static,
TBl::Hash: FromStr,
{
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry)?
};
let executor = NativeExecutor::::new(
config.wasm_method,
config.default_heap_pages,
config.max_runtime_instances,
);
let chain_spec = &config.chain_spec;
let fork_blocks = get_extension::>(chain_spec.extensions())
.cloned()
.unwrap_or_default();
let bad_blocks = get_extension::>(chain_spec.extensions())
.cloned()
.unwrap_or_default();
let (client, backend) = {
let db_config = sc_client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
state_pruning: config.state_pruning.clone(),
source: config.database.clone(),
keep_blocks: config.keep_blocks.clone(),
transaction_storage: config.transaction_storage.clone(),
};
let backend = new_db_backend(db_config)?;
let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
config.execution_strategies.clone(),
Some(keystore_container.sync_keystore()),
sc_offchain::OffchainDb::factory_from_backend(&*backend),
);
let wasm_runtime_substitutes = config.chain_spec.code_substitutes().into_iter().map(|(h, c)| {
let hash = TBl::Hash::from_str(&h)
.map_err(|_|
Error::Application(Box::from(
format!("Failed to parse `{}` as block hash for code substitutes.", h)
))
)?;
Ok((hash, c))
}).collect::, Error>>()?;
let client = new_client(
backend.clone(),
executor,
chain_spec.as_storage_builder(),
fork_blocks,
bad_blocks,
extensions,
Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
telemetry,
ClientConfig {
offchain_worker_enabled : config.offchain_worker.enabled,
offchain_indexing_api: config.offchain_worker.indexing_enabled,
wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
no_genesis: matches!(config.network.sync_mode, sc_network::config::SyncMode::Fast {..}),
wasm_runtime_substitutes,
},
)?;
(client, backend)
};
Ok((
client,
backend,
keystore_container,
task_manager,
))
}
/// Create the initial parts of a light node.
pub fn new_light_parts(
config: &Configuration,
telemetry: Option,
) -> Result, Error> where
TBl: BlockT,
TExecDisp: NativeExecutionDispatch + 'static,
{
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry)?
};
let executor = NativeExecutor::::new(
config.wasm_method,
config.default_heap_pages,
config.max_runtime_instances,
);
let db_storage = {
let db_settings = sc_client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
state_pruning: config.state_pruning.clone(),
source: config.database.clone(),
keep_blocks: config.keep_blocks.clone(),
transaction_storage: config.transaction_storage.clone(),
};
sc_client_db::light::LightStorage::new(db_settings)?
};
let light_blockchain = sc_light::new_light_blockchain(db_storage);
let fetch_checker = Arc::new(
sc_light::new_fetch_checker::<_, TBl, _>(
light_blockchain.clone(),
executor.clone(),
Box::new(task_manager.spawn_handle()),
),
);
let on_demand = Arc::new(sc_network::config::OnDemand::new(fetch_checker));
let backend = sc_light::new_light_backend(light_blockchain);
let client = Arc::new(light::new_light(
backend.clone(),
config.chain_spec.as_storage_builder(),
executor,
Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
telemetry,
)?);
Ok((client, backend, keystore_container, task_manager, on_demand))
}
/// Create an instance of default DB-backend backend.
pub fn new_db_backend(
settings: DatabaseSettings,
) -> Result>, sp_blockchain::Error> where
Block: BlockT,
{
const CANONICALIZATION_DELAY: u64 = 4096;
Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
}
/// Create an instance of client backed by given backend.
pub fn new_client(
backend: Arc>,
executor: E,
genesis_storage: &dyn BuildStorage,
fork_blocks: ForkBlocks,
bad_blocks: BadBlocks,
execution_extensions: ExecutionExtensions,
spawn_handle: Box,
prometheus_registry: Option,
telemetry: Option,
config: ClientConfig,
) -> Result<
crate::client::Client<
Backend,
crate::client::LocalCallExecutor, E>,
Block,
RA,
>,
sp_blockchain::Error,
>
where
Block: BlockT,
E: CodeExecutor + RuntimeInfo,
{
let executor = crate::client::LocalCallExecutor::new(backend.clone(), executor, spawn_handle, config.clone())?;
Ok(crate::client::Client::new(
backend,
executor,
genesis_storage,
fork_blocks,
bad_blocks,
execution_extensions,
prometheus_registry,
telemetry,
config,
)?)
}
/// Parameters to pass into `build`.
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
/// The service configuration.
pub config: Configuration,
/// A shared client returned by `new_full_parts`/`new_light_parts`.
pub client: Arc,
/// A shared backend returned by `new_full_parts`/`new_light_parts`.
pub backend: Arc,
/// A task manager returned by `new_full_parts`/`new_light_parts`.
pub task_manager: &'a mut TaskManager,
/// A shared keystore returned by `new_full_parts`/`new_light_parts`.
pub keystore: SyncCryptoStorePtr,
/// An optional, shared data fetcher for light clients.
pub on_demand: Option>>,
/// A shared transaction pool.
pub transaction_pool: Arc,
/// A RPC extension builder. Use `NoopRpcExtensionBuilder` if you just want to pass in the
/// extensions directly.
pub rpc_extensions_builder: Box + Send>,
/// An optional, shared remote blockchain instance. Used for light clients.
pub remote_blockchain: Option>>,
/// A shared network instance.
pub network: Arc::Hash>>,
/// A Sender for RPC requests.
pub system_rpc_tx: TracingUnboundedSender>,
/// Telemetry instance for this node.
pub telemetry: Option<&'a mut Telemetry>,
}
/// Build a shared offchain workers instance.
pub fn build_offchain_workers(
config: &Configuration,
spawn_handle: SpawnTaskHandle,
client: Arc,
network: Arc::Hash>>,
) -> Option>>
where
TBl: BlockT,
TCl: Send + Sync + ProvideRuntimeApi + BlockchainEvents + 'static,
>::Api: sc_offchain::OffchainWorkerApi,
{
let offchain_workers = Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone())));
// Inform the offchain worker about new imported blocks
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
offchain,
Clone::clone(&spawn_handle),
network.clone(),
)
);
}
offchain_workers
}
/// Spawn the tasks that are required to run a node.
pub fn spawn_tasks(
params: SpawnTasksParams,
) -> Result
where
TCl: ProvideRuntimeApi + HeaderMetadata + Chain +
BlockBackend + BlockIdTo + ProofProvider +
HeaderBackend + BlockchainEvents + ExecutorProvider + UsageProvider +
StorageProvider + CallApiAt + Send + 'static,
>::Api:
sp_api::Metadata +
sc_offchain::OffchainWorkerApi +
sp_transaction_pool::runtime_api::TaggedTransactionQueue +
sp_session::SessionKeys +
sp_api::ApiExt,
TBl: BlockT,
TBackend: 'static + sc_client_api::backend::Backend + Send,
TExPool: MaintainedTransactionPool::Hash> +
MallocSizeOfWasm + 'static,
TRpc: sc_rpc::RpcExtension
{
let SpawnTasksParams {
mut config,
task_manager,
client,
on_demand,
backend,
keystore,
transaction_pool,
rpc_extensions_builder,
remote_blockchain,
network,
system_rpc_tx,
telemetry,
} = params;
let chain_info = client.usage_info().chain;
sp_session::generate_initial_session_keys(
client.clone(),
&BlockId::Hash(chain_info.best_hash),
config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
).map_err(|e| Error::Application(Box::new(e)))?;
let telemetry = telemetry
.map(|telemetry| {
init_telemetry(
&mut config,
network.clone(),
client.clone(),
telemetry,
)
})
.transpose()?;
info!("📦 Highest known block at #{}", chain_info.best_number);
let spawn_handle = task_manager.spawn_handle();
// Inform the tx pool about imported and finalized blocks.
spawn_handle.spawn(
"txpool-notifications",
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);
spawn_handle.spawn(
"on-transaction-imported",
transaction_notifications(
transaction_pool.clone(),
network.clone(),
telemetry.clone(),
),
);
// Prometheus metrics.
let metrics_service = if let Some(PrometheusConfig { port, registry }) =
config.prometheus_config.clone()
{
// Set static metrics.
let metrics = MetricsService::with_prometheus(telemetry.clone(), ®istry, &config)?;
spawn_handle.spawn(
"prometheus-endpoint",
prometheus_endpoint::init_prometheus(port, registry).map(drop)
);
metrics
} else {
MetricsService::new(telemetry.clone())
};
// Periodically updated metrics and telemetry updates.
spawn_handle.spawn("telemetry-periodic-send",
metrics_service.run(
client.clone(),
transaction_pool.clone(),
network.clone(),
)
);
// RPC
let gen_handler = |
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware
| gen_handler(
deny_unsafe, rpc_middleware, &config, task_manager.spawn_handle(),
client.clone(), transaction_pool.clone(), keystore.clone(),
on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder,
backend.offchain_storage(), system_rpc_tx.clone()
);
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone())?;
// This is used internally, so don't restrict access to unsafe RPC
let rpc_handlers = RpcHandlers(Arc::new(gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics, "inbrowser")
).into()));
// Spawn informant task
spawn_handle.spawn("informant", sc_informant::build(
client.clone(),
network.clone(),
transaction_pool.clone(),
config.informant_output_format,
));
task_manager.keep_alive((config.base_path, rpc, rpc_handlers.clone()));
Ok(rpc_handlers)
}
async fn transaction_notifications(
transaction_pool: Arc,
network: Arc::Hash>>,
telemetry: Option,
)
where
TBl: BlockT,
TExPool: MaintainedTransactionPool::Hash>,
{
// transaction notifications
transaction_pool
.import_notification_stream()
.for_each(move |hash| {
network.propagate_transaction(hash);
let status = transaction_pool.status();
telemetry!(
telemetry;
SUBSTRATE_INFO;
"txpool.import";
"ready" => status.ready,
"future" => status.future,
);
ready(())
})
.await;
}
fn init_telemetry>(
config: &mut Configuration,
network: Arc::Hash>>,
client: Arc,
telemetry: &mut Telemetry,
) -> sc_telemetry::Result {
let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
let connection_message = ConnectionMessage {
name: config.network.node_name.to_owned(),
implementation: config.impl_name.to_owned(),
version: config.impl_version.to_owned(),
config: String::new(),
chain: config.chain_spec.name().to_owned(),
genesis_hash: format!("{:?}", genesis_hash),
authority: config.role.is_authority(),
startup_time: SystemTime::UNIX_EPOCH.elapsed()
.map(|dur| dur.as_millis())
.unwrap_or(0).to_string(),
network_id: network.local_peer_id().to_base58(),
};
telemetry.start_telemetry(connection_message)?;
Ok(telemetry.handle())
}
fn gen_handler(
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware,
config: &Configuration,
spawn_handle: SpawnTaskHandle,
client: Arc,
transaction_pool: Arc,
keystore: SyncCryptoStorePtr,
on_demand: Option>>,
remote_blockchain: Option>>,
rpc_extensions_builder: &(dyn RpcExtensionBuilder