From 68833498c6285ba02e06f83a40ad5c9146b167b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 11 Jun 2021 18:24:30 +0100 Subject: [PATCH] Transaction pool: Remove futures-diagnose and thread pool (#9074) * Transaction pool: Remove futures-diagnose and thread pool This pr removes `futures-diagnose` as this isn't used anymore. Besides that the pr also removes the thread pool that was used to validate the transactions in the background. Instead of this thread pool we now spawn two separate long running tasks that we use to validate the transactions. All tasks of the transaction pool are now also spawned as essential tasks. This means, if any of these tasks is stopping, the node will stop as well. * Update client/transaction-pool/src/api.rs --- substrate/Cargo.lock | 17 ----- .../bin/node-template/node/src/service.rs | 4 +- substrate/bin/node/cli/src/service.rs | 4 +- substrate/client/transaction-pool/Cargo.toml | 1 - substrate/client/transaction-pool/src/api.rs | 69 +++++++++++++------ substrate/client/transaction-pool/src/lib.rs | 22 +++--- .../transaction-pool/src/testing/pool.rs | 12 +++- substrate/test-utils/test-runner/src/node.rs | 2 +- 8 files changed, 77 insertions(+), 54 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index dc2c67ad18..4572ed354a 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2067,22 +2067,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "futures-diagnose" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdcef58a173af8148b182684c9f2d5250875adbcaff7b5794073894f9d8634a9" -dependencies = [ - "futures 0.1.31", - "futures 0.3.15", - "lazy_static", - "log", - "parking_lot 0.9.0", - "pin-project 0.4.27", - "serde", - "serde_json", -] - [[package]] name = "futures-executor" version = "0.3.15" @@ -8215,7 +8199,6 @@ version = "3.0.0" dependencies = [ "assert_matches", "futures 0.3.15", - "futures-diagnose", "hex", "intervalier", "log", diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index 8ed9c1ee50..51b63e614f 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -68,7 +68,7 @@ pub fn new_partial(config: &Configuration) -> Result Result let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light( config.transaction_pool.clone(), config.prometheus_registry(), - task_manager.spawn_handle(), + task_manager.spawn_essential_handle(), client.clone(), on_demand.clone(), )); diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index a9ac2ac806..06e1fcc804 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -90,7 +90,7 @@ pub fn new_partial( config.transaction_pool.clone(), config.role.is_authority().into(), config.prometheus_registry(), - task_manager.spawn_handle(), + task_manager.spawn_essential_handle(), client.clone(), ); @@ -471,7 +471,7 @@ pub fn new_light_base( let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light( config.transaction_pool.clone(), config.prometheus_registry(), - task_manager.spawn_handle(), + task_manager.spawn_essential_handle(), client.clone(), on_demand.clone(), )); diff --git a/substrate/client/transaction-pool/Cargo.toml b/substrate/client/transaction-pool/Cargo.toml index d457d709d1..6b105520ba 100644 --- a/substrate/client/transaction-pool/Cargo.toml +++ b/substrate/client/transaction-pool/Cargo.toml @@ -16,7 +16,6 @@ targets = ["x86_64-unknown-linux-gnu"] codec = { package = "parity-scale-codec", version = "2.0.0" } thiserror = "1.0.21" futures = { version = "0.3.1", features = ["compat"] } -futures-diagnose = "1.0" intervalier = "0.4.0" log = "0.4.8" parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] } diff --git a/substrate/client/transaction-pool/src/api.rs b/substrate/client/transaction-pool/src/api.rs index 09864f7824..74e08c3aa0 100644 --- a/substrate/client/transaction-pool/src/api.rs +++ b/substrate/client/transaction-pool/src/api.rs @@ -21,7 +21,8 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc}; use codec::{Decode, Encode}; use futures::{ - channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready}, + channel::{oneshot, mpsc}, future::{Future, FutureExt, ready, Ready}, lock::Mutex, SinkExt, + StreamExt, }; use sc_client_api::{ @@ -34,15 +35,36 @@ use sp_runtime::{ use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use sp_api::{ProvideRuntimeApi, ApiExt}; use prometheus_endpoint::Registry as PrometheusRegistry; +use sp_core::traits::SpawnEssentialNamed; use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}}; /// The transaction pool logic for full client. pub struct FullChainApi { client: Arc, - pool: ThreadPool, _marker: PhantomData, metrics: Option>, + validation_pool: Arc + Send>>>>>, +} + +/// Spawn a validation task that will be used by the transaction pool to validate transactions. +fn spawn_validation_pool_task( + name: &'static str, + receiver: Arc + Send>>>>>, + spawner: &impl SpawnEssentialNamed, +) { + spawner.spawn_essential_blocking( + name, + async move { + loop { + let task = receiver.lock().await.next().await; + match task { + None => return, + Some(task) => task.await, + } + } + }.boxed(), + ); } impl FullChainApi { @@ -50,6 +72,7 @@ impl FullChainApi { pub fn new( client: Arc, prometheus: Option<&PrometheusRegistry>, + spawner: &impl SpawnEssentialNamed, ) -> Self { let metrics = prometheus.map(ApiMetrics::register).and_then(|r| { match r { @@ -65,13 +88,15 @@ impl FullChainApi { } }); + let (sender, receiver) = mpsc::channel(0); + + let receiver = Arc::new(Mutex::new(receiver)); + spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner); + spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner); + FullChainApi { client, - pool: ThreadPoolBuilder::new() - .pool_size(2) - .name_prefix("txpool-verifier") - .create() - .expect("Failed to spawn verifier threads, that are critical for node operation."), + validation_pool: Arc::new(Mutex::new(sender)), _marker: Default::default(), metrics, } @@ -105,27 +130,29 @@ where let (tx, rx) = oneshot::channel(); let client = self.client.clone(); let at = at.clone(); - + let validation_pool = self.validation_pool.clone(); let metrics = self.metrics.clone(); - metrics.report(|m| m.validations_scheduled.inc()); - self.pool.spawn_ok(futures_diagnose::diagnose( - "validate-transaction", - async move { - let res = validate_transaction_blocking(&*client, &at, source, uxt); - if let Err(e) = tx.send(res) { - log::warn!("Unable to send a validate transaction result: {:?}", e); - } - metrics.report(|m| m.validations_finished.inc()); - }, - )); + async move { + metrics.report(|m| m.validations_scheduled.inc()); + + validation_pool.lock() + .await + .send( + async move { + let res = validate_transaction_blocking(&*client, &at, source, uxt); + let _ = tx.send(res); + metrics.report(|m| m.validations_finished.inc()); + }.boxed() + ) + .await + .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?; - Box::pin(async move { match rx.await { Ok(r) => r, Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())), } - }) + }.boxed() } fn block_id_to_number( diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index 0cd47f870d..15c75a554d 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -42,7 +42,7 @@ use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero, Header as HeaderT}, }; -use sp_core::traits::SpawnNamed; +use sp_core::traits::SpawnEssentialNamed; use sp_transaction_pool::{ TransactionPool, PoolStatus, ImportNotificationStream, TxHash, TransactionFor, TransactionStatusStreamFor, MaintainedTransactionPool, PoolFuture, ChainEvent, @@ -195,20 +195,26 @@ impl BasicPool pool_api: Arc, prometheus: Option<&PrometheusRegistry>, revalidation_type: RevalidationType, - spawner: impl SpawnNamed, + spawner: impl SpawnEssentialNamed, best_block_number: NumberFor, ) -> Self { let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone())); let (revalidation_queue, background_task) = match revalidation_type { - RevalidationType::Light => (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None), + RevalidationType::Light => ( + revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), + None, + ), RevalidationType::Full => { - let (queue, background) = revalidation::RevalidationQueue::new_background(pool_api.clone(), pool.clone()); + let (queue, background) = revalidation::RevalidationQueue::new_background( + pool_api.clone(), + pool.clone(), + ); (queue, Some(background)) }, }; if let Some(background_task) = background_task { - spawner.spawn("txpool-background", background_task); + spawner.spawn_essential("txpool-background", background_task); } Self { @@ -357,7 +363,7 @@ where pub fn new_light( options: sc_transaction_graph::Options, prometheus: Option<&PrometheusRegistry>, - spawner: impl SpawnNamed, + spawner: impl SpawnEssentialNamed, client: Arc, fetcher: Arc, ) -> Self { @@ -393,10 +399,10 @@ where options: sc_transaction_graph::Options, is_validator: txpool::IsValidator, prometheus: Option<&PrometheusRegistry>, - spawner: impl SpawnNamed, + spawner: impl SpawnEssentialNamed, client: Arc, ) -> Arc { - let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus)); + let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner)); let pool = Arc::new(Self::with_revalidation_type( options, is_validator, diff --git a/substrate/client/transaction-pool/src/testing/pool.rs b/substrate/client/transaction-pool/src/testing/pool.rs index 999d1ab65e..675a58cd44 100644 --- a/substrate/client/transaction-pool/src/testing/pool.rs +++ b/substrate/client/transaction-pool/src/testing/pool.rs @@ -910,7 +910,11 @@ fn should_not_accept_old_signatures() { let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client, None))).0 + BasicPool::new_test(Arc::new(FullChainApi::new( + client, + None, + &sp_core::testing::TaskExecutor::new(), + ))).0 ); let transfer = Transfer { @@ -946,7 +950,11 @@ fn import_notification_to_pool_maintain_works() { let mut client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None))).0 + BasicPool::new_test(Arc::new(FullChainApi::new( + client.clone(), + None, + &sp_core::testing::TaskExecutor::new(), + ))).0 ); // Prepare the extrisic, push it to the pool and check that it was added. diff --git a/substrate/test-utils/test-runner/src/node.rs b/substrate/test-utils/test-runner/src/node.rs index ce41e5b5b5..00be12b651 100644 --- a/substrate/test-utils/test-runner/src/node.rs +++ b/substrate/test-utils/test-runner/src/node.rs @@ -134,7 +134,7 @@ impl Node { config.transaction_pool.clone(), true.into(), config.prometheus_registry(), - task_manager.spawn_handle(), + task_manager.spawn_essential_handle(), client.clone(), );