// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Substrate service. Starts a thread that spins up the network, client, and extrinsic pool.
//! Manages communication between them.
#![warn(missing_docs)]
mod components;
mod chain_spec;
pub mod config;
pub mod chain_ops;
pub mod error;
use std::io;
use std::net::SocketAddr;
use std::collections::HashMap;
use std::time::Duration;
use futures::sync::mpsc;
use parking_lot::Mutex;
use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT};
use exit_future::Signal;
use futures::prelude::*;
use keystore::Store as Keystore;
use log::{info, warn, debug, error};
use parity_codec::{Encode, Decode};
use primitives::Pair;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Header, NumberFor, SaturatedConversion};
use substrate_executor::NativeExecutor;
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use tel::{telemetry, SUBSTRATE_INFO};
pub use self::error::Error;
pub use config::{Configuration, Roles, PruningMode};
pub use chain_spec::{ChainSpec, Properties};
pub use transaction_pool::txpool::{
self, Pool as TransactionPool, Options as TransactionPoolOptions, ChainApi, IntoPoolError
};
pub use client::FinalityNotifications;
pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend,
LightExecutor, Components, PoolApi, ComponentClient, ComponentOffchainStorage,
ComponentBlock, FullClient, LightClient, FullComponents, LightComponents,
CodeExecutor, NetworkService, FactoryChainSpec, FactoryBlock,
FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis,
ComponentExHash, ComponentExtrinsic, FactoryExtrinsic
};
use components::{StartRPC, MaintainTransactionPool, OffchainWorker};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
#[doc(hidden)]
pub use network::{FinalityProofProvider, OnDemand};
#[doc(hidden)]
pub use futures::future::Executor;
/// Protocol ID used as a fallback when the chain spec does not configure one.
const DEFAULT_PROTOCOL_ID: &str = "sup";
/// Substrate service.
///
/// Holds the client, network and transaction-pool handles together with the
/// channel used to hand background futures to whoever drives the service.
// NOTE(review): several field types below read as if their generic arguments
// were lost (e.g. `Arc>`, `Option`) — confirm against the upstream definition
// before editing them.
pub struct Service {
/// Shared handle to the client.
client: Arc>,
/// Optional chain-selection component.
select_chain: Option<::SelectChain>,
/// Shared handle to the network service.
network: Arc>,
/// Sinks to propagate network status updates.
network_status_sinks: Arc>>>>>,
/// Shared handle to the transaction pool.
transaction_pool: Arc>,
/// Optional keystore.
keystore: Option,
/// Exit future; presumably the one background tasks are `select`ed against
/// in `new` — TODO confirm.
exit: ::exit_future::Exit,
/// Optional exit signal; presumably the counterpart of `exit` — TODO confirm.
signal: Option,
/// Sender for futures that must be spawned as background tasks.
to_spawn_tx: mpsc::UnboundedSender + Send>>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: mpsc::UnboundedReceiver + Send>>,
/// List of futures to poll from `poll`.
/// If spawning a background task is not possible, we instead push the task into this `Vec`.
/// The elements must then be polled manually.
to_poll: Vec + Send>>,
/// Configuration of this Service
pub config: FactoryFullConfiguration,
/// Boxed RPC-related value; presumably held only to keep it alive for the
/// lifetime of the service — TODO confirm.
_rpc: Box,
/// Optional telemetry handle.
_telemetry: Option,
/// Senders registered through `telemetry_on_connect_stream`.
_telemetry_on_connect_sinks: Arc>>>,
/// Optional offchain workers.
_offchain_workers: Option,
ComponentOffchainStorage,
ComponentBlock>
>>,
}
/// Builds a bare full client, without any networking attached.
///
/// A native executor is created from `config.default_heap_pages` and passed,
/// together with `config`, to the full components' `build_client`. Only the
/// client half of the returned pair is kept; an error from `build_client` is
/// propagated to the caller.
pub fn new_client(config: &FactoryFullConfiguration)
-> Result>>, error::Error>
{
	let native_executor = NativeExecutor::new(config.default_heap_pages);
	let built = components::FullComponents::::build_client(config, native_executor)?;
	// The second element of the pair is not needed for a bare client.
	Ok(built.0)
}
/// A handle for spawning tasks in the service.
///
/// The handle is `Clone`; cloning it clones the underlying channel sender.
#[derive(Clone)]
pub struct SpawnTaskHandle {
/// Sending half of the unbounded channel that submitted futures are pushed into.
sender: mpsc::UnboundedSender + Send>>,
}
impl Executor + Send>> for SpawnTaskHandle {
	/// Queues `future` on the handle's channel.
	///
	/// Returns `Ok(())` when the future was accepted. If the send fails, the
	/// future is recovered from the send error and handed back inside an
	/// `ExecuteError` of kind `Shutdown`.
	fn execute(
		&self,
		future: Box + Send>
	) -> Result<(), futures::future::ExecuteError + Send>>> {
		self.sender.unbounded_send(future).map_err(|send_err| {
			futures::future::ExecuteError::new(
				futures::future::ExecuteErrorKind::Shutdown,
				send_err.into_inner(),
			)
		})
	}
}
/// Stream of "connection established" events for a telemetry server.
///
/// This is the receiving half of an unbounded channel carrying `()` items.
pub type TelemetryOnConnectNotifications = mpsc::UnboundedReceiver<()>;
/// Used to hook on telemetry connection established events.
pub struct TelemetryOnConnect {
/// Event stream: the receiving half of the on-connect notification channel.
pub telemetry_connection_sinks: TelemetryOnConnectNotifications,
}
impl Service {
/// Subscribes to telemetry "connection established" events.
///
/// A fresh unbounded channel is created; its sender is appended to the
/// service's list of on-connect sinks and its receiver is returned.
pub fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications {
	let (on_connect_tx, on_connect_rx) = mpsc::unbounded();
	{
		// Hold the lock only for as long as it takes to register the sender.
		let mut registered = self._telemetry_on_connect_sinks.lock();
		registered.push(on_connect_tx);
	}
	on_connect_rx
}
/// Creates a new service.
pub fn new(
mut config: FactoryFullConfiguration,
) -> Result {
let (signal, exit) = ::exit_future::signal();
// List of asynchronous tasks to spawn. We collect them, then spawn them all at once.
let (to_spawn_tx, to_spawn_rx) =
mpsc::unbounded:: + Send>>();
// Create client
let executor = NativeExecutor::new(config.default_heap_pages);
let mut keystore = if let Some(keystore_path) = config.keystore_path.as_ref() {
match Keystore::open(keystore_path.clone()) {
Ok(ks) => Some(ks),
Err(err) => {
error!("Failed to initialize keystore: {}", err);
None
}
}
} else {
None
};
// Keep the public key for telemetry
let public_key: String;
// This is meant to be for testing only
// FIXME #1063 remove this
if let Some(keystore) = keystore.as_mut() {
for seed in &config.keys {
keystore.generate_from_seed(seed)?;
}
public_key = match keystore.contents()?.get(0) {
Some(public_key) => public_key.to_string(),
None => {
let key = keystore.generate(&config.password)?;
let public_key = key.public();
info!("Generated a new keypair: {:?}", public_key);
public_key.to_string()
}
}
} else {
public_key = format!("");
}
let (client, on_demand) = Components::build_client(&config, executor)?;
let select_chain = Components::build_select_chain(&mut config, client.clone())?;
let import_queue = Box::new(Components::build_import_queue(
&mut config,
client.clone(),
select_chain.clone(),
)?);
let finality_proof_provider = Components::build_finality_proof_provider(client.clone())?;
let chain_info = client.info().chain;
let version = config.full_version();
info!("Highest known block at #{}", chain_info.best_number);
telemetry!(SUBSTRATE_INFO; "node.start";
"height" => chain_info.best_number.saturated_into::(),
"best" => ?chain_info.best_hash
);
let network_protocol = ::build_network_protocol(&config)?;
let transaction_pool = Arc::new(
Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())?
);
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter:: {
imports_external_transactions: !config.roles.is_light(),
pool: transaction_pool.clone(),
client: client.clone(),
});
let protocol_id = {
let protocol_id_full = match config.chain_spec.protocol_id() {
Some(pid) => pid,
None => {
warn!("Using default protocol ID {:?} because none is configured in the \
chain specs", DEFAULT_PROTOCOL_ID
);
DEFAULT_PROTOCOL_ID
}
}.as_bytes();
network::ProtocolId::from(protocol_id_full)
};
let network_params = network::config::Params {
roles: config.roles,
network_config: config.network.clone(),
chain: client.clone(),
finality_proof_provider,
on_demand,
transaction_pool: transaction_pool_adapter.clone() as _,
import_queue,
protocol_id,
specialization: network_protocol,
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(Vec::new()));
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(network_mut, network_status_sinks.clone())
.map_err(|_| ())
.select(exit.clone())
.then(|_| Ok(()))));
#[allow(deprecated)]
let offchain_storage = client.backend().offchain_storage();
let offchain_workers = match (config.offchain_worker, offchain_storage) {
(true, Some(db)) => {
Some(Arc::new(offchain::OffchainWorkers::new(
client.clone(),
db,
)))
},
(true, None) => {
log::warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
None
},
_ => None,
};
{
// block notifications
let network = Arc::downgrade(&network);
let txpool = Arc::downgrade(&transaction_pool);
let wclient = Arc::downgrade(&client);
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let to_spawn_tx_ = to_spawn_tx.clone();
let events = client.import_notification_stream()
.for_each(move |notification| {
let number = *notification.header.number();
if let Some(network) = network.upgrade() {
network.on_block_imported(notification.hash, notification.header);
}
if let (Some(txpool), Some(client)) = (txpool.upgrade(), wclient.upgrade()) {
Components::RuntimeServices::maintain_transaction_pool(
&BlockId::hash(notification.hash),
&*client,
&*txpool,
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
}
if let (Some(txpool), Some(offchain)) = (txpool.upgrade(), offchain.as_ref().and_then(|o| o.upgrade())) {
let future = Components::RuntimeServices::offchain_workers(
&number,
&offchain,
&txpool,
).map_err(|e| warn!("Offchain workers error processing new block: {:?}", e))?;
let _ = to_spawn_tx_.unbounded_send(future);
}
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
{
// finality notifications
let network = Arc::downgrade(&network);
// A utility stream that drops all ready items and only returns the last one.
// This is used to only keep the last finality notification and avoid
// overloading the sync module with notifications.
struct MostRecentNotification(futures::stream::Fuse>);
impl Stream for MostRecentNotification {
type Item = as Stream>::Item;
type Error = as Stream>::Error;
fn poll(&mut self) -> Poll