mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 07:01:02 +00:00
Use SpawnTaskHandles for spawning tasks in the tx pool (#8958)
* Remove futures-diagnose * Use `SpawnTaskHandle`s for spawning tasks in the tx pool * Box the spawner * Fix tests * Use the testing task executor
This commit is contained in:
Generated
-17
@@ -2059,22 +2059,6 @@ dependencies = [
|
|||||||
"num_cpus",
|
"num_cpus",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "futures-diagnose"
|
|
||||||
version = "1.0.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "fdcef58a173af8148b182684c9f2d5250875adbcaff7b5794073894f9d8634a9"
|
|
||||||
dependencies = [
|
|
||||||
"futures 0.1.31",
|
|
||||||
"futures 0.3.13",
|
|
||||||
"lazy_static",
|
|
||||||
"log",
|
|
||||||
"parking_lot 0.9.0",
|
|
||||||
"pin-project 0.4.27",
|
|
||||||
"serde",
|
|
||||||
"serde_json",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-executor"
|
name = "futures-executor"
|
||||||
version = "0.3.13"
|
version = "0.3.13"
|
||||||
@@ -8116,7 +8100,6 @@ version = "3.0.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"assert_matches",
|
"assert_matches",
|
||||||
"futures 0.3.13",
|
"futures 0.3.13",
|
||||||
"futures-diagnose",
|
|
||||||
"hex",
|
"hex",
|
||||||
"intervalier",
|
"intervalier",
|
||||||
"log",
|
"log",
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ targets = ["x86_64-unknown-linux-gnu"]
|
|||||||
codec = { package = "parity-scale-codec", version = "2.0.0" }
|
codec = { package = "parity-scale-codec", version = "2.0.0" }
|
||||||
thiserror = "1.0.21"
|
thiserror = "1.0.21"
|
||||||
futures = { version = "0.3.1", features = ["compat"] }
|
futures = { version = "0.3.1", features = ["compat"] }
|
||||||
futures-diagnose = "1.0"
|
|
||||||
intervalier = "0.4.0"
|
intervalier = "0.4.0"
|
||||||
log = "0.4.8"
|
log = "0.4.8"
|
||||||
parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] }
|
parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] }
|
||||||
|
|||||||
@@ -21,7 +21,7 @@
|
|||||||
use std::{marker::PhantomData, pin::Pin, sync::Arc};
|
use std::{marker::PhantomData, pin::Pin, sync::Arc};
|
||||||
use codec::{Decode, Encode};
|
use codec::{Decode, Encode};
|
||||||
use futures::{
|
use futures::{
|
||||||
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready},
|
channel::oneshot, future::{Future, FutureExt, ready, Ready},
|
||||||
};
|
};
|
||||||
|
|
||||||
use sc_client_api::{
|
use sc_client_api::{
|
||||||
@@ -31,6 +31,7 @@ use sp_runtime::{
|
|||||||
generic::BlockId, traits::{self, Block as BlockT, BlockIdTo, Header as HeaderT, Hash as HashT},
|
generic::BlockId, traits::{self, Block as BlockT, BlockIdTo, Header as HeaderT, Hash as HashT},
|
||||||
transaction_validity::{TransactionValidity, TransactionSource},
|
transaction_validity::{TransactionValidity, TransactionSource},
|
||||||
};
|
};
|
||||||
|
use sp_core::traits::SpawnNamed;
|
||||||
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
|
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
|
||||||
use sp_api::{ProvideRuntimeApi, ApiExt};
|
use sp_api::{ProvideRuntimeApi, ApiExt};
|
||||||
use prometheus_endpoint::Registry as PrometheusRegistry;
|
use prometheus_endpoint::Registry as PrometheusRegistry;
|
||||||
@@ -40,7 +41,7 @@ use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}};
|
|||||||
/// The transaction pool logic for full client.
|
/// The transaction pool logic for full client.
|
||||||
pub struct FullChainApi<Client, Block> {
|
pub struct FullChainApi<Client, Block> {
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
pool: ThreadPool,
|
spawner: Box<dyn SpawnNamed>,
|
||||||
_marker: PhantomData<Block>,
|
_marker: PhantomData<Block>,
|
||||||
metrics: Option<Arc<ApiMetrics>>,
|
metrics: Option<Arc<ApiMetrics>>,
|
||||||
}
|
}
|
||||||
@@ -50,6 +51,7 @@ impl<Client, Block> FullChainApi<Client, Block> {
|
|||||||
pub fn new(
|
pub fn new(
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
prometheus: Option<&PrometheusRegistry>,
|
prometheus: Option<&PrometheusRegistry>,
|
||||||
|
spawner: impl SpawnNamed + 'static,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let metrics = prometheus.map(ApiMetrics::register).and_then(|r| {
|
let metrics = prometheus.map(ApiMetrics::register).and_then(|r| {
|
||||||
match r {
|
match r {
|
||||||
@@ -67,13 +69,9 @@ impl<Client, Block> FullChainApi<Client, Block> {
|
|||||||
|
|
||||||
FullChainApi {
|
FullChainApi {
|
||||||
client,
|
client,
|
||||||
pool: ThreadPoolBuilder::new()
|
|
||||||
.pool_size(2)
|
|
||||||
.name_prefix("txpool-verifier")
|
|
||||||
.create()
|
|
||||||
.expect("Failed to spawn verifier threads, that are critical for node operation."),
|
|
||||||
_marker: Default::default(),
|
_marker: Default::default(),
|
||||||
metrics,
|
metrics,
|
||||||
|
spawner: Box::new(spawner) ,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -109,9 +107,9 @@ where
|
|||||||
let metrics = self.metrics.clone();
|
let metrics = self.metrics.clone();
|
||||||
metrics.report(|m| m.validations_scheduled.inc());
|
metrics.report(|m| m.validations_scheduled.inc());
|
||||||
|
|
||||||
self.pool.spawn_ok(futures_diagnose::diagnose(
|
self.spawner.spawn_blocking(
|
||||||
"validate-transaction",
|
"validate-transaction",
|
||||||
async move {
|
Box::pin(async move {
|
||||||
let res = validate_transaction_blocking(&*client, &at, source, uxt);
|
let res = validate_transaction_blocking(&*client, &at, source, uxt);
|
||||||
if let Err(e) = tx.send(res) {
|
if let Err(e) = tx.send(res) {
|
||||||
log::warn!("Unable to send a validate transaction result: {:?}", e);
|
log::warn!("Unable to send a validate transaction result: {:?}", e);
|
||||||
|
|||||||
@@ -366,10 +366,10 @@ where
|
|||||||
options: sc_transaction_graph::Options,
|
options: sc_transaction_graph::Options,
|
||||||
is_validator: txpool::IsValidator,
|
is_validator: txpool::IsValidator,
|
||||||
prometheus: Option<&PrometheusRegistry>,
|
prometheus: Option<&PrometheusRegistry>,
|
||||||
spawner: impl SpawnNamed,
|
spawner: impl SpawnNamed + Clone + 'static,
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
|
let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, spawner.clone()));
|
||||||
let pool = Arc::new(Self::with_revalidation_type(
|
let pool = Arc::new(Self::with_revalidation_type(
|
||||||
options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner
|
options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner
|
||||||
));
|
));
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ use std::collections::BTreeSet;
|
|||||||
use sc_client_api::client::BlockchainEvents;
|
use sc_client_api::client::BlockchainEvents;
|
||||||
use sc_block_builder::BlockBuilderProvider;
|
use sc_block_builder::BlockBuilderProvider;
|
||||||
use sp_consensus::BlockOrigin;
|
use sp_consensus::BlockOrigin;
|
||||||
|
use sp_core::testing::TaskExecutor;
|
||||||
|
|
||||||
fn pool() -> Pool<TestApi> {
|
fn pool() -> Pool<TestApi> {
|
||||||
Pool::new(Default::default(), true.into(), TestApi::with_alice_nonce(209).into())
|
Pool::new(Default::default(), true.into(), TestApi::with_alice_nonce(209).into())
|
||||||
@@ -935,7 +936,7 @@ fn should_not_accept_old_signatures() {
|
|||||||
let client = Arc::new(substrate_test_runtime_client::new());
|
let client = Arc::new(substrate_test_runtime_client::new());
|
||||||
|
|
||||||
let pool = Arc::new(
|
let pool = Arc::new(
|
||||||
BasicPool::new_test(Arc::new(FullChainApi::new(client, None))).0
|
BasicPool::new_test(Arc::new(FullChainApi::new(client, None, TaskExecutor::new()))).0
|
||||||
);
|
);
|
||||||
|
|
||||||
let transfer = Transfer {
|
let transfer = Transfer {
|
||||||
@@ -971,7 +972,7 @@ fn import_notification_to_pool_maintain_works() {
|
|||||||
let mut client = Arc::new(substrate_test_runtime_client::new());
|
let mut client = Arc::new(substrate_test_runtime_client::new());
|
||||||
|
|
||||||
let pool = Arc::new(
|
let pool = Arc::new(
|
||||||
BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None))).0
|
BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None, TaskExecutor::new()))).0
|
||||||
);
|
);
|
||||||
|
|
||||||
// Prepare the extrisic, push it to the pool and check that it was added.
|
// Prepare the extrisic, push it to the pool and check that it was added.
|
||||||
|
|||||||
Reference in New Issue
Block a user