Transaction pool: Remove futures-diagnose and thread pool (#9074)

* Transaction pool: Remove futures-diagnose and thread pool

This pr removes `futures-diagnose` as this isn't used anymore. Besides
that the pr also removes the thread pool that was used to validate the
transactions in the background. Instead of this thread pool we now spawn
two separate long running tasks that we use to validate the
transactions. All tasks of the transaction pool are now also spawned as
essential tasks. This means, if any of these tasks is stopping, the node
will stop as well.

* Update client/transaction-pool/src/api.rs
This commit is contained in:
Bastian Köcher
2021-06-11 18:24:30 +01:00
committed by GitHub
parent ed448ef28b
commit 68833498c6
8 changed files with 77 additions and 54 deletions
-17
View File
@@ -2067,22 +2067,6 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "futures-diagnose"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdcef58a173af8148b182684c9f2d5250875adbcaff7b5794073894f9d8634a9"
dependencies = [
"futures 0.1.31",
"futures 0.3.15",
"lazy_static",
"log",
"parking_lot 0.9.0",
"pin-project 0.4.27",
"serde",
"serde_json",
]
[[package]]
name = "futures-executor"
version = "0.3.15"
@@ -8215,7 +8199,6 @@ version = "3.0.0"
dependencies = [
"assert_matches",
"futures 0.3.15",
"futures-diagnose",
"hex",
"intervalier",
"log",
@@ -68,7 +68,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_handle(),
task_manager.spawn_essential_handle(),
client.clone(),
);
@@ -332,7 +332,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light(
config.transaction_pool.clone(),
config.prometheus_registry(),
task_manager.spawn_handle(),
task_manager.spawn_essential_handle(),
client.clone(),
on_demand.clone(),
));
+2 -2
View File
@@ -90,7 +90,7 @@ pub fn new_partial(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_handle(),
task_manager.spawn_essential_handle(),
client.clone(),
);
@@ -471,7 +471,7 @@ pub fn new_light_base(
let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light(
config.transaction_pool.clone(),
config.prometheus_registry(),
task_manager.spawn_handle(),
task_manager.spawn_essential_handle(),
client.clone(),
on_demand.clone(),
));
@@ -16,7 +16,6 @@ targets = ["x86_64-unknown-linux-gnu"]
codec = { package = "parity-scale-codec", version = "2.0.0" }
thiserror = "1.0.21"
futures = { version = "0.3.1", features = ["compat"] }
futures-diagnose = "1.0"
intervalier = "0.4.0"
log = "0.4.8"
parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] }
+48 -21
View File
@@ -21,7 +21,8 @@
use std::{marker::PhantomData, pin::Pin, sync::Arc};
use codec::{Decode, Encode};
use futures::{
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready},
channel::{oneshot, mpsc}, future::{Future, FutureExt, ready, Ready}, lock::Mutex, SinkExt,
StreamExt,
};
use sc_client_api::{
@@ -34,15 +35,36 @@ use sp_runtime::{
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use sp_api::{ProvideRuntimeApi, ApiExt};
use prometheus_endpoint::Registry as PrometheusRegistry;
use sp_core::traits::SpawnEssentialNamed;
use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}};
/// The transaction pool logic for full client.
pub struct FullChainApi<Client, Block> {
client: Arc<Client>,
pool: ThreadPool,
_marker: PhantomData<Block>,
metrics: Option<Arc<ApiMetrics>>,
validation_pool: Arc<Mutex<mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
}
/// Spawn a validation task that will be used by the transaction pool to validate transactions.
fn spawn_validation_pool_task(
name: &'static str,
receiver: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
spawner: &impl SpawnEssentialNamed,
) {
spawner.spawn_essential_blocking(
name,
async move {
loop {
let task = receiver.lock().await.next().await;
match task {
None => return,
Some(task) => task.await,
}
}
}.boxed(),
);
}
impl<Client, Block> FullChainApi<Client, Block> {
@@ -50,6 +72,7 @@ impl<Client, Block> FullChainApi<Client, Block> {
pub fn new(
client: Arc<Client>,
prometheus: Option<&PrometheusRegistry>,
spawner: &impl SpawnEssentialNamed,
) -> Self {
let metrics = prometheus.map(ApiMetrics::register).and_then(|r| {
match r {
@@ -65,13 +88,15 @@ impl<Client, Block> FullChainApi<Client, Block> {
}
});
let (sender, receiver) = mpsc::channel(0);
let receiver = Arc::new(Mutex::new(receiver));
spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner);
spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner);
FullChainApi {
client,
pool: ThreadPoolBuilder::new()
.pool_size(2)
.name_prefix("txpool-verifier")
.create()
.expect("Failed to spawn verifier threads, that are critical for node operation."),
validation_pool: Arc::new(Mutex::new(sender)),
_marker: Default::default(),
metrics,
}
@@ -105,27 +130,29 @@ where
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let at = at.clone();
let validation_pool = self.validation_pool.clone();
let metrics = self.metrics.clone();
metrics.report(|m| m.validations_scheduled.inc());
self.pool.spawn_ok(futures_diagnose::diagnose(
"validate-transaction",
async move {
let res = validate_transaction_blocking(&*client, &at, source, uxt);
if let Err(e) = tx.send(res) {
log::warn!("Unable to send a validate transaction result: {:?}", e);
}
metrics.report(|m| m.validations_finished.inc());
},
));
async move {
metrics.report(|m| m.validations_scheduled.inc());
validation_pool.lock()
.await
.send(
async move {
let res = validate_transaction_blocking(&*client, &at, source, uxt);
let _ = tx.send(res);
metrics.report(|m| m.validations_finished.inc());
}.boxed()
)
.await
.map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
Box::pin(async move {
match rx.await {
Ok(r) => r,
Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
}
})
}.boxed()
}
fn block_id_to_number(
+14 -8
View File
@@ -42,7 +42,7 @@ use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero, Header as HeaderT},
};
use sp_core::traits::SpawnNamed;
use sp_core::traits::SpawnEssentialNamed;
use sp_transaction_pool::{
TransactionPool, PoolStatus, ImportNotificationStream, TxHash, TransactionFor,
TransactionStatusStreamFor, MaintainedTransactionPool, PoolFuture, ChainEvent,
@@ -195,20 +195,26 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
revalidation_type: RevalidationType,
spawner: impl SpawnNamed,
spawner: impl SpawnEssentialNamed,
best_block_number: NumberFor<Block>,
) -> Self {
let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone()));
let (revalidation_queue, background_task) = match revalidation_type {
RevalidationType::Light => (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
RevalidationType::Light => (
revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()),
None,
),
RevalidationType::Full => {
let (queue, background) = revalidation::RevalidationQueue::new_background(pool_api.clone(), pool.clone());
let (queue, background) = revalidation::RevalidationQueue::new_background(
pool_api.clone(),
pool.clone(),
);
(queue, Some(background))
},
};
if let Some(background_task) = background_task {
spawner.spawn("txpool-background", background_task);
spawner.spawn_essential("txpool-background", background_task);
}
Self {
@@ -357,7 +363,7 @@ where
pub fn new_light(
options: sc_transaction_graph::Options,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnNamed,
spawner: impl SpawnEssentialNamed,
client: Arc<Client>,
fetcher: Arc<Fetcher>,
) -> Self {
@@ -393,10 +399,10 @@ where
options: sc_transaction_graph::Options,
is_validator: txpool::IsValidator,
prometheus: Option<&PrometheusRegistry>,
spawner: impl SpawnNamed,
spawner: impl SpawnEssentialNamed,
client: Arc<Client>,
) -> Arc<Self> {
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
let pool = Arc::new(Self::with_revalidation_type(
options,
is_validator,
@@ -910,7 +910,11 @@ fn should_not_accept_old_signatures() {
let client = Arc::new(substrate_test_runtime_client::new());
let pool = Arc::new(
BasicPool::new_test(Arc::new(FullChainApi::new(client, None))).0
BasicPool::new_test(Arc::new(FullChainApi::new(
client,
None,
&sp_core::testing::TaskExecutor::new(),
))).0
);
let transfer = Transfer {
@@ -946,7 +950,11 @@ fn import_notification_to_pool_maintain_works() {
let mut client = Arc::new(substrate_test_runtime_client::new());
let pool = Arc::new(
BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None))).0
BasicPool::new_test(Arc::new(FullChainApi::new(
client.clone(),
None,
&sp_core::testing::TaskExecutor::new(),
))).0
);
// Prepare the extrisic, push it to the pool and check that it was added.
+1 -1
View File
@@ -134,7 +134,7 @@ impl<T: ChainInfo> Node<T> {
config.transaction_pool.clone(),
true.into(),
config.prometheus_registry(),
task_manager.spawn_handle(),
task_manager.spawn_essential_handle(),
client.clone(),
);