diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index dc2c67ad18..4572ed354a 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2067,22 +2067,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "futures-diagnose" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdcef58a173af8148b182684c9f2d5250875adbcaff7b5794073894f9d8634a9" -dependencies = [ - "futures 0.1.31", - "futures 0.3.15", - "lazy_static", - "log", - "parking_lot 0.9.0", - "pin-project 0.4.27", - "serde", - "serde_json", -] - [[package]] name = "futures-executor" version = "0.3.15" @@ -8215,7 +8199,6 @@ version = "3.0.0" dependencies = [ "assert_matches", "futures 0.3.15", - "futures-diagnose", "hex", "intervalier", "log", diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index 8ed9c1ee50..51b63e614f 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -68,7 +68,7 @@ pub fn new_partial(config: &Configuration) -> Result Result let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light( config.transaction_pool.clone(), config.prometheus_registry(), - task_manager.spawn_handle(), + task_manager.spawn_essential_handle(), client.clone(), on_demand.clone(), )); diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index a9ac2ac806..06e1fcc804 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -90,7 +90,7 @@ pub fn new_partial( config.transaction_pool.clone(), config.role.is_authority().into(), config.prometheus_registry(), - task_manager.spawn_handle(), + task_manager.spawn_essential_handle(), client.clone(), ); @@ -471,7 +471,7 @@ pub fn new_light_base( let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light( config.transaction_pool.clone(), config.prometheus_registry(), - task_manager.spawn_handle(), + task_manager.spawn_essential_handle(), client.clone(), on_demand.clone(), )); diff --git a/substrate/client/transaction-pool/Cargo.toml b/substrate/client/transaction-pool/Cargo.toml index d457d709d1..6b105520ba 100644 --- a/substrate/client/transaction-pool/Cargo.toml +++ b/substrate/client/transaction-pool/Cargo.toml @@ -16,7 +16,6 @@ targets = ["x86_64-unknown-linux-gnu"] codec = { package = "parity-scale-codec", version = "2.0.0" } thiserror = "1.0.21" futures = { version = "0.3.1", features = ["compat"] } -futures-diagnose = "1.0" intervalier = "0.4.0" log = "0.4.8" parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] } diff --git a/substrate/client/transaction-pool/src/api.rs b/substrate/client/transaction-pool/src/api.rs index 09864f7824..74e08c3aa0 100644 --- a/substrate/client/transaction-pool/src/api.rs +++ b/substrate/client/transaction-pool/src/api.rs @@ -21,7 +21,8 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc}; use codec::{Decode, Encode}; use futures::{ - channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready}, + channel::{oneshot, mpsc}, future::{Future, FutureExt, ready, Ready}, lock::Mutex, SinkExt, + StreamExt, }; use sc_client_api::{ @@ -34,15 +35,36 @@ use sp_runtime::{ use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use sp_api::{ProvideRuntimeApi, ApiExt}; use prometheus_endpoint::Registry as PrometheusRegistry; +use sp_core::traits::SpawnEssentialNamed; use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}}; /// The transaction pool logic for full client. pub struct FullChainApi { client: Arc, - pool: ThreadPool, _marker: PhantomData, metrics: Option>, + validation_pool: Arc + Send>>>>>, +} + +/// Spawn a validation task that will be used by the transaction pool to validate transactions. +fn spawn_validation_pool_task( + name: &'static str, + receiver: Arc + Send>>>>>, + spawner: &impl SpawnEssentialNamed, +) { + spawner.spawn_essential_blocking( + name, + async move { + loop { + let task = receiver.lock().await.next().await; + match task { + None => return, + Some(task) => task.await, + } + } + }.boxed(), + ); } impl FullChainApi { @@ -50,6 +72,7 @@ impl FullChainApi { pub fn new( client: Arc, prometheus: Option<&PrometheusRegistry>, + spawner: &impl SpawnEssentialNamed, ) -> Self { let metrics = prometheus.map(ApiMetrics::register).and_then(|r| { match r { @@ -65,13 +88,15 @@ impl FullChainApi { } }); + let (sender, receiver) = mpsc::channel(0); + + let receiver = Arc::new(Mutex::new(receiver)); + spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner); + spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner); + FullChainApi { client, - pool: ThreadPoolBuilder::new() - .pool_size(2) - .name_prefix("txpool-verifier") - .create() - .expect("Failed to spawn verifier threads, that are critical for node operation."), + validation_pool: Arc::new(Mutex::new(sender)), _marker: Default::default(), metrics, } @@ -105,27 +130,29 @@ where let (tx, rx) = oneshot::channel(); let client = self.client.clone(); let at = at.clone(); - + let validation_pool = self.validation_pool.clone(); let metrics = self.metrics.clone(); - metrics.report(|m| m.validations_scheduled.inc()); - self.pool.spawn_ok(futures_diagnose::diagnose( - "validate-transaction", - async move { - let res = validate_transaction_blocking(&*client, &at, source, uxt); - if let Err(e) = tx.send(res) { - log::warn!("Unable to send a validate transaction result: {:?}", e); - } - metrics.report(|m| m.validations_finished.inc()); - }, - )); + async move { + metrics.report(|m| m.validations_scheduled.inc()); + + validation_pool.lock() + .await + .send( + async move { + let res = validate_transaction_blocking(&*client, &at, source, uxt); + let _ = tx.send(res); + metrics.report(|m| m.validations_finished.inc()); + }.boxed() + ) + .await + .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?; - Box::pin(async move { match rx.await { Ok(r) => r, Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())), } - }) + }.boxed() } fn block_id_to_number( diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index 0cd47f870d..15c75a554d 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -42,7 +42,7 @@ use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero, Header as HeaderT}, }; -use sp_core::traits::SpawnNamed; +use sp_core::traits::SpawnEssentialNamed; use sp_transaction_pool::{ TransactionPool, PoolStatus, ImportNotificationStream, TxHash, TransactionFor, TransactionStatusStreamFor, MaintainedTransactionPool, PoolFuture, ChainEvent, @@ -195,20 +195,26 @@ impl BasicPool pool_api: Arc, prometheus: Option<&PrometheusRegistry>, revalidation_type: RevalidationType, - spawner: impl SpawnNamed, + spawner: impl SpawnEssentialNamed, best_block_number: NumberFor, ) -> Self { let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone())); let (revalidation_queue, background_task) = match revalidation_type { - RevalidationType::Light => (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None), + RevalidationType::Light => ( + revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), + None, + ), RevalidationType::Full => { - let (queue, background) = revalidation::RevalidationQueue::new_background(pool_api.clone(), pool.clone()); + let (queue, background) = revalidation::RevalidationQueue::new_background( + pool_api.clone(), + pool.clone(), + ); (queue, Some(background)) }, }; if let Some(background_task) = background_task { - spawner.spawn("txpool-background", background_task); + spawner.spawn_essential("txpool-background", background_task); } Self { @@ -357,7 +363,7 @@ where pub fn new_light( options: sc_transaction_graph::Options, prometheus: Option<&PrometheusRegistry>, - spawner: impl SpawnNamed, + spawner: impl SpawnEssentialNamed, client: Arc, fetcher: Arc, ) -> Self { @@ -393,10 +399,10 @@ where options: sc_transaction_graph::Options, is_validator: txpool::IsValidator, prometheus: Option<&PrometheusRegistry>, - spawner: impl SpawnNamed, + spawner: impl SpawnEssentialNamed, client: Arc, ) -> Arc { - let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus)); + let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner)); let pool = Arc::new(Self::with_revalidation_type( options, is_validator, diff --git a/substrate/client/transaction-pool/src/testing/pool.rs b/substrate/client/transaction-pool/src/testing/pool.rs index 999d1ab65e..675a58cd44 100644 --- a/substrate/client/transaction-pool/src/testing/pool.rs +++ b/substrate/client/transaction-pool/src/testing/pool.rs @@ -910,7 +910,11 @@ fn should_not_accept_old_signatures() { let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client, None))).0 + BasicPool::new_test(Arc::new(FullChainApi::new( + client, + None, + &sp_core::testing::TaskExecutor::new(), + ))).0 ); let transfer = Transfer { @@ -946,7 +950,11 @@ fn import_notification_to_pool_maintain_works() { let mut client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None))).0 + BasicPool::new_test(Arc::new(FullChainApi::new( + client.clone(), + None, + &sp_core::testing::TaskExecutor::new(), + ))).0 ); // Prepare the extrisic, push it to the pool and check that it was added. diff --git a/substrate/test-utils/test-runner/src/node.rs b/substrate/test-utils/test-runner/src/node.rs index ce41e5b5b5..00be12b651 100644 --- a/substrate/test-utils/test-runner/src/node.rs +++ b/substrate/test-utils/test-runner/src/node.rs @@ -134,7 +134,7 @@ impl Node { config.transaction_pool.clone(), true.into(), config.prometheus_registry(), - task_manager.spawn_handle(), + task_manager.spawn_essential_handle(), client.clone(), );