From ffc6f7c731c88685145712b0245bc6f71b988544 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 22 Jun 2021 03:43:40 +0200 Subject: [PATCH] make `ctx.spawn` blocking (#3337) * make spawn sync * improve error type --- polkadot/node/collation-generation/src/lib.rs | 2 +- polkadot/node/core/approval-voting/src/lib.rs | 1 - .../node/core/candidate-validation/src/lib.rs | 2 +- polkadot/node/malus/src/lib.rs | 8 ++-- polkadot/node/metered-channel/src/lib.rs | 3 +- .../node/metered-channel/src/unbounded.rs | 48 +------------------ .../src/pov_requester/mod.rs | 1 - .../src/requester/fetch_task/mod.rs | 1 - .../network/availability-recovery/src/lib.rs | 2 +- polkadot/node/network/bridge/src/lib.rs | 2 +- .../network/statement-distribution/src/lib.rs | 7 +-- .../node/overseer/examples/minimal-example.rs | 2 +- polkadot/node/overseer/src/lib.rs | 12 ++--- .../node/subsystem-test-helpers/src/lib.rs | 4 +- polkadot/node/subsystem-util/src/lib.rs | 4 +- polkadot/node/subsystem/src/lib.rs | 8 ++-- 16 files changed, 27 insertions(+), 80 deletions(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index c52dcc846c..0d23cf2595 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -412,7 +412,7 @@ async fn handle_new_activations( "failed to send collation result", ); } - })).await?; + }))?; } } diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 597bf378cc..e219671920 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -2100,7 +2100,6 @@ async fn launch_approval( let (background, remote_handle) = background.remote_handle(); ctx.spawn("approval-checks", Box::pin(background)) - .await .map(move |()| Some(remote_handle)) } diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 245a37d3e5..aabdf55517 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -107,7 +107,7 @@ async fn run( let (mut validation_host, task) = polkadot_node_core_pvf::start( polkadot_node_core_pvf::Config::new(cache_path, program_path), ); - ctx.spawn_blocking("pvf-validation-host", task.boxed()).await?; + ctx.spawn_blocking("pvf-validation-host", task.boxed())?; loop { match ctx.recv().await? { diff --git a/polkadot/node/malus/src/lib.rs b/polkadot/node/malus/src/lib.rs index 6cd45a3c4d..1b8945a6c4 100644 --- a/polkadot/node/malus/src/lib.rs +++ b/polkadot/node/malus/src/lib.rs @@ -134,20 +134,20 @@ where } } - async fn spawn( + fn spawn( &mut self, name: &'static str, s: Pin + Send>>, ) -> SubsystemResult<()> { - self.inner.spawn(name, s).await + self.inner.spawn(name, s) } - async fn spawn_blocking( + fn spawn_blocking( &mut self, name: &'static str, s: Pin + Send>>, ) -> SubsystemResult<()> { - self.inner.spawn_blocking(name, s).await + self.inner.spawn_blocking(name, s) } fn sender(&mut self) -> &mut Self::Sender { diff --git a/polkadot/node/metered-channel/src/lib.rs b/polkadot/node/metered-channel/src/lib.rs index e2fc0d84b5..917c04264d 100644 --- a/polkadot/node/metered-channel/src/lib.rs +++ b/polkadot/node/metered-channel/src/lib.rs @@ -169,14 +169,13 @@ mod tests { #[test] fn failed_send_does_not_inc_sent() { let (mut bounded, _) = channel::(5); - let (mut unbounded, _) = unbounded::(); + let (unbounded, _) = unbounded::(); block_on(async move { assert!(bounded.send(Msg::default()).await.is_err()); assert!(bounded.try_send(Msg::default()).is_err()); assert_eq!(bounded.meter().read(), Readout { sent: 0, received: 0 }); - assert!(unbounded.send(Msg::default()).await.is_err()); assert!(unbounded.unbounded_send(Msg::default()).is_err()); assert_eq!(unbounded.meter().read(), Readout { sent: 0, received: 0 }); }); diff --git a/polkadot/node/metered-channel/src/unbounded.rs b/polkadot/node/metered-channel/src/unbounded.rs index 70a2115368..bf1400681a 100644 --- a/polkadot/node/metered-channel/src/unbounded.rs +++ b/polkadot/node/metered-channel/src/unbounded.rs @@ -16,7 +16,7 @@ //! Metered variant of unbounded mpsc channels to be able to extract metrics. -use futures::{channel::mpsc, task::Poll, task::Context, sink::SinkExt, stream::Stream}; +use futures::{channel::mpsc, task::Poll, task::Context, stream::Stream}; use std::result; use std::pin::Pin; @@ -130,21 +130,6 @@ impl UnboundedMeteredSender { &self.meter } - /// Send message, wait until capacity is available. - pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError> - where - Self: Unpin, - { - self.meter.note_sent(); - let fut = self.inner.send(item); - futures::pin_mut!(fut); - fut.await.map_err(|e| { - self.meter.retract_sent(); - e - }) - } - - /// Attempt to send message or fail immediately. pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError> { self.meter.note_sent(); @@ -154,34 +139,3 @@ impl UnboundedMeteredSender { }) } } - -impl futures::sink::Sink for UnboundedMeteredSender { - type Error = as futures::sink::Sink>::Error; - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::new(&mut self.inner).start_send(item) - } - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_ready(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.inner).poll_ready(cx) { - val @ Poll::Ready(_)=> { - val - } - other => other, - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.inner).poll_ready(cx) { - val @ Poll::Ready(_)=> { - self.meter.note_sent(); - val - } - other => other, - } - } -} diff --git a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs index e5ee1656e0..3d7e41e6ae 100644 --- a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs @@ -76,7 +76,6 @@ where .with_validator_index(from_validator) .with_relay_parent(parent); ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed()) - .await .map_err(|e| Fatal::SpawnTask(e))?; Ok(()) } diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 1a4d6574e1..c936d443fc 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -189,7 +189,6 @@ impl FetchTask { let (handle, kill) = oneshot::channel(); ctx.spawn("chunk-fetcher", running.run(kill).boxed()) - .await .map_err(|e| Fatal::SpawnTask(e))?; Ok(FetchTask { diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 711c191c81..efd8d37128 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -650,7 +650,7 @@ async fn launch_interaction( awaiting: vec![response_sender], }); - if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)).await { + if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)) { tracing::warn!( target: LOG_TARGET, err = ?e, diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 8e8dd5147c..399b6e6c26 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -887,7 +887,7 @@ where shared.clone(), ).remote_handle(); - ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?; + ctx.spawn("network-bridge-network-worker", Box::pin(remote))?; ctx.send_message(AllMessages::StatementDistribution( StatementDistributionMessage::StatementFetchingReceiver(statement_receiver) diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 238ea4672f..b0b2d8d8f8 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -1238,8 +1238,7 @@ async fn launch_request( ) .remote_handle(); - let result = ctx.spawn("large-statement-fetcher", task.boxed()) - .await; + let result = ctx.spawn("large-statement-fetcher", task.boxed()); if let Err(err) = result { tracing::error!(target: LOG_TARGET, ?err, "Spawning task failed."); return None @@ -1952,9 +1951,7 @@ impl StatementDistribution { ctx.spawn( "large-statement-responder", respond(receiver, res_sender.clone()).boxed() - ) - .await - .map_err(Fatal::SpawnTask)?; + ).map_err(Fatal::SpawnTask)?; } } } diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index 830c65e89d..bd3170fa13 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -103,7 +103,7 @@ impl Subsystem2 { Delay::new(Duration::from_secs(1)).await; } }), - ).await.unwrap(); + ).unwrap(); loop { match ctx.try_recv().await { diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 5f2422fc73..e40ce9e9e8 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -926,22 +926,22 @@ impl SubsystemContext for OverseerSubsystemContext { } } - async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) + fn spawn(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { - self.to_overseer.send(ToOverseer::SpawnJob { + self.to_overseer.unbounded_send(ToOverseer::SpawnJob { name, s, - }).await.map_err(Into::into) + }).map_err(|_| SubsystemError::TaskSpawn(name)) } - async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) + fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { - self.to_overseer.send(ToOverseer::SpawnBlockingJob { + self.to_overseer.unbounded_send(ToOverseer::SpawnBlockingJob { name, s, - }).await.map_err(Into::into) + }).map_err(|_| SubsystemError::TaskSpawn(name)) } fn sender(&mut self) -> &mut OverseerSubsystemSender { diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index f5333f72aa..c09d0ed000 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -224,7 +224,7 @@ impl SubsystemContext .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned())) } - async fn spawn( + fn spawn( &mut self, name: &'static str, s: Pin + Send>>, @@ -233,7 +233,7 @@ impl SubsystemContext Ok(()) } - async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) + fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { self.spawn.spawn_blocking(name, s); diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index d474a46dd1..85348b1457 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -726,9 +726,9 @@ impl JobSubsystem { } outgoing = jobs.next() => { let res = match outgoing.expect("the Jobs stream never ends; qed") { - FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await, + FromJobCommand::Spawn(name, task) => ctx.spawn(name, task), FromJobCommand::SpawnBlocking(name, task) - => ctx.spawn_blocking(name, task).await, + => ctx.spawn_blocking(name, task), }; if let Err(e) = res { diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index d070ff4d6c..d722508ca4 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -197,8 +197,8 @@ pub enum SubsystemError { #[error(transparent)] QueueError(#[from] mpsc::SendError), - #[error(transparent)] - TaskSpawn(#[from] futures::task::SpawnError), + #[error("Failed to spawn a task: {0}")] + TaskSpawn(&'static str), #[error(transparent)] Infallible(#[from] std::convert::Infallible), @@ -293,10 +293,10 @@ pub trait SubsystemContext: Send + Sized + 'static { async fn recv(&mut self) -> SubsystemResult>; /// Spawn a child task on the executor. - async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()>; + fn spawn(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()>; /// Spawn a blocking child task on the executor's dedicated thread pool. - async fn spawn_blocking( + fn spawn_blocking( &mut self, name: &'static str, s: Pin + Send>>,