Various small improvements to service construction. (#6738)

* Remove service components and add build_network, build_offchain_workers etc

* Improve transaction pool api

* Remove commented out line

* Add PartialComponents

* Add BuildNetworkParams, documentation

* Remove unused imports in tests

* Apply suggestions from code review

Co-authored-by: Nikolay Volf <nikvolf@gmail.com>

* Remove unused imports in node-bench

Co-authored-by: Nikolay Volf <nikvolf@gmail.com>
This commit is contained in:
Ashley
2020-07-28 15:21:33 +02:00
committed by GitHub
parent 97f400f0f0
commit 9220b646d2
17 changed files with 401 additions and 326 deletions
+1
View File
@@ -7039,6 +7039,7 @@ dependencies = [
"sp-core",
"sp-externalities",
"sp-finality-grandpa",
"sp-inherents",
"sp-io",
"sp-runtime",
"sp-session",
@@ -19,8 +19,8 @@ use crate::chain_spec;
use crate::cli::Cli;
use crate::service;
use sc_cli::{SubstrateCli, RuntimeVersion, Role, ChainSpec};
use sc_service::ServiceParams;
use crate::service::new_full_params;
use sc_service::PartialComponents;
use crate::service::new_partial;
impl SubstrateCli for Cli {
fn impl_name() -> String {
@@ -70,8 +70,8 @@ pub fn run() -> sc_cli::Result<()> {
Some(subcommand) => {
let runner = cli.create_runner(subcommand)?;
runner.run_subcommand(subcommand, |config| {
let (ServiceParams { client, backend, task_manager, import_queue, .. }, ..)
= new_full_params(config)?;
let PartialComponents { client, backend, task_manager, import_queue, .. }
= new_partial(&config)?;
Ok((client, backend, import_queue, task_manager))
})
}
+101 -80
View File
@@ -4,14 +4,12 @@ use std::sync::Arc;
use std::time::Duration;
use sc_client_api::{ExecutorProvider, RemoteBackend};
use node_template_runtime::{self, opaque::Block, RuntimeApi};
use sc_service::{error::Error as ServiceError, Configuration, ServiceComponents, TaskManager};
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sp_inherents::InherentDataProviders;
use sc_executor::native_executor_instance;
pub use sc_executor::NativeExecutor;
use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair};
use sc_finality_grandpa::{
FinalityProofProvider as GrandpaFinalityProofProvider, StorageAndProofProvider, SharedVoterState,
};
use sc_finality_grandpa::{FinalityProofProvider as GrandpaFinalityProofProvider, SharedVoterState};
// Our native executor instance.
native_executor_instance!(
@@ -24,18 +22,15 @@ type FullClient = sc_service::TFullClient<Block, RuntimeApi, Executor>;
type FullBackend = sc_service::TFullBackend<Block>;
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
pub fn new_full_params(config: Configuration) -> Result<(
sc_service::ServiceParams<
Block, FullClient,
sc_consensus_aura::AuraImportQueue<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient>,
(), FullBackend,
>,
FullSelectChain,
sp_inherents::InherentDataProviders,
sc_finality_grandpa::GrandpaBlockImport<FullBackend, Block, FullClient, FullSelectChain>,
sc_finality_grandpa::LinkHalf<Block, FullClient, FullSelectChain>
), ServiceError> {
pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponents<
FullClient, FullBackend, FullSelectChain,
sc_consensus_aura::AuraImportQueue<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient>,
(
sc_finality_grandpa::GrandpaBlockImport<FullBackend, Block, FullClient, FullSelectChain>,
sc_finality_grandpa::LinkHalf<Block, FullClient, FullSelectChain>
)
>, ServiceError> {
let inherent_data_providers = sp_inherents::InherentDataProviders::new();
let (client, backend, keystore, task_manager) =
@@ -44,12 +39,8 @@ pub fn new_full_params(config: Configuration) -> Result<(
let select_chain = sc_consensus::LongestChain::new(backend.clone());
let pool_api = sc_transaction_pool::FullChainApi::new(
client.clone(), config.prometheus_registry(),
);
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
std::sync::Arc::new(pool_api),
config.prometheus_registry(),
task_manager.spawn_handle(),
client.clone(),
@@ -74,56 +65,62 @@ pub fn new_full_params(config: Configuration) -> Result<(
config.prometheus_registry(),
)?;
let provider = client.clone() as Arc<dyn StorageAndProofProvider<_, _>>;
let finality_proof_provider =
Arc::new(GrandpaFinalityProofProvider::new(backend.clone(), provider));
let params = sc_service::ServiceParams {
backend, client, import_queue, keystore, task_manager, transaction_pool,
config,
block_announce_validator_builder: None,
finality_proof_request_builder: None,
finality_proof_provider: Some(finality_proof_provider),
on_demand: None,
remote_blockchain: None,
rpc_extensions_builder: Box::new(|_| ()),
};
Ok((
params, select_chain, inherent_data_providers,
grandpa_block_import, grandpa_link,
))
Ok(sc_service::PartialComponents {
client, backend, task_manager, import_queue, keystore, select_chain, transaction_pool,
inherent_data_providers,
other: (grandpa_block_import, grandpa_link),
})
}
/// Builds a new service for a full client.
pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
let (
params, select_chain, inherent_data_providers,
block_import, grandpa_link,
) = new_full_params(config)?;
let sc_service::PartialComponents {
client, backend, mut task_manager, import_queue, keystore, select_chain, transaction_pool,
inherent_data_providers,
other: (block_import, grandpa_link),
} = new_partial(&config)?;
let (
role, force_authoring, name, enable_grandpa, prometheus_registry,
client, transaction_pool, keystore,
) = {
let sc_service::ServiceParams {
config, client, transaction_pool, keystore, ..
} = &params;
let finality_proof_provider =
GrandpaFinalityProofProvider::new_for_service(backend.clone(), client.clone());
(
config.role.clone(),
config.force_authoring,
config.network.node_name.clone(),
!config.disable_grandpa,
config.prometheus_registry().cloned(),
let (network, network_status_sinks, system_rpc_tx) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
on_demand: None,
block_announce_validator_builder: None,
finality_proof_request_builder: None,
finality_proof_provider: Some(finality_proof_provider.clone()),
})?;
client.clone(), transaction_pool.clone(), keystore.clone(),
)
};
if config.offchain_worker.enabled {
sc_service::build_offchain_workers(
&config, backend.clone(), task_manager.spawn_handle(), client.clone(), network.clone(),
);
}
let ServiceComponents {
task_manager, network, telemetry_on_connect_sinks, ..
} = sc_service::build(params)?;
let role = config.role.clone();
let force_authoring = config.force_authoring;
let name = config.network.node_name.clone();
let enable_grandpa = !config.disable_grandpa;
let prometheus_registry = config.prometheus_registry().cloned();
let telemetry_connection_sinks = sc_service::TelemetryConnectionSinks::default();
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
network: network.clone(),
client: client.clone(),
keystore: keystore.clone(),
task_manager: &mut task_manager,
transaction_pool: transaction_pool.clone(),
telemetry_connection_sinks: telemetry_connection_sinks.clone(),
rpc_extensions_builder: Box::new(|_| ()),
on_demand: None,
remote_blockchain: None,
backend, network_status_sinks, system_rpc_tx, config,
})?;
if role.is_authority() {
let proposer = sc_basic_authorship::ProposerFactory::new(
@@ -183,7 +180,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
link: grandpa_link,
network,
inherent_data_providers,
telemetry_on_connect: Some(telemetry_on_connect_sinks.on_connect_stream()),
telemetry_on_connect: Some(telemetry_connection_sinks.on_connect_stream()),
voting_rule: sc_finality_grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
shared_voter_state: SharedVoterState::empty(),
@@ -208,18 +205,16 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
/// Builds a new service for a light client.
pub fn new_light(config: Configuration) -> Result<TaskManager, ServiceError> {
let (client, backend, keystore, task_manager, on_demand) =
let (client, backend, keystore, mut task_manager, on_demand) =
sc_service::new_light_parts::<Block, RuntimeApi, Executor>(&config)?;
let transaction_pool_api = Arc::new(sc_transaction_pool::LightChainApi::new(
client.clone(), on_demand.clone(),
));
let transaction_pool = sc_transaction_pool::BasicPool::new_light(
let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light(
config.transaction_pool.clone(),
transaction_pool_api,
config.prometheus_registry(),
task_manager.spawn_handle(),
);
client.clone(),
on_demand.clone(),
));
let grandpa_block_import = sc_finality_grandpa::light_block_import(
client.clone(), backend.clone(), &(client.clone() as Arc<_>),
@@ -241,16 +236,42 @@ pub fn new_light(config: Configuration) -> Result<TaskManager, ServiceError> {
)?;
let finality_proof_provider =
Arc::new(GrandpaFinalityProofProvider::new(backend.clone(), client.clone() as Arc<_>));
GrandpaFinalityProofProvider::new_for_service(backend.clone(), client.clone());
sc_service::build(sc_service::ServiceParams {
block_announce_validator_builder: None,
finality_proof_request_builder: Some(finality_proof_request_builder),
finality_proof_provider: Some(finality_proof_provider),
on_demand: Some(on_demand),
let (network, network_status_sinks, system_rpc_tx) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
on_demand: Some(on_demand.clone()),
block_announce_validator_builder: None,
finality_proof_request_builder: Some(finality_proof_request_builder),
finality_proof_provider: Some(finality_proof_provider),
})?;
if config.offchain_worker.enabled {
sc_service::build_offchain_workers(
&config, backend.clone(), task_manager.spawn_handle(), client.clone(), network.clone(),
);
}
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
remote_blockchain: Some(backend.remote_blockchain()),
transaction_pool,
task_manager: &mut task_manager,
on_demand: Some(on_demand),
rpc_extensions_builder: Box::new(|_| ()),
transaction_pool: Arc::new(transaction_pool),
config, client, import_queue, keystore, backend, task_manager
}).map(|ServiceComponents { task_manager, .. }| task_manager)
telemetry_connection_sinks: sc_service::TelemetryConnectionSinks::default(),
config,
client,
keystore,
backend,
network,
network_status_sinks,
system_rpc_tx,
})?;
Ok(task_manager)
}
+2 -3
View File
@@ -21,11 +21,11 @@
//! The goal of this benchmark is to figure out time needed to fill
//! the transaction pool for the next block.
use std::{borrow::Cow, sync::Arc};
use std::borrow::Cow;
use node_testing::bench::{BenchDb, Profile, BlockType, KeyTypes, DatabaseType};
use sc_transaction_pool::{BasicPool, FullChainApi};
use sc_transaction_pool::BasicPool;
use sp_runtime::generic::BlockId;
use sp_transaction_pool::{TransactionPool, TransactionSource};
@@ -74,7 +74,6 @@ impl core::Benchmark for PoolBenchmark {
let executor = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(context.client.clone(), None)),
None,
executor,
context.client.clone(),
+4 -4
View File
@@ -20,8 +20,8 @@ use crate::{chain_spec, service, Cli, Subcommand};
use node_executor::Executor;
use node_runtime::{Block, RuntimeApi};
use sc_cli::{Result, SubstrateCli, RuntimeVersion, Role, ChainSpec};
use sc_service::ServiceParams;
use crate::service::new_full_params;
use sc_service::PartialComponents;
use crate::service::new_partial;
impl SubstrateCli for Cli {
fn impl_name() -> String {
@@ -96,8 +96,8 @@ pub fn run() -> Result<()> {
Some(Subcommand::Base(subcommand)) => {
let runner = cli.create_runner(subcommand)?;
runner.run_subcommand(subcommand, |config| {
let (ServiceParams { client, backend, import_queue, task_manager, .. }, ..)
= new_full_params(config)?;
let PartialComponents { client, backend, task_manager, import_queue, ..}
= new_partial(&config)?;
Ok((client, backend, import_queue, task_manager))
})
}
+98 -77
View File
@@ -22,14 +22,12 @@
use std::sync::Arc;
use sc_consensus_babe;
use grandpa::{
self, FinalityProofProvider as GrandpaFinalityProofProvider, StorageAndProofProvider,
};
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use node_primitives::Block;
use node_runtime::RuntimeApi;
use sc_service::{
config::{Role, Configuration}, error::{Error as ServiceError},
RpcHandlers, ServiceComponents, TaskManager,
RpcHandlers, TaskManager,
};
use sp_inherents::InherentDataProviders;
use sc_network::{Event, NetworkService};
@@ -46,34 +44,28 @@ type FullGrandpaBlockImport =
grandpa::GrandpaBlockImport<FullBackend, Block, FullClient, FullSelectChain>;
type LightClient = sc_service::TLightClient<Block, RuntimeApi, Executor>;
pub fn new_full_params(config: Configuration) -> Result<(
sc_service::ServiceParams<
Block, FullClient,
sc_consensus_babe::BabeImportQueue<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient>, node_rpc::IoHandler,
FullBackend
>,
pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponents<
FullClient, FullBackend, FullSelectChain,
sc_consensus_babe::BabeImportQueue<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient>,
(
sc_consensus_babe::BabeBlockImport<Block, FullClient, FullGrandpaBlockImport>,
grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
sc_consensus_babe::BabeLink<Block>,
),
grandpa::SharedVoterState,
FullSelectChain,
InherentDataProviders
), ServiceError> {
impl Fn(node_rpc::DenyUnsafe) -> node_rpc::IoHandler,
(
sc_consensus_babe::BabeBlockImport<Block, FullClient, FullGrandpaBlockImport>,
grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
sc_consensus_babe::BabeLink<Block>,
),
grandpa::SharedVoterState,
)
>, ServiceError> {
let (client, backend, keystore, task_manager) =
sc_service::new_full_parts::<Block, RuntimeApi, Executor>(&config)?;
let client = Arc::new(client);
let select_chain = sc_consensus::LongestChain::new(backend.clone());
let pool_api = sc_transaction_pool::FullChainApi::new(
client.clone(), config.prometheus_registry(),
);
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
std::sync::Arc::new(pool_api),
config.prometheus_registry(),
task_manager.spawn_handle(),
client.clone(),
@@ -122,7 +114,7 @@ pub fn new_full_params(config: Configuration) -> Result<(
let select_chain = select_chain.clone();
let keystore = keystore.clone();
let rpc_extensions_builder = Box::new(move |deny_unsafe| {
let rpc_extensions_builder = move |deny_unsafe| {
let deps = node_rpc::FullDeps {
client: client.clone(),
pool: pool.clone(),
@@ -140,26 +132,16 @@ pub fn new_full_params(config: Configuration) -> Result<(
};
node_rpc::create_full(deps)
});
};
(rpc_extensions_builder, rpc_setup)
};
let provider = client.clone() as Arc<dyn grandpa::StorageAndProofProvider<_, _>>;
let finality_proof_provider =
Arc::new(grandpa::FinalityProofProvider::new(backend.clone(), provider));
let params = sc_service::ServiceParams {
config, backend, client, import_queue, keystore, task_manager, rpc_extensions_builder,
transaction_pool,
block_announce_validator_builder: None,
finality_proof_request_builder: None,
finality_proof_provider: Some(finality_proof_provider),
on_demand: None,
remote_blockchain: None,
};
Ok((params, import_setup, rpc_setup, select_chain, inherent_data_providers))
Ok(sc_service::PartialComponents {
client, backend, task_manager, keystore, select_chain, import_queue, transaction_pool,
inherent_data_providers,
other: (rpc_extensions_builder, import_setup, rpc_setup)
})
}
/// Creates a full service from the configuration.
@@ -174,31 +156,56 @@ pub fn new_full_base(
Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
Arc<sc_transaction_pool::FullPool<Block, FullClient>>,
), ServiceError> {
let (params, import_setup, rpc_setup, select_chain, inherent_data_providers)
= new_full_params(config)?;
let sc_service::PartialComponents {
client, backend, mut task_manager, import_queue, keystore, select_chain, transaction_pool,
inherent_data_providers,
other: (rpc_extensions_builder, import_setup, rpc_setup),
} = new_partial(&config)?;
let (
role, force_authoring, name, enable_grandpa, prometheus_registry,
client, transaction_pool, keystore,
) = {
let sc_service::ServiceParams {
config, client, transaction_pool, keystore, ..
} = &params;
let finality_proof_provider =
GrandpaFinalityProofProvider::new_for_service(backend.clone(), client.clone());
let (network, network_status_sinks, system_rpc_tx) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
on_demand: None,
block_announce_validator_builder: None,
finality_proof_request_builder: None,
finality_proof_provider: Some(finality_proof_provider.clone()),
})?;
(
config.role.clone(),
config.force_authoring,
config.network.node_name.clone(),
!config.disable_grandpa,
config.prometheus_registry().cloned(),
if config.offchain_worker.enabled {
sc_service::build_offchain_workers(
&config, backend.clone(), task_manager.spawn_handle(), client.clone(), network.clone(),
);
}
client.clone(), transaction_pool.clone(), keystore.clone(),
)
};
let role = config.role.clone();
let force_authoring = config.force_authoring;
let name = config.network.node_name.clone();
let enable_grandpa = !config.disable_grandpa;
let prometheus_registry = config.prometheus_registry().cloned();
let telemetry_connection_sinks = sc_service::TelemetryConnectionSinks::default();
let ServiceComponents {
task_manager, network, telemetry_on_connect_sinks, ..
} = sc_service::build(params)?;
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
config,
backend: backend.clone(),
client: client.clone(),
keystore: keystore.clone(),
network: network.clone(),
rpc_extensions_builder: Box::new(rpc_extensions_builder),
transaction_pool: transaction_pool.clone(),
task_manager: &mut task_manager,
on_demand: None,
remote_blockchain: None,
telemetry_connection_sinks: telemetry_connection_sinks.clone(),
network_status_sinks,
system_rpc_tx,
})?;
let (block_import, grandpa_link, babe_link) = import_setup;
let shared_voter_state = rpc_setup;
@@ -295,7 +302,7 @@ pub fn new_full_base(
link: grandpa_link,
network: network.clone(),
inherent_data_providers: inherent_data_providers.clone(),
telemetry_on_connect: Some(telemetry_on_connect_sinks.on_connect_stream()),
telemetry_on_connect: Some(telemetry_connection_sinks.on_connect_stream()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
shared_voter_state,
@@ -331,20 +338,17 @@ pub fn new_light_base(config: Configuration) -> Result<(
Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
Arc<sc_transaction_pool::LightPool<Block, LightClient, sc_network::config::OnDemand<Block>>>
), ServiceError> {
let (client, backend, keystore, task_manager, on_demand) =
let (client, backend, keystore, mut task_manager, on_demand) =
sc_service::new_light_parts::<Block, RuntimeApi, Executor>(&config)?;
let select_chain = sc_consensus::LongestChain::new(backend.clone());
let transaction_pool_api = Arc::new(sc_transaction_pool::LightChainApi::new(
client.clone(),
on_demand.clone(),
));
let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light(
config.transaction_pool.clone(),
transaction_pool_api,
config.prometheus_registry(),
task_manager.spawn_handle(),
client.clone(),
on_demand.clone(),
));
let grandpa_block_import = grandpa::light_block_import(
@@ -376,10 +380,27 @@ pub fn new_light_base(config: Configuration) -> Result<(
config.prometheus_registry(),
)?;
// GenesisAuthoritySetProvider is implemented for StorageAndProofProvider
let provider = client.clone() as Arc<dyn StorageAndProofProvider<_, _>>;
let finality_proof_provider =
Arc::new(GrandpaFinalityProofProvider::new(backend.clone(), provider));
GrandpaFinalityProofProvider::new_for_service(backend.clone(), client.clone());
let (network, network_status_sinks, system_rpc_tx) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
on_demand: Some(on_demand.clone()),
block_announce_validator_builder: None,
finality_proof_request_builder: Some(finality_proof_request_builder),
finality_proof_provider: Some(finality_proof_provider),
})?;
if config.offchain_worker.enabled {
sc_service::build_offchain_workers(
&config, backend.clone(), task_manager.spawn_handle(), client.clone(), network.clone(),
);
}
let light_deps = node_rpc::LightDeps {
remote_blockchain: backend.remote_blockchain(),
@@ -390,17 +411,17 @@ pub fn new_light_base(config: Configuration) -> Result<(
let rpc_extensions = node_rpc::create_light(light_deps);
let ServiceComponents { task_manager, rpc_handlers, network, .. } =
sc_service::build(sc_service::ServiceParams {
block_announce_validator_builder: None,
finality_proof_request_builder: Some(finality_proof_request_builder),
finality_proof_provider: Some(finality_proof_provider),
let rpc_handlers =
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
on_demand: Some(on_demand),
remote_blockchain: Some(backend.remote_blockchain()),
rpc_extensions_builder: Box::new(sc_service::NoopRpcExtensionBuilder(rpc_extensions)),
client: client.clone(),
transaction_pool: transaction_pool.clone(),
config, import_queue, keystore, backend, task_manager,
config, keystore, backend, network_status_sinks, system_rpc_tx,
network: network.clone(),
telemetry_connection_sinks: sc_service::TelemetryConnectionSinks::default(),
task_manager: &mut task_manager,
})?;
Ok((task_manager, rpc_handlers, client, network, transaction_pool))
+1 -1
View File
@@ -45,8 +45,8 @@ use sc_consensus_babe::{Config, Epoch};
use sc_consensus_babe_rpc::BabeRpcHandler;
use sc_finality_grandpa::{SharedVoterState, SharedAuthoritySet};
use sc_finality_grandpa_rpc::GrandpaRpcHandler;
use sc_rpc_api::DenyUnsafe;
use sp_block_builder::BlockBuilder;
pub use sc_rpc_api::DenyUnsafe;
/// Light client extra dependencies.
pub struct LightDeps<C, F, P> {
@@ -326,7 +326,7 @@ mod tests {
prelude::*, TestClientBuilder, runtime::{Extrinsic, Transfer}, TestClientBuilderExt,
};
use sp_transaction_pool::{ChainEvent, MaintainedTransactionPool, TransactionSource};
use sc_transaction_pool::{BasicPool, FullChainApi};
use sc_transaction_pool::BasicPool;
use sp_api::Core;
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::NumberFor;
@@ -361,7 +361,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
@@ -415,7 +414,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
@@ -451,7 +449,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
@@ -514,7 +511,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
@@ -34,7 +34,6 @@
//! # let spawner = sp_core::testing::TaskExecutor::new();
//! # let txpool = BasicPool::new_full(
//! # Default::default(),
//! # Arc::new(FullChainApi::new(client.clone(), None)),
//! # None,
//! # spawner,
//! # client.clone(),
@@ -169,6 +169,18 @@ impl<B, Block: BlockT> FinalityProofProvider<B, Block>
{
FinalityProofProvider { backend, authority_provider: Arc::new(authority_provider) }
}
/// Create new finality proof provider for the service using:
///
/// - backend for accessing blockchain data;
/// - storage_and_proof_provider, which is generally a client.
pub fn new_for_service(
backend: Arc<B>,
storage_and_proof_provider: Arc<dyn StorageAndProofProvider<Block, B>>,
) -> Arc<Self> {
Arc::new(Self::new(backend, storage_and_proof_provider))
}
}
impl<B, Block> sc_network::config::FinalityProofProvider<Block> for FinalityProofProvider<B, Block>
-1
View File
@@ -250,7 +250,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = TestPool(BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
-1
View File
@@ -64,7 +64,6 @@ impl Default for TestSetup {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
+1
View File
@@ -53,6 +53,7 @@ sp-session = { version = "2.0.0-rc5", path = "../../primitives/session" }
sp-state-machine = { version = "0.8.0-rc5", path = "../../primitives/state-machine" }
sp-application-crypto = { version = "2.0.0-rc5", path = "../../primitives/application-crypto" }
sp-consensus = { version = "0.8.0-rc5", path = "../../primitives/consensus/common" }
sp-inherents = { version = "2.0.0-rc5", path = "../../primitives/inherents" }
sc-network = { version = "0.8.0-rc5", path = "../network" }
sc-chain-spec = { version = "2.0.0-rc5", path = "../chain-spec" }
sc-light = { version = "2.0.0-rc5", path = "../light" }
+122 -106
View File
@@ -18,37 +18,35 @@
use crate::{
NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm,
TelemetryConnectionSinks, RpcHandlers, NetworkStatusSinks,
start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle,
status_sinks, metrics::MetricsService,
client::{light, Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig, OffchainWorkerConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig},
};
use sc_client_api::{
light::RemoteBlockchain, ForkBlocks, BadBlocks, UsageProvider, ExecutorProvider,
};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sc_chain_spec::get_extension;
use sp_consensus::{
block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator, Chain},
import_queue::ImportQueue,
};
use futures::{
Future, FutureExt, StreamExt,
future::ready,
};
use futures::{FutureExt, StreamExt, future::ready};
use jsonrpc_pubsub::manager::SubscriptionManager;
use sc_keystore::Store as Keystore;
use log::{info, warn, error};
use sc_network::config::{Role, FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
use sc_network::NetworkService;
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
Block as BlockT, SaturatedConversion, HashFor, Zero, BlockIdTo,
};
use sp_api::{ProvideRuntimeApi, CallApiAt};
use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo};
use std::{collections::HashMap, sync::Arc, pin::Pin};
use std::{collections::HashMap, sync::Arc};
use wasm_timer::SystemTime;
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::MaintainedTransactionPool;
@@ -63,7 +61,6 @@ use sc_client_api::{
execution_extensions::ExecutionExtensions
};
use sp_blockchain::{HeaderMetadata, HeaderBackend};
use crate::{ServiceComponents, TelemetryOnConnectSinks, RpcHandlers, NetworkStatusSinks};
/// A utility trait for building an RPC extension given a `DenyUnsafe` instance.
/// This is useful since at service definition time we don't know whether the
@@ -371,7 +368,7 @@ pub fn new_client<E, Block, RA>(
}
/// Parameters to pass into `build`.
pub struct ServiceParams<TBl: BlockT, TCl, TImpQu, TExPool, TRpc, Backend> {
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
/// The service configuration.
pub config: Configuration,
/// A shared client returned by `new_full_parts`/`new_light_parts`.
@@ -379,17 +376,11 @@ pub struct ServiceParams<TBl: BlockT, TCl, TImpQu, TExPool, TRpc, Backend> {
/// A shared backend returned by `new_full_parts`/`new_light_parts`.
pub backend: Arc<Backend>,
/// A task manager returned by `new_full_parts`/`new_light_parts`.
pub task_manager: TaskManager,
pub task_manager: &'a mut TaskManager,
/// A shared keystore returned by `new_full_parts`/`new_light_parts`.
pub keystore: Arc<RwLock<Keystore>>,
/// An optional, shared data fetcher for light clients.
pub on_demand: Option<Arc<OnDemand<TBl>>>,
/// An import queue.
pub import_queue: TImpQu,
/// An optional finality proof request builder.
pub finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<TBl>>,
/// An optional, shared finality proof request provider.
pub finality_proof_provider: Option<Arc<dyn FinalityProofProvider<TBl>>>,
/// A shared transaction pool.
pub transaction_pool: Arc<TExPool>,
/// A RPC extension builder. Use `NoopRpcExtensionBuilder` if you just want to pass in the
@@ -397,15 +388,61 @@ pub struct ServiceParams<TBl: BlockT, TCl, TImpQu, TExPool, TRpc, Backend> {
pub rpc_extensions_builder: Box<dyn RpcExtensionBuilder<Output = TRpc> + Send>,
/// An optional, shared remote blockchain instance. Used for light clients.
pub remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
/// A block annouce validator builder.
pub block_announce_validator_builder:
Option<Box<dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send>>,
/// A shared network instance.
pub network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
/// Sinks to propagate network status updates.
pub network_status_sinks: NetworkStatusSinks<TBl>,
/// A Sender for RPC requests.
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
/// Shared Telemetry connection sinks,
pub telemetry_connection_sinks: TelemetryConnectionSinks,
}
/// Put together the components of a service from the parameters.
pub fn build<TBl, TBackend, TImpQu, TExPool, TRpc, TCl>(
builder: ServiceParams<TBl, TCl, TImpQu, TExPool, TRpc, TBackend>,
) -> Result<ServiceComponents<TBl, TBackend, TCl>, Error>
/// Build a shared offchain workers instance.
pub fn build_offchain_workers<TBl, TBackend, TCl>(
config: &Configuration,
backend: Arc<TBackend>,
spawn_handle: SpawnTaskHandle,
client: Arc<TCl>,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
) -> Option<Arc<sc_offchain::OffchainWorkers<TCl, TBackend::OffchainStorage, TBl>>>
where
TBl: BlockT, TBackend: sc_client_api::Backend<TBl>,
<TBackend as sc_client_api::Backend<TBl>>::OffchainStorage: 'static,
TCl: Send + Sync + ProvideRuntimeApi<TBl> + BlockchainEvents<TBl> + 'static,
<TCl as ProvideRuntimeApi<TBl>>::Api: sc_offchain::OffchainWorkerApi<TBl>,
{
let offchain_workers = match backend.offchain_storage() {
Some(db) => {
Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db)))
},
None => {
warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
None
},
};
// Inform the offchain worker about new imported blocks
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
offchain,
Clone::clone(&spawn_handle),
network.clone()
)
);
}
offchain_workers
}
/// Spawn the tasks that are required to run a node.
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<Arc<RpcHandlers>, Error>
where
TCl: ProvideRuntimeApi<TBl> + HeaderMetadata<TBl, Error=sp_blockchain::Error> + Chain<TBl> +
BlockBackend<TBl> + BlockIdTo<TBl, Error=sp_blockchain::Error> + ProofProvider<TBl> +
@@ -421,26 +458,23 @@ pub fn build<TBl, TBackend, TImpQu, TExPool, TRpc, TCl>(
sp_api::ApiExt<TBl, StateBackend = TBackend::State>,
TBl: BlockT,
TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
TImpQu: 'static + ImportQueue<TBl>,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> +
MallocSizeOfWasm + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>
{
let ServiceParams {
let SpawnTasksParams {
mut config,
mut task_manager,
task_manager,
client,
on_demand,
backend,
keystore,
import_queue,
finality_proof_request_builder,
finality_proof_provider,
transaction_pool,
rpc_extensions_builder,
remote_blockchain,
block_announce_validator_builder,
} = builder;
network, network_status_sinks, system_rpc_tx,
telemetry_connection_sinks,
} = params;
let chain_info = client.usage_info().chain;
@@ -458,57 +492,14 @@ pub fn build<TBl, TBackend, TImpQu, TExPool, TRpc, TCl>(
"best" => ?chain_info.best_hash
);
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
let (network, network_status_sinks, network_future) = build_network(
&config, client.clone(), transaction_pool.clone(), task_manager.spawn_handle(),
on_demand.clone(), block_announce_validator_builder, finality_proof_request_builder,
finality_proof_provider, system_rpc_rx, import_queue
)?;
let spawn_handle = task_manager.spawn_handle();
// The network worker is responsible for gathering all network messages and processing
// them. This is quite a heavy task, and at the time of the writing of this comment it
// frequently happens that this future takes several seconds or in some situations
// even more than a minute until it has processed its entire queue. This is clearly an
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking("network-worker", network_future);
let offchain_storage = backend.offchain_storage();
let offchain_workers = match (config.offchain_worker.clone(), offchain_storage.clone()) {
(OffchainWorkerConfig {enabled: true, .. }, Some(db)) => {
Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db)))
},
(OffchainWorkerConfig {enabled: true, .. }, None) => {
warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
None
},
_ => None,
};
// Inform the tx pool about imported and finalized blocks.
spawn_handle.spawn(
"txpool-notifications",
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);
// Inform the offchain worker about new imported blocks
if let Some(offchain) = offchain_workers.clone() {
spawn_handle.spawn(
"offchain-notifications",
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
offchain,
task_manager.spawn_handle(),
network.clone()
)
);
}
spawn_handle.spawn(
"on-transaction-imported",
transaction_notifications(transaction_pool.clone(), network.clone()),
@@ -545,14 +536,12 @@ pub fn build<TBl, TBackend, TImpQu, TExPool, TRpc, TCl>(
let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe| gen_handler(
deny_unsafe, &config, task_manager.spawn_handle(), client.clone(), transaction_pool.clone(),
keystore.clone(), on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder,
offchain_storage.clone(), system_rpc_tx.clone()
backend.offchain_storage(), system_rpc_tx.clone()
);
let rpc = start_rpc_servers(&config, gen_handler)?;
// This is used internally, so don't restrict access to unsafe RPC
let rpc_handlers = Arc::new(RpcHandlers(gen_handler(sc_rpc::DenyUnsafe::No)));
let telemetry_connection_sinks: Arc<Mutex<Vec<TracingUnboundedSender<()>>>> = Default::default();
// Telemetry
let telemetry = config.telemetry_endpoints.clone().and_then(|endpoints| {
if endpoints.is_empty() {
@@ -585,18 +574,14 @@ pub fn build<TBl, TBackend, TImpQu, TExPool, TRpc, TCl>(
// Spawn informant task
spawn_handle.spawn("informant", sc_informant::build(
client.clone(),
network_status_sinks.clone(),
network_status_sinks.clone().0,
transaction_pool.clone(),
config.informant_output_format,
));
task_manager.keep_alive((telemetry, config.base_path, rpc, rpc_handlers.clone()));
Ok(ServiceComponents {
task_manager, network, rpc_handlers, offchain_workers,
telemetry_on_connect_sinks: TelemetryOnConnectSinks(telemetry_connection_sinks),
network_status_sinks: NetworkStatusSinks::new(network_status_sinks),
})
Ok(rpc_handlers)
}
async fn transaction_notifications<TBl, TExPool>(
@@ -626,7 +611,7 @@ async fn telemetry_periodic_send<TBl, TExPool, TCl>(
client: Arc<TCl>,
transaction_pool: Arc<TExPool>,
mut metrics_service: MetricsService,
network_status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>
network_status_sinks: NetworkStatusSinks<TBl>,
)
where
TBl: BlockT,
@@ -634,7 +619,7 @@ async fn telemetry_periodic_send<TBl, TExPool, TCl>(
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>,
{
let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1");
network_status_sinks.push(std::time::Duration::from_millis(5000), state_tx);
network_status_sinks.0.push(std::time::Duration::from_millis(5000), state_tx);
state_rx.for_each(move |(net_status, _)| {
let info = client.usage_info();
metrics_service.tick(
@@ -647,11 +632,11 @@ async fn telemetry_periodic_send<TBl, TExPool, TCl>(
}
async fn telemetry_periodic_network_state<TBl: BlockT>(
network_status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>
network_status_sinks: NetworkStatusSinks<TBl>,
) {
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2");
network_status_sinks.push(std::time::Duration::from_secs(30), netstat_tx);
network_status_sinks.0.push(std::time::Duration::from_secs(30), netstat_tx);
netstat_rx.for_each(move |(_, network_state)| {
telemetry!(
SUBSTRATE_INFO;
@@ -665,7 +650,7 @@ async fn telemetry_periodic_network_state<TBl: BlockT>(
fn build_telemetry<TBl: BlockT>(
config: &mut Configuration,
endpoints: sc_telemetry::TelemetryEndpoints,
telemetry_connection_sinks: Arc<Mutex<Vec<TracingUnboundedSender<()>>>>,
telemetry_connection_sinks: TelemetryConnectionSinks,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
spawn_handle: SpawnTaskHandle,
genesis_hash: <TBl as BlockT>::Hash,
@@ -703,7 +688,7 @@ fn build_telemetry<TBl: BlockT>(
"network_id" => network_id.clone()
);
telemetry_connection_sinks.lock().retain(|sink| {
telemetry_connection_sinks.0.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
ready(())
@@ -805,24 +790,38 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
))
}
fn build_network<TBl, TExPool, TImpQu, TCl>(
config: &Configuration,
client: Arc<TCl>,
transaction_pool: Arc<TExPool>,
spawn_handle: SpawnTaskHandle,
on_demand: Option<Arc<OnDemand<TBl>>>,
block_announce_validator_builder: Option<Box<
/// Parameters to pass into `build_network`.
pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> {
/// The service configuration.
pub config: &'a Configuration,
/// A shared client returned by `new_full_parts`/`new_light_parts`.
pub client: Arc<TCl>,
/// A shared transaction pool.
pub transaction_pool: Arc<TExPool>,
/// A handle for spawning tasks.
pub spawn_handle: SpawnTaskHandle,
/// An import queue.
pub import_queue: TImpQu,
/// An optional, shared data fetcher for light clients.
pub on_demand: Option<Arc<OnDemand<TBl>>>,
/// A block annouce validator builder.
pub block_announce_validator_builder: Option<Box<
dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send
>>,
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<TBl>>,
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<TBl>>>,
system_rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<TBl>>,
import_queue: TImpQu
/// An optional finality proof request builder.
pub finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<TBl>>,
/// An optional, shared finality proof request provider.
pub finality_proof_provider: Option<Arc<dyn FinalityProofProvider<TBl>>>,
}
/// Build the network service, the network status sinks and an RPC sender.
pub fn build_network<TBl, TExPool, TImpQu, TCl>(
params: BuildNetworkParams<TBl, TExPool, TImpQu, TCl>
) -> Result<
(
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
Arc<status_sinks::StatusSinks<(NetworkStatus<TBl>, NetworkState)>>,
Pin<Box<dyn Future<Output = ()> + Send>>
NetworkStatusSinks<TBl>,
TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
),
Error
>
@@ -834,6 +833,11 @@ fn build_network<TBl, TExPool, TImpQu, TCl>(
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TImpQu: ImportQueue<TBl> + 'static,
{
let BuildNetworkParams {
config, client, transaction_pool, spawn_handle, import_queue, on_demand,
block_announce_validator_builder, finality_proof_request_builder, finality_proof_provider,
} = params;
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !matches!(config.role, Role::Light),
pool: transaction_pool,
@@ -862,6 +866,7 @@ fn build_network<TBl, TExPool, TImpQu, TCl>(
let network_params = sc_network::config::Params {
role: config.role.clone(),
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Some(Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", fut);
}))
@@ -881,7 +886,9 @@ fn build_network<TBl, TExPool, TImpQu, TCl>(
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(status_sinks::StatusSinks::new());
let network_status_sinks = NetworkStatusSinks::new(Arc::new(status_sinks::StatusSinks::new()));
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
let future = build_network_future(
config.role.clone(),
@@ -891,7 +898,16 @@ fn build_network<TBl, TExPool, TImpQu, TCl>(
system_rpc_rx,
has_bootnodes,
config.announce_block,
).boxed();
);
Ok((network, network_status_sinks, future))
// The network worker is responsible for gathering all network messages and processing
// them. This is quite a heavy task, and at the time of the writing of this comment it
// frequently happens that this future takes several seconds or in some situations
// even more than a minute until it has processed its entire queue. This is clearly an
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking("network-worker", future);
Ok((network, network_status_sinks, system_rpc_tx))
}
+30 -24
View File
@@ -52,8 +52,9 @@ use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver,
pub use self::error::Error;
pub use self::builder::{
new_full_client, new_client, new_full_parts, new_light_parts, build,
ServiceParams, TFullClient, TLightClient, TFullBackend, TLightBackend,
new_full_client, new_client, new_full_parts, new_light_parts,
spawn_tasks, build_network, BuildNetworkParams, build_offchain_workers,
SpawnTasksParams, TFullClient, TLightClient, TFullBackend, TLightBackend,
TLightBackendWithHash, TLightClientWithBackend,
TFullCallExecutor, TLightCallExecutor, RpcExtensionBuilder, NoopRpcExtensionBuilder,
};
@@ -79,7 +80,8 @@ pub use sc_tracing::TracingReceiver;
pub use task_manager::SpawnTaskHandle;
pub use task_manager::TaskManager;
pub use sp_consensus::import_queue::ImportQueue;
use sc_client_api::{Backend, BlockchainEvents};
use sc_client_api::BlockchainEvents;
pub use sc_keystore::KeyStorePtr as KeyStore;
const DEFAULT_PROTOCOL_ID: &str = "sup";
@@ -117,6 +119,7 @@ impl RpcHandlers {
/// Sinks to propagate network status updates.
/// For each element, every time the `Interval` fires we push an element on the sender.
#[derive(Clone)]
pub struct NetworkStatusSinks<Block: BlockT>(
Arc<status_sinks::StatusSinks<(NetworkStatus<Block>, NetworkState)>>,
);
@@ -138,9 +141,10 @@ impl<Block: BlockT> NetworkStatusSinks<Block> {
}
/// Sinks to propagate telemetry connection established events.
pub struct TelemetryOnConnectSinks(pub Arc<Mutex<Vec<TracingUnboundedSender<()>>>>);
#[derive(Default, Clone)]
pub struct TelemetryConnectionSinks(Arc<Mutex<Vec<TracingUnboundedSender<()>>>>);
impl TelemetryOnConnectSinks {
impl TelemetryConnectionSinks {
/// Get event stream for telemetry connection established events.
pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
let (sink, stream) =tracing_unbounded("mpsc_telemetry_on_connect");
@@ -149,23 +153,26 @@ impl TelemetryOnConnectSinks {
}
}
/// The individual components of the chain, built by the service builder. You are encouraged to
/// deconstruct this into its fields.
pub struct ServiceComponents<TBl: BlockT, TBackend: Backend<TBl>, TCl> {
/// An imcomplete set of chain components, but enough to run the chain ops subcommands.
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
/// A shared client instance.
pub client: Arc<Client>,
/// A shared backend instance.
pub backend: Arc<Backend>,
/// The chain task manager.
pub task_manager: TaskManager,
/// A shared network instance.
pub network: Arc<sc_network::NetworkService<TBl, <TBl as BlockT>::Hash>>,
/// RPC handlers that can perform RPC queries.
pub rpc_handlers: Arc<RpcHandlers>,
/// Sinks to propagate network status updates.
pub network_status_sinks: NetworkStatusSinks<TBl>,
/// Shared Telemetry connection sinks,
pub telemetry_on_connect_sinks: TelemetryOnConnectSinks,
/// A shared offchain workers instance.
pub offchain_workers: Option<Arc<sc_offchain::OffchainWorkers<
TCl, TBackend::OffchainStorage, TBl
>>>,
/// A shared keystore instance.
pub keystore: KeyStore,
/// A chain selection algorithm instance.
pub select_chain: SelectChain,
/// An import queue.
pub import_queue: ImportQueue,
/// A shared transaction pool.
pub transaction_pool: Arc<TransactionPool>,
/// A registry of all providers of `InherentData`.
pub inherent_data_providers: sp_inherents::InherentDataProviders,
/// Everything else that needs to be passed into the main build function.
pub other: Other,
}
/// Builds a never-ending future that continuously polls the network.
@@ -179,7 +186,7 @@ async fn build_network_future<
role: Role,
mut network: sc_network::NetworkWorker<B, H>,
client: Arc<C>,
status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>,
status_sinks: NetworkStatusSinks<B>,
mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
should_have_peers: bool,
announce_imported_blocks: bool,
@@ -308,7 +315,7 @@ async fn build_network_future<
// At a regular interval, we send the state of the network on what is called
// the "status sinks".
ready_sink = status_sinks.next().fuse() => {
ready_sink = status_sinks.0.next().fuse() => {
let status = NetworkStatus {
sync_state: network.sync_state(),
best_seen_block: network.best_seen_block(),
@@ -549,7 +556,7 @@ mod tests {
use sp_consensus::SelectChain;
use sp_runtime::traits::BlindCheckable;
use substrate_test_runtime_client::{prelude::*, runtime::{Extrinsic, Transfer}};
use sc_transaction_pool::{BasicPool, FullChainApi};
use sc_transaction_pool::BasicPool;
#[test]
fn should_not_propagate_transactions_that_are_marked_as_such() {
@@ -559,7 +566,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
+23 -14
View File
@@ -180,18 +180,6 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
)
}
/// Create new basic transaction pool for a light node with the provided api.
pub fn new_light(
options: sc_transaction_graph::Options,
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnNamed,
) -> Self {
Self::with_revalidation_type(
options, pool_api, prometheus, RevalidationType::Light, spawner,
)
}
/// Create new basic transaction pool with provided api and custom
/// revalidation type.
pub fn with_revalidation_type(
@@ -342,7 +330,28 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
}
}
impl<Block, Client> BasicPool<FullChainApi<Client, Block>, Block>
impl<Block, Client, Fetcher> LightPool<Block, Client, Fetcher>
where
Block: BlockT,
Client: sp_blockchain::HeaderBackend<Block> + 'static,
Fetcher: sc_client_api::Fetcher<Block> + 'static,
{
/// Create new basic transaction pool for a light node with the provided api.
pub fn new_light(
options: sc_transaction_graph::Options,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnNamed,
client: Arc<Client>,
fetcher: Arc<Fetcher>,
) -> Self {
let pool_api = Arc::new(LightChainApi::new(client, fetcher));
Self::with_revalidation_type(
options, pool_api, prometheus, RevalidationType::Light, spawner,
)
}
}
impl<Block, Client> FullPool<Block, Client>
where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
@@ -355,11 +364,11 @@ where
/// Create new basic transaction pool for a full node with the provided api.
pub fn new_full(
options: sc_transaction_graph::Options,
pool_api: Arc<FullChainApi<Client, Block>>,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnNamed,
client: Arc<Client>,
) -> Arc<Self> {
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
let pool = Arc::new(Self::with_revalidation_type(
options, pool_api, prometheus, RevalidationType::Full, spawner
));
+1 -5
View File
@@ -289,7 +289,7 @@ mod tests {
use futures::executor::block_on;
use substrate_test_runtime_client::{runtime::Transfer, AccountKeyring};
use sc_transaction_pool::{BasicPool, FullChainApi};
use sc_transaction_pool::BasicPool;
use sp_runtime::{ApplyExtrinsicResult, transaction_validity::{TransactionValidityError, InvalidTransaction}};
#[test]
@@ -301,7 +301,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
@@ -341,7 +340,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
@@ -365,7 +363,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),
@@ -398,7 +395,6 @@ mod tests {
let spawner = sp_core::testing::TaskExecutor::new();
let pool = BasicPool::new_full(
Default::default(),
Arc::new(FullChainApi::new(client.clone(), None)),
None,
spawner,
client.clone(),