From 5624bd8bf4da5eaff14059c32076fc5cc1809712 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Fri, 17 Jul 2020 20:04:02 +0300 Subject: [PATCH] Use SpawnNamed instead of Spawn in Overseer (#1430) * Use SpawnNamed instead of Spawn in Overseer * reexport SpawnNamed and fix doc tests * Fix deps --- polkadot/Cargo.lock | 7 + polkadot/node/core/backing/Cargo.toml | 2 +- polkadot/node/core/backing/src/lib.rs | 16 +- polkadot/node/network/bridge/Cargo.toml | 1 + polkadot/node/network/bridge/src/lib.rs | 10 +- .../node/network/pov-distribution/Cargo.toml | 1 + .../node/network/pov-distribution/src/lib.rs | 31 +-- .../network/statement-distribution/Cargo.toml | 1 + .../network/statement-distribution/src/lib.rs | 11 +- polkadot/node/overseer/Cargo.toml | 2 + .../node/overseer/examples/minimal-example.rs | 38 ++- polkadot/node/overseer/src/lib.rs | 232 ++++++++++-------- polkadot/node/primitives/Cargo.toml | 1 + polkadot/node/primitives/src/lib.rs | 2 + polkadot/node/service/src/lib.rs | 3 +- polkadot/node/subsystem/src/lib.rs | 18 +- polkadot/node/subsystem/src/util.rs | 29 ++- .../node/test-helpers/subsystem/Cargo.toml | 1 + .../node/test-helpers/subsystem/src/lib.rs | 11 +- 19 files changed, 245 insertions(+), 172 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 90f469bac6..55b034e1c2 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4457,6 +4457,7 @@ dependencies = [ "polkadot-primitives", "polkadot-subsystem-test-helpers", "sc-network", + "sp-core", "sp-runtime", "streamunordered", ] @@ -4538,6 +4539,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives", "polkadot-statement-table", + "sp-core", "sp-runtime", ] @@ -4571,9 +4573,11 @@ dependencies = [ "futures-timer 3.0.2", "kv-log-macro", "log 0.4.8", + "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-primitives", "sc-client-api", + "sp-core", "streamunordered", ] @@ -4611,6 +4615,7 @@ dependencies = [ "polkadot-primitives", "polkadot-subsystem-test-helpers", "sc-network", + "sp-core", "sp-runtime", "streamunordered", ] @@ -4963,6 +4968,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-primitives", "polkadot-subsystem-test-helpers", + "sp-core", "sp-keyring", "sp-runtime", "sp-staking", @@ -4986,6 +4992,7 @@ dependencies = [ "futures 0.3.5", "parking_lot 0.10.2", "polkadot-node-subsystem", + "sp-core", ] [[package]] diff --git a/polkadot/node/core/backing/Cargo.toml b/polkadot/node/core/backing/Cargo.toml index 720b8af418..6c8155e3cf 100644 --- a/polkadot/node/core/backing/Cargo.toml +++ b/polkadot/node/core/backing/Cargo.toml @@ -10,7 +10,6 @@ sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" } -primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-primitives = { path = "../../../primitives" } polkadot-node-primitives = { path = "../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } @@ -20,6 +19,7 @@ derive_more = "0.99.9" bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] } [dev-dependencies] +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = { version = "0.3.5", features = ["thread-pool"] } subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index e0309ec842..260cd2eacc 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -24,7 +24,6 @@ use std::sync::Arc; use bitvec::vec::BitVec; use futures::{ channel::{mpsc, oneshot}, - task::{Spawn, SpawnError}, Future, FutureExt, SinkExt, StreamExt, }; @@ -37,7 +36,7 @@ use polkadot_primitives::v1::{ }; use polkadot_node_primitives::{ FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, - ValidationOutputs, ValidationResult, + ValidationOutputs, ValidationResult, SpawnNamed, }; use polkadot_subsystem::{ Subsystem, SubsystemContext, SpawnedSubsystem, @@ -77,8 +76,6 @@ enum Error { #[from] Mpsc(mpsc::SendError), #[from] - Spawn(SpawnError), - #[from] UtilError(util::Error), } @@ -735,7 +732,7 @@ pub struct CandidateBackingSubsystem { impl CandidateBackingSubsystem where - Spawner: Clone + Spawn + Send + Unpin, + Spawner: Clone + SpawnNamed + Send + Unpin, Context: SubsystemContext, ToJob: From<::Message>, { @@ -754,7 +751,7 @@ where impl Subsystem for CandidateBackingSubsystem where - Spawner: Spawn + Send + Clone + Unpin + 'static, + Spawner: SpawnNamed + Send + Clone + Unpin + 'static, Context: SubsystemContext, ::Message: Into, { @@ -769,10 +766,7 @@ where mod tests { use super::*; use assert_matches::assert_matches; - use futures::{ - executor::{self, ThreadPool}, - future, Future, - }; + use futures::{executor, future, Future}; use polkadot_primitives::v1::{ AssignmentKind, BlockData, CandidateCommitments, CollatorId, CoreAssignment, CoreIndex, LocalValidationData, GlobalValidationSchedule, GroupIndex, HeadData, @@ -905,7 +899,7 @@ mod tests { } fn test_harness>(keystore: KeyStorePtr, test: impl FnOnce(TestHarness) -> T) { - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone()); diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index 4f6c8631e2..49e84b8597 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -20,3 +20,4 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys parking_lot = "0.10.0" subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } assert_matches = "1.3.0" +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 46b9dbd840..f2dbcd0aac 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -184,7 +184,10 @@ impl Subsystem for NetworkBridge fn start(self, ctx: Context) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run_network`. - SpawnedSubsystem(run_network(self.0, ctx).map(|_| ()).boxed()) + SpawnedSubsystem { + name: "network-bridge-subsystem", + future: run_network(self.0, ctx).map(|_| ()).boxed(), + } } } @@ -521,7 +524,7 @@ async fn run_network( mod tests { use super::*; use futures::channel::mpsc; - use futures::executor::{self, ThreadPool}; + use futures::executor; use std::sync::Arc; use parking_lot::Mutex; @@ -632,8 +635,7 @@ mod tests { } fn test_harness>(test: impl FnOnce(TestHarness) -> T) { - let pool = ThreadPool::new().unwrap(); - + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (network, network_handle) = new_test_network(); let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool); diff --git a/polkadot/node/network/pov-distribution/Cargo.toml b/polkadot/node/network/pov-distribution/Cargo.toml index a99e5e3d56..232e84f7eb 100644 --- a/polkadot/node/network/pov-distribution/Cargo.toml +++ b/polkadot/node/network/pov-distribution/Cargo.toml @@ -20,3 +20,4 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys parking_lot = "0.10.0" subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } assert_matches = "1.3.0" +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index 84d6e803d2..ea5365f8fc 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -69,7 +69,10 @@ impl Subsystem for PoVDistribution fn start(self, ctx: C) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run`. - SpawnedSubsystem(run(ctx).map(|_| ()).boxed()) + SpawnedSubsystem { + name: "pov-distribution-subsystem", + future: run(ctx).map(|_| ()).boxed(), + } } } @@ -548,7 +551,7 @@ async fn run( #[cfg(test)] mod tests { use super::*; - use futures::executor::{self, ThreadPool}; + use futures::executor; use polkadot_primitives::v1::BlockData; use assert_matches::assert_matches; @@ -616,7 +619,7 @@ mod tests { our_view: View(vec![hash_a, hash_b]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); let mut descriptor = CandidateDescriptor::default(); descriptor.pov_hash = pov_hash; @@ -696,7 +699,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); let mut descriptor = CandidateDescriptor::default(); descriptor.pov_hash = pov_hash; @@ -774,7 +777,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -846,7 +849,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -934,7 +937,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -997,7 +1000,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -1058,7 +1061,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -1116,7 +1119,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -1201,7 +1204,7 @@ mod tests { our_view: View(vec![hash_a, hash_b]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -1263,7 +1266,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -1340,7 +1343,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { @@ -1424,7 +1427,7 @@ mod tests { our_view: View(vec![hash_a]), }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { diff --git a/polkadot/node/network/statement-distribution/Cargo.toml b/polkadot/node/network/statement-distribution/Cargo.toml index 2f8da8ee3d..955830ef20 100644 --- a/polkadot/node/network/statement-distribution/Cargo.toml +++ b/polkadot/node/network/statement-distribution/Cargo.toml @@ -24,3 +24,4 @@ parking_lot = "0.10.0" subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } assert_matches = "1.3.0" sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index cef499eae9..67ac4c6b64 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -70,7 +70,10 @@ impl Subsystem for StatementDistribution fn start(self, ctx: C) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run`. - SpawnedSubsystem(run(ctx).map(|_| ()).boxed()) + SpawnedSubsystem { + name: "statement-distribution-subsystem", + future: run(ctx).map(|_| ()).boxed(), + } } } @@ -892,7 +895,7 @@ mod tests { use node_primitives::Statement; use polkadot_primitives::v1::CommittedCandidateReceipt; use assert_matches::assert_matches; - use futures::executor::{self, ThreadPool}; + use futures::executor; #[test] fn active_head_accepts_only_2_seconded_per_validator() { @@ -1209,7 +1212,7 @@ mod tests { }, }; - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); let peer = PeerId::random(); @@ -1301,7 +1304,7 @@ mod tests { (peer_c.clone(), peer_data_from_view(peer_c_view)), ].into_iter().collect(); - let pool = ThreadPool::new().unwrap(); + let pool = sp_core::testing::SpawnBlockingExecutor::new(); let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); executor::block_on(async move { diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index 6c6ce304e6..da19a26e0a 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -12,9 +12,11 @@ streamunordered = "0.5.1" polkadot-primitives = { path = "../../primitives" } client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } +polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } async-trait = "0.1" [dev-dependencies] +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = { version = "0.3.5", features = ["thread-pool"] } futures-timer = "3.0.2" femme = "2.0.1" diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index d5f7f043ff..35ebf8f336 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -21,7 +21,7 @@ use std::time::Duration; use futures::{ channel::oneshot, - pending, pin_mut, executor, select, stream, + pending, pin_mut, select, stream, FutureExt, StreamExt, }; use futures_timer::Delay; @@ -77,9 +77,14 @@ impl Subsystem for Subsystem1 where C: SubsystemContext { fn start(self, ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { + let future = Box::pin(async move { Self::run(ctx).await; - })) + }); + + SpawnedSubsystem { + name: "subsystem-1", + future, + } } } @@ -87,12 +92,15 @@ struct Subsystem2; impl Subsystem2 { async fn run(mut ctx: impl SubsystemContext) { - ctx.spawn(Box::pin(async { - loop { - log::info!("Job tick"); - Delay::new(Duration::from_secs(1)).await; - } - })).await.unwrap(); + ctx.spawn( + "subsystem-2-job", + Box::pin(async { + loop { + log::info!("Job tick"); + Delay::new(Duration::from_secs(1)).await; + } + }), + ).await.unwrap(); loop { match ctx.try_recv().await { @@ -114,16 +122,20 @@ impl Subsystem for Subsystem2 where C: SubsystemContext { fn start(self, ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { + let future = Box::pin(async move { Self::run(ctx).await; - })) + }); + + SpawnedSubsystem { + name: "subsystem-2", + future, + } } } fn main() { femme::with_level(femme::LevelFilter::Trace); - let spawner = executor::ThreadPool::new().unwrap(); - + let spawner = sp_core::testing::SpawnBlockingExecutor::new(); futures::executor::block_on(async { let timer_stream = stream::repeat(()).then(|_| async { Delay::new(Duration::from_secs(1)).await; diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 91fceb0d3b..c6ac6444c2 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -64,9 +64,8 @@ use std::collections::HashSet; use futures::channel::{mpsc, oneshot}; use futures::{ pending, poll, select, - future::{BoxFuture, RemoteHandle}, + future::BoxFuture, stream::{self, FuturesUnordered}, - task::{Spawn, SpawnExt}, Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; @@ -86,6 +85,7 @@ pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, SpawnedSubsystem, }; +use polkadot_node_primitives::SpawnNamed; // A capacity of bounded channels inside the overseer. @@ -109,8 +109,8 @@ enum ToOverseer { /// spawn on the overseer and a `oneshot::Sender` to signal the result /// of the spawn. SpawnJob { + name: &'static str, s: BoxFuture<'static, ()>, - res: oneshot::Sender>, }, } @@ -279,14 +279,15 @@ impl SubsystemContext for OverseerSubsystemContext { self.rx.next().await.ok_or(SubsystemError) } - async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()> { - let (tx, rx) = oneshot::channel(); + async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) + -> SubsystemResult<()> + { self.tx.send(ToOverseer::SpawnJob { + name, s, - res: tx, }).await?; - rx.await? + Ok(()) } async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { @@ -322,7 +323,7 @@ struct OverseenSubsystem { } /// The `Overseer` itself. -pub struct Overseer { +pub struct Overseer { /// A candidate validation subsystem. candidate_validation_subsystem: OverseenSubsystem, @@ -361,7 +362,7 @@ pub struct Overseer { s: S, /// Here we keep handles to spawned subsystems to be notified when they terminate. - running_subsystems: FuturesUnordered>, + running_subsystems: FuturesUnordered>, /// Gather running subsystms' outbound streams into one. running_subsystems_rx: StreamUnordered>, @@ -416,7 +417,7 @@ pub struct AllSubsystems { impl Overseer where - S: Spawn, + S: SpawnNamed, { /// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s. /// @@ -467,16 +468,19 @@ where /// self, /// mut ctx: C, /// ) -> SpawnedSubsystem { - /// SpawnedSubsystem(Box::pin(async move { - /// loop { - /// Delay::new(Duration::from_secs(1)).await; - /// } - /// })) + /// SpawnedSubsystem { + /// name: "validation-subsystem", + /// future: Box::pin(async move { + /// loop { + /// Delay::new(Duration::from_secs(1)).await; + /// } + /// }), + /// } /// } /// } /// /// # fn main() { executor::block_on(async move { - /// let spawner = executor::ThreadPool::new().unwrap(); + /// let spawner = sp_core::testing::SpawnBlockingExecutor::new(); /// let all_subsystems = AllSubsystems { /// candidate_validation: ValidationSubsystem, /// candidate_backing: DummySubsystem, @@ -737,10 +741,8 @@ where ) { match msg { ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await, - ToOverseer::SpawnJob { s, res } => { - let s = self.spawn_job(s); - - let _ = res.send(s); + ToOverseer::SpawnJob { name, s } => { + self.spawn_job(name, s); } } } @@ -897,26 +899,33 @@ where } } - fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> { - self.s.spawn(j).map_err(|_| SubsystemError) + fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) { + self.s.spawn(name, j); } } -fn spawn( +fn spawn( spawner: &mut S, - futures: &mut FuturesUnordered>, + futures: &mut FuturesUnordered>, streams: &mut StreamUnordered>, s: impl Subsystem>, ) -> SubsystemResult> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx }; - let f = s.start(ctx); + let SpawnedSubsystem { future, name } = s.start(ctx); - let handle = spawner.spawn_with_handle(f.0)?; + let (tx, rx) = oneshot::channel(); + + let fut = Box::pin(async move { + future.await; + let _ = tx.send(()); + }); + + spawner.spawn(name, fut); streams.push(from_rx); - futures.push(handle); + futures.push(Box::pin(rx.map(|_| ()))); let instance = Some(SubsystemInstance { tx: to_tx, @@ -944,21 +953,24 @@ mod tests { { fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0; - SpawnedSubsystem(Box::pin(async move { - let mut i = 0; - loop { - match ctx.recv().await { - Ok(FromOverseer::Communication { .. }) => { - let _ = sender.send(i).await; - i += 1; - continue; + SpawnedSubsystem { + name: "test-subsystem-1", + future: Box::pin(async move { + let mut i = 0; + loop { + match ctx.recv().await { + Ok(FromOverseer::Communication { .. }) => { + let _ = sender.send(i).await; + i += 1; + continue; + } + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, + Err(_) => return, + _ => (), } - Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, - Err(_) => return, - _ => (), } - } - })) + }), + } } } @@ -969,39 +981,42 @@ mod tests { { fn start(self, mut ctx: C) -> SpawnedSubsystem { let sender = self.0.clone(); - SpawnedSubsystem(Box::pin(async move { - let _sender = sender; - let mut c: usize = 0; - loop { - if c < 10 { - let (tx, _) = oneshot::channel(); - ctx.send_message( - AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromChainState( - Default::default(), - PoV { - block_data: BlockData(Vec::new()), - }.into(), - tx, + SpawnedSubsystem { + name: "test-subsystem-2", + future: Box::pin(async move { + let _sender = sender; + let mut c: usize = 0; + loop { + if c < 10 { + let (tx, _) = oneshot::channel(); + ctx.send_message( + AllMessages::CandidateValidation( + CandidateValidationMessage::ValidateFromChainState( + Default::default(), + PoV { + block_data: BlockData(Vec::new()), + }.into(), + tx, + ) ) - ) - ).await.unwrap(); - c += 1; - continue; - } - match ctx.try_recv().await { - Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => { - break; - } - Ok(Some(_)) => { + ).await.unwrap(); + c += 1; continue; } - Err(_) => return, - _ => (), + match ctx.try_recv().await { + Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => { + break; + } + Ok(Some(_)) => { + continue; + } + Err(_) => return, + _ => (), + } + pending!(); } - pending!(); - } - })) + }), + } } } @@ -1011,16 +1026,19 @@ mod tests { where C: SubsystemContext { fn start(self, mut _ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - // Do nothing and exit. - })) + SpawnedSubsystem { + name: "test-subsystem-4", + future: Box::pin(async move { + // Do nothing and exit. + }), + } } } // Checks that a minimal configuration of two jobs can run and exchange messages. #[test] fn overseer_works() { - let spawner = executor::ThreadPool::new().unwrap(); + let spawner = sp_core::testing::SpawnBlockingExecutor::new(); executor::block_on(async move { let (s1_tx, mut s1_rx) = mpsc::channel(64); @@ -1084,7 +1102,7 @@ mod tests { // Should immediately conclude the overseer itself with an error. #[test] fn overseer_panics_on_subsystem_exit() { - let spawner = executor::ThreadPool::new().unwrap(); + let spawner = sp_core::testing::SpawnBlockingExecutor::new(); executor::block_on(async move { let (s1_tx, _) = mpsc::channel(64); @@ -1124,21 +1142,24 @@ mod tests { fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); - SpawnedSubsystem(Box::pin(async move { - loop { - match ctx.try_recv().await { - Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break, - Ok(Some(FromOverseer::Signal(s))) => { - sender.send(s).await.unwrap(); - continue; - }, - Ok(Some(_)) => continue, - Err(_) => return, - _ => (), + SpawnedSubsystem { + name: "test-subsystem-5", + future: Box::pin(async move { + loop { + match ctx.try_recv().await { + Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break, + Ok(Some(FromOverseer::Signal(s))) => { + sender.send(s).await.unwrap(); + continue; + }, + Ok(Some(_)) => continue, + Err(_) => return, + _ => (), + } + pending!(); } - pending!(); - } - })) + }), + } } } @@ -1150,21 +1171,24 @@ mod tests { fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); - SpawnedSubsystem(Box::pin(async move { - loop { - match ctx.try_recv().await { - Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break, - Ok(Some(FromOverseer::Signal(s))) => { - sender.send(s).await.unwrap(); - continue; - }, - Ok(Some(_)) => continue, - Err(_) => return, - _ => (), + SpawnedSubsystem { + name: "test-subsystem-6", + future: Box::pin(async move { + loop { + match ctx.try_recv().await { + Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break, + Ok(Some(FromOverseer::Signal(s))) => { + sender.send(s).await.unwrap(); + continue; + }, + Ok(Some(_)) => continue, + Err(_) => return, + _ => (), + } + pending!(); } - pending!(); - } - })) + }), + } } } @@ -1172,7 +1196,7 @@ mod tests { // notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats. #[test] fn overseer_start_stop_works() { - let spawner = executor::ThreadPool::new().unwrap(); + let spawner = sp_core::testing::SpawnBlockingExecutor::new(); executor::block_on(async move { let first_block_hash = [1; 32].into(); @@ -1267,7 +1291,7 @@ mod tests { // notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats. #[test] fn overseer_finalize_works() { - let spawner = executor::ThreadPool::new().unwrap(); + let spawner = sp_core::testing::SpawnBlockingExecutor::new(); executor::block_on(async move { let first_block_hash = [1; 32].into(); diff --git a/polkadot/node/primitives/Cargo.toml b/polkadot/node/primitives/Cargo.toml index b2bc9231ae..aea4cca3b7 100644 --- a/polkadot/node/primitives/Cargo.toml +++ b/polkadot/node/primitives/Cargo.toml @@ -11,3 +11,4 @@ polkadot-statement-table = { path = "../../statement-table" } parity-scale-codec = { version = "1.3.0", default-features = false, features = ["derive"] } runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } async-trait = "0.1" +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs index f29be57374..6630199ae5 100644 --- a/polkadot/node/primitives/src/lib.rs +++ b/polkadot/node/primitives/src/lib.rs @@ -35,6 +35,8 @@ use polkadot_statement_table::{ v1::Misbehavior as TableMisbehavior, }; +pub use sp_core::traits::SpawnNamed; + /// A statement, where the candidate receipt is included in the `Seconded` variant. /// /// This is the committed candidate receipt instead of the bare candidate receipt. As such, diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 70867092c7..6d5cdbda42 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -33,6 +33,7 @@ use polkadot_overseer::{self as overseer, AllSubsystems, BlockInfo, Overseer, Ov use polkadot_subsystem::DummySubsystem; use polkadot_node_core_proposer::ProposerFactory; use sp_trie::PrefixedMemoryDB; +use sp_core::traits::SpawnNamed; pub use service::{ Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis, TFullClient, TLightClient, TFullBackend, TLightBackend, TFullCallExecutor, TLightCallExecutor, @@ -269,7 +270,7 @@ macro_rules! new_full_start { }} } -fn real_overseer( +fn real_overseer( leaves: impl IntoIterator, s: S, ) -> Result<(Overseer, OverseerHandler), ServiceError> { diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index b6c3a79ef3..8637db45a2 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -101,7 +101,12 @@ impl From for SubsystemError { /// An asynchronous subsystem task.. /// /// In essence it's just a newtype wrapping a `BoxFuture`. -pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>); +pub struct SpawnedSubsystem { + /// Name of the subsystem being spawned. + pub name: &'static str, + /// The task of the subsystem being spawned. + pub future: BoxFuture<'static, ()>, +} /// A `Result` type that wraps [`SubsystemError`]. /// @@ -130,7 +135,7 @@ pub trait SubsystemContext: Send + 'static { async fn recv(&mut self) -> SubsystemResult>; /// Spawn a child task on the executor. - async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()>; + async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()>; /// Send a direct message to some other `Subsystem`, routed based on message type. async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>; @@ -159,7 +164,7 @@ pub struct DummySubsystem; impl Subsystem for DummySubsystem { fn start(self, mut ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { + let future = Box::pin(async move { loop { match ctx.recv().await { Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, @@ -167,6 +172,11 @@ impl Subsystem for DummySubsystem { _ => continue, } } - })) + }); + + SpawnedSubsystem { + name: "DummySubsystem", + future, + } } } diff --git a/polkadot/node/subsystem/src/util.rs b/polkadot/node/subsystem/src/util.rs index 1b89bfb053..94bb95c26d 100644 --- a/polkadot/node/subsystem/src/util.rs +++ b/polkadot/node/subsystem/src/util.rs @@ -30,7 +30,7 @@ use futures::{ prelude::*, select, stream::Stream, - task::{self, Spawn, SpawnError, SpawnExt}, + task, }; use futures_timer::Delay; use keystore::KeyStorePtr; @@ -40,7 +40,10 @@ use polkadot_primitives::v1::{ EncodeAs, Hash, HeadData, Id as ParaId, Signed, SigningContext, ValidatorId, ValidatorIndex, ValidatorPair, }; -use sp_core::Pair; +use sp_core::{ + Pair, + traits::SpawnNamed, +}; use std::{ collections::HashMap, convert::{TryFrom, TryInto}, @@ -64,9 +67,6 @@ pub enum Error { /// Attempted to send on a MPSC channel which has been canceled #[from] Mpsc(mpsc::SendError), - /// Attempted to spawn a new task, and failed - #[from] - Spawn(SpawnError), /// Attempted to convert from an AllMessages to a FromJob, and failed. SenderConversion(String), /// The local node is not a validator. @@ -358,7 +358,7 @@ pub struct Jobs { job: std::marker::PhantomData, } -impl Jobs { +impl Jobs { /// Create a new Jobs manager which handles spawning appropriate jobs. pub fn new(spawner: Spawner) -> Self { Self { @@ -391,7 +391,7 @@ impl Jobs { let _ = future.await; let _ = finished_tx.send(()); }; - self.spawner.spawn(future)?; + self.spawner.spawn(Job::NAME, future.boxed()); // this handle lets us remove the appropriate receiver from self.outgoing_msgs // when it's time to stop the job. @@ -444,7 +444,7 @@ impl PinnedDrop for Jobs { impl Stream for Jobs where - Spawner: Spawn, + Spawner: SpawnNamed, Job: JobTrait, { type Item = Job::FromJob; @@ -476,7 +476,7 @@ pub struct JobManager { impl JobManager where - Spawner: Spawn + Clone + Send + Unpin, + Spawner: SpawnNamed + Clone + Send + Unpin, Context: SubsystemContext, Job: JobTrait, Job::RunArgs: Clone, @@ -595,7 +595,7 @@ where impl Subsystem for JobManager where - Spawner: Spawn + Send + Clone + Unpin + 'static, + Spawner: SpawnNamed + Send + Clone + Unpin + 'static, Context: SubsystemContext, ::Message: Into, Job: JobTrait + Send, @@ -606,8 +606,13 @@ where let spawner = self.spawner.clone(); let run_args = self.run_args.clone(); - SpawnedSubsystem(Box::pin(async move { + let future = Box::pin(async move { Self::run(ctx, run_args, spawner).await; - })) + }); + + SpawnedSubsystem { + name: "JobManager", + future, + } } } diff --git a/polkadot/node/test-helpers/subsystem/Cargo.toml b/polkadot/node/test-helpers/subsystem/Cargo.toml index 0fc26a24ea..8175a5a6b1 100644 --- a/polkadot/node/test-helpers/subsystem/Cargo.toml +++ b/polkadot/node/test-helpers/subsystem/Cargo.toml @@ -10,3 +10,4 @@ futures = "0.3.5" async-trait = "0.1" polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } parking_lot = "0.10.0" +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/test-helpers/subsystem/src/lib.rs b/polkadot/node/test-helpers/subsystem/src/lib.rs index c99a33c78d..5fa7f0b9ec 100644 --- a/polkadot/node/test-helpers/subsystem/src/lib.rs +++ b/polkadot/node/test-helpers/subsystem/src/lib.rs @@ -21,9 +21,9 @@ use polkadot_subsystem::messages::AllMessages; use futures::prelude::*; use futures::channel::mpsc; -use futures::task::{Spawn, SpawnExt}; use futures::poll; use parking_lot::Mutex; +use sp_core::traits::SpawnNamed; use std::convert::Infallible; use std::pin::Pin; @@ -155,7 +155,7 @@ pub struct TestSubsystemContext { } #[async_trait::async_trait] -impl SubsystemContext for TestSubsystemContext { +impl SubsystemContext for TestSubsystemContext { type Message = M; async fn try_recv(&mut self) -> Result>, ()> { @@ -170,8 +170,11 @@ impl SubsystemContext for TestSubs self.rx.next().await.ok_or(SubsystemError) } - async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()> { - self.spawn.spawn(s).map_err(Into::into) + async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) + -> SubsystemResult<()> + { + self.spawn.spawn(name, s); + Ok(()) } async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {