mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 17:31:03 +00:00
Use SpawnNamed to give tasks names (#1379)
This commit is contained in:
@@ -23,7 +23,7 @@
|
|||||||
#![warn(missing_docs)]
|
#![warn(missing_docs)]
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::{channel::{mpsc, oneshot}, task::Spawn};
|
use futures::channel::{mpsc, oneshot};
|
||||||
use keystore::KeyStorePtr;
|
use keystore::KeyStorePtr;
|
||||||
use polkadot_primitives::{
|
use polkadot_primitives::{
|
||||||
Hash, Block,
|
Hash, Block,
|
||||||
@@ -39,6 +39,7 @@ use client::{
|
|||||||
};
|
};
|
||||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||||
use codec::{Encode, Decode};
|
use codec::{Encode, Decode};
|
||||||
|
use sp_core::traits::SpawnNamed;
|
||||||
|
|
||||||
use log::warn;
|
use log::warn;
|
||||||
|
|
||||||
@@ -174,7 +175,7 @@ impl Store {
|
|||||||
&self,
|
&self,
|
||||||
wrapped_block_import: I,
|
wrapped_block_import: I,
|
||||||
client: Arc<P>,
|
client: Arc<P>,
|
||||||
spawner: impl Spawn,
|
spawner: impl SpawnNamed,
|
||||||
keystore: KeyStorePtr,
|
keystore: KeyStorePtr,
|
||||||
) -> ClientResult<AvailabilityBlockImport<I, P>>
|
) -> ClientResult<AvailabilityBlockImport<I, P>>
|
||||||
where
|
where
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ use std::sync::Arc;
|
|||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
use log::{error, info, trace, warn};
|
use log::{error, info, trace, warn};
|
||||||
use sp_blockchain::{Result as ClientResult};
|
use sp_blockchain::Result as ClientResult;
|
||||||
use sp_runtime::traits::{Header as HeaderT, Block as BlockT, HashFor, BlakeTwo256};
|
use sp_runtime::traits::{Header as HeaderT, Block as BlockT, HashFor, BlakeTwo256};
|
||||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||||
use client::{
|
use client::{
|
||||||
@@ -32,12 +32,13 @@ use consensus_common::{
|
|||||||
ImportResult,
|
ImportResult,
|
||||||
import_queue::CacheKeyId,
|
import_queue::CacheKeyId,
|
||||||
};
|
};
|
||||||
|
use sp_core::traits::SpawnNamed;
|
||||||
use polkadot_primitives::{Block, BlockId, Hash};
|
use polkadot_primitives::{Block, BlockId, Hash};
|
||||||
use polkadot_primitives::parachain::{
|
use polkadot_primitives::parachain::{
|
||||||
ParachainHost, ValidatorId, AbridgedCandidateReceipt, AvailableData,
|
ParachainHost, ValidatorId, AbridgedCandidateReceipt, AvailableData,
|
||||||
ValidatorPair, ErasureChunk,
|
ValidatorPair, ErasureChunk,
|
||||||
};
|
};
|
||||||
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
|
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}};
|
||||||
use futures::future::AbortHandle;
|
use futures::future::AbortHandle;
|
||||||
use keystore::KeyStorePtr;
|
use keystore::KeyStorePtr;
|
||||||
|
|
||||||
@@ -641,7 +642,7 @@ impl<I, P> AvailabilityBlockImport<I, P> {
|
|||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
client: Arc<P>,
|
client: Arc<P>,
|
||||||
block_import: I,
|
block_import: I,
|
||||||
spawner: impl Spawn,
|
spawner: impl SpawnNamed,
|
||||||
keystore: KeyStorePtr,
|
keystore: KeyStorePtr,
|
||||||
to_worker: mpsc::UnboundedSender<WorkerMsg>,
|
to_worker: mpsc::UnboundedSender<WorkerMsg>,
|
||||||
) -> Self
|
) -> Self
|
||||||
@@ -662,9 +663,7 @@ impl<I, P> AvailabilityBlockImport<I, P> {
|
|||||||
to_worker.clone(),
|
to_worker.clone(),
|
||||||
));
|
));
|
||||||
|
|
||||||
if let Err(_) = spawner.spawn(prune_available.map(drop)) {
|
spawner.spawn("polkadot-prune-availibility", prune_available.map(drop).boxed());
|
||||||
error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
|
|
||||||
}
|
|
||||||
|
|
||||||
AvailabilityBlockImport {
|
AvailabilityBlockImport {
|
||||||
client,
|
client,
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ use std::sync::Arc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
use futures::{future, Future, Stream, FutureExt, StreamExt, task::Spawn};
|
use futures::{future, Future, Stream, FutureExt, StreamExt};
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use sc_client_api::{StateBackend, BlockchainEvents};
|
use sc_client_api::{StateBackend, BlockchainEvents};
|
||||||
use sp_blockchain::HeaderBackend;
|
use sp_blockchain::HeaderBackend;
|
||||||
@@ -82,6 +82,7 @@ use polkadot_service_new::{
|
|||||||
Error as ServiceError, FullNodeHandles, PolkadotClient,
|
Error as ServiceError, FullNodeHandles, PolkadotClient,
|
||||||
};
|
};
|
||||||
use sc_service::SpawnTaskHandle;
|
use sc_service::SpawnTaskHandle;
|
||||||
|
use sp_core::traits::SpawnNamed;
|
||||||
|
|
||||||
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
|
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
@@ -133,7 +134,7 @@ pub trait BuildParachainContext {
|
|||||||
Client::Api: RuntimeApiCollection<Extrinsic>,
|
Client::Api: RuntimeApiCollection<Extrinsic>,
|
||||||
<Client::Api as ApiExt<Block>>::StateBackend: StateBackend<HashFor<Block>>,
|
<Client::Api as ApiExt<Block>>::StateBackend: StateBackend<HashFor<Block>>,
|
||||||
Extrinsic: codec::Codec + Send + Sync + 'static,
|
Extrinsic: codec::Codec + Send + Sync + 'static,
|
||||||
SP: Spawn + Clone + Send + Sync + 'static;
|
SP: SpawnNamed + Clone + Send + Sync + 'static;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parachain context needed for collation.
|
/// Parachain context needed for collation.
|
||||||
@@ -233,7 +234,7 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
|
|||||||
P::ParachainContext: Send + 'static,
|
P::ParachainContext: Send + 'static,
|
||||||
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
|
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
|
||||||
Extrinsic: service::Codec + Send + Sync + 'static,
|
Extrinsic: service::Codec + Send + Sync + 'static,
|
||||||
SP: Spawn + Clone + Send + Sync + 'static,
|
SP: SpawnNamed + Clone + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
Err("Collator is not functional with the new service yet".into())
|
Err("Collator is not functional with the new service yet".into())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -295,7 +295,7 @@ pub(crate) fn pov_block_topic(parent_hash: Hash) -> Hash {
|
|||||||
pub fn register_validator<C: ChainContext + 'static>(
|
pub fn register_validator<C: ChainContext + 'static>(
|
||||||
service: Arc<NetworkService<Block, Hash>>,
|
service: Arc<NetworkService<Block, Hash>>,
|
||||||
chain: C,
|
chain: C,
|
||||||
executor: &impl futures::task::Spawn,
|
executor: &impl sp_core::traits::SpawnNamed,
|
||||||
) -> RegisteredMessageValidator
|
) -> RegisteredMessageValidator
|
||||||
{
|
{
|
||||||
let s = service.clone();
|
let s = service.clone();
|
||||||
@@ -331,12 +331,7 @@ pub fn register_validator<C: ChainContext + 'static>(
|
|||||||
let fut = futures::future::poll_fn(move |cx| {
|
let fut = futures::future::poll_fn(move |cx| {
|
||||||
gossip_engine.lock().poll_unpin(cx)
|
gossip_engine.lock().poll_unpin(cx)
|
||||||
});
|
});
|
||||||
let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut)));
|
executor.spawn("polkadot-legacy-gossip-engine", fut.boxed());
|
||||||
|
|
||||||
// Note: we consider the chances of an error to spawn a background task almost null.
|
|
||||||
if spawn_res.is_err() {
|
|
||||||
log::error!(target: "polkadot-gossip", "Failed to spawn background task");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RegisteredMessageValidator {
|
RegisteredMessageValidator {
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ use codec::{Decode, Encode};
|
|||||||
use futures::channel::{mpsc, oneshot};
|
use futures::channel::{mpsc, oneshot};
|
||||||
use futures::future::Either;
|
use futures::future::Either;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::task::{Spawn, SpawnExt, Context, Poll};
|
use futures::task::{Context, Poll};
|
||||||
use futures::stream::{FuturesUnordered, StreamFuture};
|
use futures::stream::{FuturesUnordered, StreamFuture};
|
||||||
use log::{debug, trace};
|
use log::{debug, trace};
|
||||||
|
|
||||||
@@ -44,6 +44,7 @@ use polkadot_validation::{
|
|||||||
use sc_network::{ObservedRole, Event, PeerId};
|
use sc_network::{ObservedRole, Event, PeerId};
|
||||||
use sp_api::ProvideRuntimeApi;
|
use sp_api::ProvideRuntimeApi;
|
||||||
use sp_runtime::ConsensusEngineId;
|
use sp_runtime::ConsensusEngineId;
|
||||||
|
use sp_core::traits::SpawnNamed;
|
||||||
|
|
||||||
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
|
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
@@ -126,7 +127,9 @@ enum ServiceToWorkerMsg {
|
|||||||
/// Messages from a background task to the main worker task.
|
/// Messages from a background task to the main worker task.
|
||||||
enum BackgroundToWorkerMsg {
|
enum BackgroundToWorkerMsg {
|
||||||
// Spawn a given future.
|
// Spawn a given future.
|
||||||
Spawn(future::BoxFuture<'static, ()>),
|
//
|
||||||
|
// The name is used for the future task.
|
||||||
|
Spawn(&'static str, future::BoxFuture<'static, ()>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Operations that a handle to an underlying network service should provide.
|
/// Operations that a handle to an underlying network service should provide.
|
||||||
@@ -221,7 +224,7 @@ pub fn start<C, Api, SP>(
|
|||||||
C: ChainContext + 'static,
|
C: ChainContext + 'static,
|
||||||
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||||
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||||
SP: Spawn + Clone + Send + 'static,
|
SP: SpawnNamed + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
const SERVICE_TO_WORKER_BUF: usize = 256;
|
const SERVICE_TO_WORKER_BUF: usize = 256;
|
||||||
|
|
||||||
@@ -234,21 +237,26 @@ pub fn start<C, Api, SP>(
|
|||||||
chain_context,
|
chain_context,
|
||||||
&executor,
|
&executor,
|
||||||
);
|
);
|
||||||
executor.spawn(worker_loop(
|
executor.spawn(
|
||||||
|
"polkadot-network-worker",
|
||||||
|
worker_loop(
|
||||||
config,
|
config,
|
||||||
service.clone(),
|
service.clone(),
|
||||||
gossip_validator,
|
gossip_validator,
|
||||||
api,
|
api,
|
||||||
worker_receiver,
|
worker_receiver,
|
||||||
executor.clone(),
|
executor.clone(),
|
||||||
))?;
|
).boxed(),
|
||||||
|
);
|
||||||
|
|
||||||
let polkadot_service = Service {
|
let polkadot_service = Service {
|
||||||
sender: worker_sender.clone(),
|
sender: worker_sender.clone(),
|
||||||
network_service: service.clone(),
|
network_service: service.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
executor.spawn(async move {
|
executor.spawn(
|
||||||
|
"polkadot-network-notifications",
|
||||||
|
async move {
|
||||||
while let Some(event) = event_stream.next().await {
|
while let Some(event) = event_stream.next().await {
|
||||||
let res = match event {
|
let res = match event {
|
||||||
Event::Dht(_) => continue,
|
Event::Dht(_) => continue,
|
||||||
@@ -294,7 +302,8 @@ pub fn start<C, Api, SP>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})?;
|
}.boxed(),
|
||||||
|
);
|
||||||
|
|
||||||
Ok(polkadot_service)
|
Ok(polkadot_service)
|
||||||
}
|
}
|
||||||
@@ -845,7 +854,7 @@ struct Worker<Api, Sp, Gossip> {
|
|||||||
impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
||||||
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||||
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||||
Sp: Spawn + Clone,
|
Sp: SpawnNamed + Clone,
|
||||||
Gossip: GossipOps,
|
Gossip: GossipOps,
|
||||||
{
|
{
|
||||||
// spawns a background task to spawn consensus networking.
|
// spawns a background task to spawn consensus networking.
|
||||||
@@ -888,14 +897,17 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
|||||||
|
|
||||||
// glue the incoming messages, shared table, and validation
|
// glue the incoming messages, shared table, and validation
|
||||||
// work together.
|
// work together.
|
||||||
let _ = self.executor.spawn(statement_import_loop(
|
self.executor.spawn(
|
||||||
|
"polkadot-statement-import-loop",
|
||||||
|
statement_import_loop(
|
||||||
relay_parent,
|
relay_parent,
|
||||||
table,
|
table,
|
||||||
self.api.clone(),
|
self.api.clone(),
|
||||||
self.gossip_handle.clone(),
|
self.gossip_handle.clone(),
|
||||||
self.background_to_main_sender.clone(),
|
self.background_to_main_sender.clone(),
|
||||||
exit,
|
exit,
|
||||||
));
|
).boxed(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_service_message(&mut self, message: ServiceToWorkerMsg) {
|
fn handle_service_message(&mut self, message: ServiceToWorkerMsg) {
|
||||||
@@ -932,12 +944,15 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
|||||||
// before placing in the pool, so we can safely check by candidate hash.
|
// before placing in the pool, so we can safely check by candidate hash.
|
||||||
let get_msg = fetch_pov_from_gossip(&candidate, &self.gossip_handle);
|
let get_msg = fetch_pov_from_gossip(&candidate, &self.gossip_handle);
|
||||||
|
|
||||||
let _ = self.executor.spawn(async move {
|
self.executor.spawn(
|
||||||
|
"polkadot-fetch-pov-block",
|
||||||
|
async move {
|
||||||
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
|
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
|
||||||
if let Either::Left((pov_block, _)) = res {
|
if let Either::Left((pov_block, _)) = res {
|
||||||
let _ = sender.send(pov_block);
|
let _ = sender.send(pov_block);
|
||||||
}
|
}
|
||||||
});
|
}.boxed(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
|
ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
|
||||||
let topic = crate::erasure_coding_topic(&candidate_hash);
|
let topic = crate::erasure_coding_topic(&candidate_hash);
|
||||||
@@ -963,12 +978,15 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
|||||||
"gossip message streams do not conclude early; qed"
|
"gossip message streams do not conclude early; qed"
|
||||||
));
|
));
|
||||||
|
|
||||||
let _ = self.executor.spawn(async move {
|
self.executor.spawn(
|
||||||
|
"polkadot-fetch-erasure-chunk",
|
||||||
|
async move {
|
||||||
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
|
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
|
||||||
if let Either::Left((chunk, _)) = res {
|
if let Either::Left((chunk, _)) = res {
|
||||||
let _ = sender.send(chunk);
|
let _ = sender.send(chunk);
|
||||||
}
|
}
|
||||||
});
|
}.boxed(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => {
|
ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => {
|
||||||
let topic = crate::erasure_coding_topic(&candidate_hash);
|
let topic = crate::erasure_coding_topic(&candidate_hash);
|
||||||
@@ -1017,8 +1035,8 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
|||||||
|
|
||||||
fn handle_background_message(&mut self, message: BackgroundToWorkerMsg) {
|
fn handle_background_message(&mut self, message: BackgroundToWorkerMsg) {
|
||||||
match message {
|
match message {
|
||||||
BackgroundToWorkerMsg::Spawn(task) => {
|
BackgroundToWorkerMsg::Spawn(name, task) => {
|
||||||
let _ = self.executor.spawn(task);
|
let _ = self.executor.spawn(name, task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1068,7 +1086,7 @@ async fn worker_loop<Api, Sp>(
|
|||||||
) where
|
) where
|
||||||
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||||
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||||
Sp: Spawn + Clone,
|
Sp: SpawnNamed + Clone,
|
||||||
{
|
{
|
||||||
const BACKGROUND_TO_MAIN_BUF: usize = 16;
|
const BACKGROUND_TO_MAIN_BUF: usize = 16;
|
||||||
|
|
||||||
@@ -1250,7 +1268,7 @@ async fn statement_import_loop<Api>(
|
|||||||
|
|
||||||
let work = future::select(work.boxed(), exit.clone()).map(drop);
|
let work = future::select(work.boxed(), exit.clone()).map(drop);
|
||||||
if let Err(_) = to_worker.send(
|
if let Err(_) = to_worker.send(
|
||||||
BackgroundToWorkerMsg::Spawn(work.boxed())
|
BackgroundToWorkerMsg::Spawn("polkadot-statement-import-loop-sub-task", work.boxed())
|
||||||
).await {
|
).await {
|
||||||
// can fail only if remote has hung up - worker is dead,
|
// can fail only if remote has hung up - worker is dead,
|
||||||
// we should die too. this is defensive, since the exit future
|
// we should die too. this is defensive, since the exit future
|
||||||
|
|||||||
@@ -30,11 +30,24 @@ use av_store::{Store as AvailabilityStore, ErasureNetworking};
|
|||||||
use sc_network_gossip::TopicNotification;
|
use sc_network_gossip::TopicNotification;
|
||||||
use sp_api::{ApiRef, ProvideRuntimeApi};
|
use sp_api::{ApiRef, ProvideRuntimeApi};
|
||||||
use sp_runtime::traits::Block as BlockT;
|
use sp_runtime::traits::Block as BlockT;
|
||||||
use sp_core::crypto::Pair;
|
use sp_core::{crypto::Pair, traits::SpawnNamed};
|
||||||
use sp_keyring::Sr25519Keyring;
|
use sp_keyring::Sr25519Keyring;
|
||||||
|
|
||||||
use futures::executor::LocalPool;
|
use futures::executor::{LocalPool, LocalSpawner};
|
||||||
use futures::task::LocalSpawnExt;
|
use futures::task::{LocalSpawnExt, SpawnExt};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct Executor(LocalSpawner);
|
||||||
|
|
||||||
|
impl SpawnNamed for Executor {
|
||||||
|
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
|
||||||
|
self.0.spawn_local(future).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
|
||||||
|
self.spawn(name, future);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct MockNetworkOps {
|
pub struct MockNetworkOps {
|
||||||
@@ -243,7 +256,7 @@ fn test_setup(config: Config) -> (
|
|||||||
mock_gossip.clone(),
|
mock_gossip.clone(),
|
||||||
api.clone(),
|
api.clone(),
|
||||||
worker_rx,
|
worker_rx,
|
||||||
pool.spawner(),
|
Executor(pool.spawner()),
|
||||||
);
|
);
|
||||||
|
|
||||||
let service = Service {
|
let service = Service {
|
||||||
|
|||||||
@@ -26,13 +26,12 @@
|
|||||||
//!
|
//!
|
||||||
//! These attestation sessions are kept live until they are periodically garbage-collected.
|
//! These attestation sessions are kept live until they are periodically garbage-collected.
|
||||||
|
|
||||||
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin};
|
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin, collections::HashMap};
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use crate::pipeline::FullOutput;
|
use crate::pipeline::FullOutput;
|
||||||
use sc_client_api::{BlockchainEvents, BlockBackend};
|
use sc_client_api::{BlockchainEvents, BlockBackend};
|
||||||
use consensus::SelectChain;
|
use consensus::SelectChain;
|
||||||
use futures::{prelude::*, task::{Spawn, SpawnExt}};
|
use futures::prelude::*;
|
||||||
use polkadot_primitives::{Block, Hash, BlockId};
|
use polkadot_primitives::{Block, Hash, BlockId};
|
||||||
use polkadot_primitives::parachain::{
|
use polkadot_primitives::parachain::{
|
||||||
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
|
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
|
||||||
@@ -42,17 +41,15 @@ use keystore::KeyStorePtr;
|
|||||||
use sp_api::{ProvideRuntimeApi, ApiExt};
|
use sp_api::{ProvideRuntimeApi, ApiExt};
|
||||||
use runtime_primitives::traits::HashFor;
|
use runtime_primitives::traits::HashFor;
|
||||||
use availability_store::Store as AvailabilityStore;
|
use availability_store::Store as AvailabilityStore;
|
||||||
|
use primitives::traits::SpawnNamed;
|
||||||
|
|
||||||
use ansi_term::Colour;
|
use ansi_term::Colour;
|
||||||
use log::{warn, error, info, debug, trace};
|
use log::{warn, info, debug, trace};
|
||||||
|
|
||||||
use super::{Network, Collators, SharedTable, TableRouter};
|
use super::{Network, Collators, SharedTable, TableRouter};
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
use crate::pipeline::ValidationPool;
|
use crate::pipeline::ValidationPool;
|
||||||
|
|
||||||
/// A handle to spawn background tasks onto.
|
|
||||||
pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
|
|
||||||
|
|
||||||
// Remote processes may request for a validation instance to be cloned or instantiated.
|
// Remote processes may request for a validation instance to be cloned or instantiated.
|
||||||
// They send a oneshot channel.
|
// They send a oneshot channel.
|
||||||
type ValidationInstanceRequest = (
|
type ValidationInstanceRequest = (
|
||||||
@@ -148,7 +145,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
|
|||||||
N::BuildTableRouter: Send + Unpin + 'static,
|
N::BuildTableRouter: Send + Unpin + 'static,
|
||||||
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
|
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
|
||||||
SC: SelectChain<Block> + 'static,
|
SC: SelectChain<Block> + 'static,
|
||||||
SP: Spawn + Send + 'static,
|
SP: SpawnNamed + Send + 'static,
|
||||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||||
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
|
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
|
||||||
{
|
{
|
||||||
@@ -337,7 +334,7 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
|
|||||||
N::TableRouter: Send + 'static + Sync,
|
N::TableRouter: Send + 'static + Sync,
|
||||||
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
|
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
|
||||||
N::BuildTableRouter: Unpin + Send + 'static,
|
N::BuildTableRouter: Unpin + Send + 'static,
|
||||||
SP: Spawn + Send + 'static,
|
SP: SpawnNamed + Send + 'static,
|
||||||
CF: CollationFetch + Clone + Send + Sync + 'static,
|
CF: CollationFetch + Clone + Send + Sync + 'static,
|
||||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||||
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
|
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
|
||||||
@@ -453,19 +450,16 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
|
|||||||
let collation_fetch = self.collation_fetch.clone();
|
let collation_fetch = self.collation_fetch.clone();
|
||||||
let router = router.clone();
|
let router = router.clone();
|
||||||
|
|
||||||
let res = self.spawner.spawn(
|
self.spawner.spawn(
|
||||||
|
"polkadot-parachain-validation-work",
|
||||||
launch_work(
|
launch_work(
|
||||||
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
|
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
|
||||||
availability_store,
|
availability_store,
|
||||||
router,
|
router,
|
||||||
n_validators,
|
n_validators,
|
||||||
index,
|
index,
|
||||||
),
|
).boxed(),
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Err(e) = res {
|
|
||||||
error!(target: "validation", "Failed to launch work: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let tracker = ValidationInstanceHandle {
|
let tracker = ValidationInstanceHandle {
|
||||||
@@ -549,7 +543,7 @@ async fn launch_work<CFF, E>(
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use futures::{executor::{ThreadPool, self}, future::ready, channel::mpsc};
|
use futures::{executor, future::ready, channel::mpsc};
|
||||||
use availability_store::ErasureNetworking;
|
use availability_store::ErasureNetworking;
|
||||||
use polkadot_primitives::parachain::{
|
use polkadot_primitives::parachain::{
|
||||||
PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex,
|
PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex,
|
||||||
@@ -559,6 +553,7 @@ mod tests {
|
|||||||
use runtime_primitives::traits::Block as BlockT;
|
use runtime_primitives::traits::Block as BlockT;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use sp_keyring::sr25519::Keyring;
|
use sp_keyring::sr25519::Keyring;
|
||||||
|
use primitives::testing::SpawnBlockingExecutor;
|
||||||
|
|
||||||
/// Events fired while running mock implementations to follow execution.
|
/// Events fired while running mock implementations to follow execution.
|
||||||
enum Events {
|
enum Events {
|
||||||
@@ -719,7 +714,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn launch_work_is_executed_properly() {
|
fn launch_work_is_executed_properly() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = SpawnBlockingExecutor::new();
|
||||||
let keystore = keystore::Store::new_in_memory();
|
let keystore = keystore::Store::new_in_memory();
|
||||||
|
|
||||||
// Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator.
|
// Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator.
|
||||||
@@ -759,7 +754,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn router_is_built_on_relay_chain_validator() {
|
fn router_is_built_on_relay_chain_validator() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = SpawnBlockingExecutor::new();
|
||||||
let keystore = keystore::Store::new_in_memory();
|
let keystore = keystore::Store::new_in_memory();
|
||||||
|
|
||||||
// Make sure `Alice` key is in the keystore, so this mocked node will be a relay-chain validator.
|
// Make sure `Alice` key is in the keystore, so this mocked node will be a relay-chain validator.
|
||||||
|
|||||||
Reference in New Issue
Block a user