// This file is part of Substrate.
// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
build_network_future,
client::{Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig, TransactionStorageMode},
error::Error,
metrics::MetricsService,
start_rpc_servers, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
};
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
use jsonrpc_pubsub::manager::SubscriptionManager;
use log::info;
use prometheus_endpoint::Registry;
use sc_chain_spec::get_extension;
use sc_client_api::{
execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, StorageProvider, UsageProvider,
};
use sc_client_db::{Backend, DatabaseSettings};
use sc_consensus::import_queue::ImportQueue;
use sc_executor::RuntimeVersionOf;
use sc_keystore::LocalKeystore;
use sc_network::{
block_request_handler::{self, BlockRequestHandler},
config::{Role, SyncMode},
light_client_requests::{self, handler::LightClientRequestHandler},
state_request_handler::{self, StateRequestHandler},
warp_request_handler::{self, RequestHandler as WarpSyncRequestHandler, WarpSyncProvider},
NetworkService,
};
use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_transaction_pool_api::MaintainedTransactionPool;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::{CallApiAt, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::block_validation::{
BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
};
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, BlockIdTo, Zero},
BuildStorage,
};
use std::{str::FromStr, sync::Arc, time::SystemTime};
/// A utility trait for building an RPC extension given a `DenyUnsafe` instance.
/// This is useful since at service definition time we don't know whether the
/// specific interface where the RPC extension will be exposed is safe or not.
/// This trait allows us to lazily build the RPC extension whenever we bind the
/// service to an interface.
pub trait RpcExtensionBuilder {
/// The type of the RPC extension that will be built.
type Output: sc_rpc::RpcExtension;
/// Returns an instance of the RPC extension for a particular `DenyUnsafe`
/// value, e.g. the RPC extension might not expose some unsafe methods.
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Result;
}
impl RpcExtensionBuilder for F
where
F: Fn(sc_rpc::DenyUnsafe, sc_rpc::SubscriptionTaskExecutor) -> Result,
R: sc_rpc::RpcExtension,
{
type Output = R;
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Result {
(*self)(deny, subscription_executor)
}
}
/// A utility struct for implementing an `RpcExtensionBuilder` given a cloneable
/// `RpcExtension`, the resulting builder will simply ignore the provided
/// `DenyUnsafe` instance and return a static `RpcExtension` instance.
pub struct NoopRpcExtensionBuilder<R>(pub R);
impl RpcExtensionBuilder for NoopRpcExtensionBuilder
where
R: Clone + sc_rpc::RpcExtension,
{
type Output = R;
fn build(
&self,
_deny: sc_rpc::DenyUnsafe,
_subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Result {
Ok(self.0.clone())
}
}
impl From for NoopRpcExtensionBuilder
where
R: sc_rpc::RpcExtension,
{
fn from(e: R) -> NoopRpcExtensionBuilder {
NoopRpcExtensionBuilder(e)
}
}
/// Full client type.
pub type TFullClient =
Client, TFullCallExecutor, TBl, TRtApi>;
/// Full client backend type.
pub type TFullBackend = sc_client_db::Backend;
/// Full client call executor type.
pub type TFullCallExecutor =
crate::client::LocalCallExecutor, TExec>;
type TFullParts =
(TFullClient, Arc>, KeystoreContainer, TaskManager);
trait AsCryptoStoreRef {
fn keystore_ref(&self) -> Arc;
fn sync_keystore_ref(&self) -> Arc;
}
impl AsCryptoStoreRef for Arc
where
T: CryptoStore + SyncCryptoStore + 'static,
{
fn keystore_ref(&self) -> Arc {
self.clone()
}
fn sync_keystore_ref(&self) -> Arc {
self.clone()
}
}
/// Construct and hold different layers of Keystore wrappers
pub struct KeystoreContainer {
remote: Option>,
local: Arc,
}
impl KeystoreContainer {
/// Construct KeystoreContainer
pub fn new(config: &KeystoreConfig) -> Result {
let keystore = Arc::new(match config {
KeystoreConfig::Path { path, password } =>
LocalKeystore::open(path.clone(), password.clone())?,
KeystoreConfig::InMemory => LocalKeystore::in_memory(),
});
Ok(Self { remote: Default::default(), local: keystore })
}
/// Set the remote keystore.
/// Should be called right away at startup and not at runtime:
/// even though this overrides any previously set remote store, it
/// does not reset any references previously handed out - they will
/// stick around.
pub fn set_remote_keystore(&mut self, remote: Arc)
where
T: CryptoStore + SyncCryptoStore + 'static,
{
self.remote = Some(Box::new(remote))
}
/// Returns an adapter to the asynchronous keystore that implements `CryptoStore`
pub fn keystore(&self) -> Arc {
if let Some(c) = self.remote.as_ref() {
c.keystore_ref()
} else {
self.local.clone()
}
}
/// Returns the synchronous keystore wrapper
pub fn sync_keystore(&self) -> SyncCryptoStorePtr {
if let Some(c) = self.remote.as_ref() {
c.sync_keystore_ref()
} else {
self.local.clone() as SyncCryptoStorePtr
}
}
/// Returns the local keystore if available
///
/// The function will return None if the available keystore is not a local keystore.
///
/// # Note
///
/// Using the [`LocalKeystore`] will result in loosing the ability to use any other keystore
/// implementation, like a remote keystore for example. Only use this if you a certain that you
/// require it!
pub fn local_keystore(&self) -> Option> {
Some(self.local.clone())
}
}
/// Creates a new full client for the given config.
pub fn new_full_client(
config: &Configuration,
telemetry: Option,
executor: TExec,
) -> Result, Error>
where
TBl: BlockT,
TExec: CodeExecutor + RuntimeVersionOf + Clone,
TBl::Hash: FromStr,
{
new_full_parts(config, telemetry, executor).map(|parts| parts.0)
}
/// Create the initial parts of a full node.
pub fn new_full_parts(
config: &Configuration,
telemetry: Option,
executor: TExec,
) -> Result, Error>
where
TBl: BlockT,
TExec: CodeExecutor + RuntimeVersionOf + Clone,
TBl::Hash: FromStr,
{
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.tokio_handle.clone(), registry)?
};
let chain_spec = &config.chain_spec;
let fork_blocks = get_extension::>(chain_spec.extensions())
.cloned()
.unwrap_or_default();
let bad_blocks = get_extension::>(chain_spec.extensions())
.cloned()
.unwrap_or_default();
let (client, backend) = {
let db_config = sc_client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio: config.state_cache_child_ratio.map(|v| (v, 100)),
state_pruning: config.state_pruning.clone(),
source: config.database.clone(),
keep_blocks: config.keep_blocks.clone(),
transaction_storage: config.transaction_storage.clone(),
};
let backend = new_db_backend(db_config)?;
let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
config.execution_strategies.clone(),
Some(keystore_container.sync_keystore()),
sc_offchain::OffchainDb::factory_from_backend(&*backend),
);
let wasm_runtime_substitutes = config
.chain_spec
.code_substitutes()
.into_iter()
.map(|(h, c)| {
let hash = TBl::Hash::from_str(&h).map_err(|_| {
Error::Application(Box::from(format!(
"Failed to parse `{}` as block hash for code substitutes.",
h
)))
})?;
Ok((hash, c))
})
.collect::, Error>>()?;
let client = new_client(
backend.clone(),
executor,
chain_spec.as_storage_builder(),
fork_blocks,
bad_blocks,
extensions,
Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
telemetry,
ClientConfig {
offchain_worker_enabled: config.offchain_worker.enabled,
offchain_indexing_api: config.offchain_worker.indexing_enabled,
wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
no_genesis: matches!(
config.network.sync_mode,
sc_network::config::SyncMode::Fast { .. } | sc_network::config::SyncMode::Warp
),
wasm_runtime_substitutes,
},
)?;
(client, backend)
};
Ok((client, backend, keystore_container, task_manager))
}
/// Create an instance of default DB-backend backend.
pub fn new_db_backend(
settings: DatabaseSettings,
) -> Result>, sp_blockchain::Error>
where
Block: BlockT,
{
const CANONICALIZATION_DELAY: u64 = 4096;
Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
}
/// Create an instance of client backed by given backend.
pub fn new_client(
backend: Arc>,
executor: E,
genesis_storage: &dyn BuildStorage,
fork_blocks: ForkBlocks,
bad_blocks: BadBlocks,
execution_extensions: ExecutionExtensions,
spawn_handle: Box,
prometheus_registry: Option,
telemetry: Option,
config: ClientConfig,
) -> Result<
crate::client::Client<
Backend,
crate::client::LocalCallExecutor, E>,
Block,
RA,
>,
sp_blockchain::Error,
>
where
Block: BlockT,
E: CodeExecutor + RuntimeVersionOf,
{
let executor = crate::client::LocalCallExecutor::new(
backend.clone(),
executor,
spawn_handle,
config.clone(),
)?;
Ok(crate::client::Client::new(
backend,
executor,
genesis_storage,
fork_blocks,
bad_blocks,
execution_extensions,
prometheus_registry,
telemetry,
config,
)?)
}
/// Parameters to pass into `build`.
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
/// The service configuration.
pub config: Configuration,
/// A shared client returned by `new_full_parts`.
pub client: Arc,
/// A shared backend returned by `new_full_parts`.
pub backend: Arc,
/// A task manager returned by `new_full_parts`.
pub task_manager: &'a mut TaskManager,
/// A shared keystore returned by `new_full_parts`.
pub keystore: SyncCryptoStorePtr,
/// A shared transaction pool.
pub transaction_pool: Arc,
/// A RPC extension builder. Use `NoopRpcExtensionBuilder` if you just want to pass in the
/// extensions directly.
pub rpc_extensions_builder: Box + Send>,
/// A shared network instance.
pub network: Arc::Hash>>,
/// A Sender for RPC requests.
pub system_rpc_tx: TracingUnboundedSender>,
/// Telemetry instance for this node.
pub telemetry: Option<&'a mut Telemetry>,
}
/// Build a shared offchain workers instance.
pub fn build_offchain_workers(
config: &Configuration,
spawn_handle: SpawnTaskHandle,
client: Arc,
network: Arc::Hash>>,
) -> Option>>
where
TBl: BlockT,
TCl: Send + Sync + ProvideRuntimeApi + BlockchainEvents + 'static,
>::Api: sc_offchain::OffchainWorkerApi,
{
let offchain_workers = Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone())));
// Inform the offchain worker about new imported blocks
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
Some("offchain-worker"),
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
offchain,
Clone::clone(&spawn_handle),
network.clone(),
),
);
}
offchain_workers
}
/// Spawn the tasks that are required to run a node.
pub fn spawn_tasks(
params: SpawnTasksParams,
) -> Result
where
TCl: ProvideRuntimeApi
+ HeaderMetadata
+ Chain
+ BlockBackend
+ BlockIdTo
+ ProofProvider
+ HeaderBackend
+ BlockchainEvents
+ ExecutorProvider
+ UsageProvider
+ StorageProvider
+ CallApiAt
+ Send
+ 'static,
>::Api: sp_api::Metadata
+ sc_offchain::OffchainWorkerApi
+ sp_transaction_pool::runtime_api::TaggedTransactionQueue
+ sp_session::SessionKeys
+ sp_api::ApiExt,
TBl: BlockT,
TBl::Hash: Unpin,
TBl::Header: Unpin,
TBackend: 'static + sc_client_api::backend::Backend + Send,
TExPool: MaintainedTransactionPool::Hash>
+ parity_util_mem::MallocSizeOf
+ 'static,
TRpc: sc_rpc::RpcExtension,
{
let SpawnTasksParams {
mut config,
task_manager,
client,
backend,
keystore,
transaction_pool,
rpc_extensions_builder,
network,
system_rpc_tx,
telemetry,
} = params;
let chain_info = client.usage_info().chain;
sp_session::generate_initial_session_keys(
client.clone(),
&BlockId::Hash(chain_info.best_hash),
config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
)
.map_err(|e| Error::Application(Box::new(e)))?;
let telemetry = telemetry
.map(|telemetry| init_telemetry(&mut config, network.clone(), client.clone(), telemetry))
.transpose()?;
info!("📦 Highest known block at #{}", chain_info.best_number);
let spawn_handle = task_manager.spawn_handle();
// Inform the tx pool about imported and finalized blocks.
spawn_handle.spawn(
"txpool-notifications",
Some("transaction-pool"),
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);
spawn_handle.spawn(
"on-transaction-imported",
Some("transaction-pool"),
transaction_notifications(transaction_pool.clone(), network.clone(), telemetry.clone()),
);
// Prometheus metrics.
let metrics_service =
if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
// Set static metrics.
let metrics = MetricsService::with_prometheus(telemetry.clone(), ®istry, &config)?;
spawn_handle.spawn(
"prometheus-endpoint",
None,
prometheus_endpoint::init_prometheus(port, registry).map(drop),
);
metrics
} else {
MetricsService::new(telemetry.clone())
};
// Periodically updated metrics and telemetry updates.
spawn_handle.spawn(
"telemetry-periodic-send",
None,
metrics_service.run(client.clone(), transaction_pool.clone(), network.clone()),
);
// RPC
let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware| {
gen_handler(
deny_unsafe,
rpc_middleware,
&config,
task_manager.spawn_handle(),
client.clone(),
transaction_pool.clone(),
keystore.clone(),
&*rpc_extensions_builder,
backend.offchain_storage(),
system_rpc_tx.clone(),
)
};
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
let server_metrics = sc_rpc_server::ServerMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone(), server_metrics)?;
// This is used internally, so don't restrict access to unsafe RPC
let known_rpc_method_names =
sc_rpc_server::method_names(|m| gen_handler(sc_rpc::DenyUnsafe::No, m))?;
let rpc_handlers = RpcHandlers(Arc::new(
gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics, known_rpc_method_names, "inbrowser"),
)?
.into(),
));
// Spawn informant task
spawn_handle.spawn(
"informant",
None,
sc_informant::build(
client.clone(),
network.clone(),
transaction_pool.clone(),
config.informant_output_format,
),
);
task_manager.keep_alive((config.base_path, rpc, rpc_handlers.clone()));
Ok(rpc_handlers)
}
async fn transaction_notifications(
transaction_pool: Arc,
network: Arc::Hash>>,
telemetry: Option,
) where
TBl: BlockT,
TExPool: MaintainedTransactionPool::Hash>,
{
// transaction notifications
transaction_pool
.import_notification_stream()
.for_each(move |hash| {
network.propagate_transaction(hash);
let status = transaction_pool.status();
telemetry!(
telemetry;
SUBSTRATE_INFO;
"txpool.import";
"ready" => status.ready,
"future" => status.future,
);
ready(())
})
.await;
}
fn init_telemetry>(
config: &mut Configuration,
network: Arc::Hash>>,
client: Arc,
telemetry: &mut Telemetry,
) -> sc_telemetry::Result {
let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
let connection_message = ConnectionMessage {
name: config.network.node_name.to_owned(),
implementation: config.impl_name.to_owned(),
version: config.impl_version.to_owned(),
config: String::new(),
chain: config.chain_spec.name().to_owned(),
genesis_hash: format!("{:?}", genesis_hash),
authority: config.role.is_authority(),
startup_time: SystemTime::UNIX_EPOCH
.elapsed()
.map(|dur| dur.as_millis())
.unwrap_or(0)
.to_string(),
network_id: network.local_peer_id().to_base58(),
};
telemetry.start_telemetry(connection_message)?;
Ok(telemetry.handle())
}
fn gen_handler(
deny_unsafe: sc_rpc::DenyUnsafe,
rpc_middleware: sc_rpc_server::RpcMiddleware,
config: &Configuration,
spawn_handle: SpawnTaskHandle,
client: Arc,
transaction_pool: Arc,
keystore: SyncCryptoStorePtr,
rpc_extensions_builder: &(dyn RpcExtensionBuilder