diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 38f01da139..1e8908ebe5 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -39,7 +39,7 @@ use sp_core::traits::SpawnNamed; use cache::{RequestResult, RequestResultCache}; use futures::{channel::oneshot, prelude::*, select, stream::FuturesUnordered}; -use std::{collections::VecDeque, pin::Pin, sync::Arc}; +use std::sync::Arc; mod cache; @@ -51,7 +51,8 @@ mod tests; const LOG_TARGET: &str = "parachain::runtime-api"; -/// The number of maximum runtime API requests can be executed in parallel. Further requests will be buffered. +/// The maximum number of runtime API requests that can be executed in parallel. +/// Further requests will be backpressured via the bounded channel. const MAX_PARALLEL_REQUESTS: usize = 4; /// The name of the blocking task that executes a runtime API request. @@ -62,11 +63,6 @@ pub struct RuntimeApiSubsystem { client: Arc, metrics: Metrics, spawn_handle: Box, - /// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed. - waiting_requests: VecDeque<( - Pin + Send>>, - oneshot::Receiver>, - )>, /// All the active runtime API requests that are currently being executed. active_requests: FuturesUnordered>>, /// Requests results cache @@ -84,7 +80,6 @@ impl RuntimeApiSubsystem { client, metrics, spawn_handle: Box::new(spawn_handle), - waiting_requests: Default::default(), active_requests: Default::default(), requests_cache: RequestResultCache::default(), } @@ -276,13 +271,12 @@ where } /// Spawn a runtime API request. - /// - /// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered. 
fn spawn_request(&mut self, relay_parent: Hash, request: Request) { let client = self.client.clone(); let metrics = self.metrics.clone(); let (sender, receiver) = oneshot::channel(); + // TODO: make the cache great again https://github.com/paritytech/polkadot/issues/5546 let request = match self.query_cache(relay_parent.clone(), request) { Some(request) => request, None => return, @@ -294,21 +288,9 @@ where } .boxed(); - if self.active_requests.len() >= MAX_PARALLEL_REQUESTS { - self.waiting_requests.push_back((request, receiver)); - - if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 { - gum::warn!( - target: LOG_TARGET, - "{} runtime API requests waiting to be executed.", - self.waiting_requests.len(), - ) - } - } else { - self.spawn_handle - .spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request); - self.active_requests.push(receiver); - } + self.spawn_handle + .spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request); + self.active_requests.push(receiver); } /// Poll the active runtime API requests. @@ -322,12 +304,11 @@ where if let Some(Ok(Some(result))) = self.active_requests.next().await { self.store_cache(result); } + } - if let Some((req, recv)) = self.waiting_requests.pop_front() { - self.spawn_handle - .spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), req); - self.active_requests.push(recv); - } + /// Returns true if our `active_requests` queue is full. + fn is_busy(&self) -> bool { + self.active_requests.len() >= MAX_PARALLEL_REQUESTS } } @@ -341,6 +322,17 @@ where Client::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, { loop { + // Let's add some back pressure when the subsystem is running at `MAX_PARALLEL_REQUESTS`. 
+ // This can never block forever, because `active_requests` is owned by this task and any mutations + // happen either in `poll_requests` or `spawn_request` - so if `is_busy` returns true, then + // even if all of the requests finish before we call `poll_requests`, the `active_requests` length + // remains invariant. + if subsystem.is_busy() { + // Since we are not using any internal waiting queues, we need to wait for exactly + // one request to complete before we can read the next one from the overseer channel. + let _ = subsystem.poll_requests().await; + } + select! { req = ctx.recv().fuse() => match req? { FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), diff --git a/polkadot/node/core/runtime-api/src/tests.rs b/polkadot/node/core/runtime-api/src/tests.rs index 286465886d..4e75df1005 100644 --- a/polkadot/node/core/runtime-api/src/tests.rs +++ b/polkadot/node/core/runtime-api/src/tests.rs @@ -17,7 +17,6 @@ use super::*; use ::test_helpers::{dummy_committed_candidate_receipt, dummy_validation_code}; -use futures::channel::oneshot; use polkadot_node_primitives::{BabeAllowedSlots, BabeEpoch, BabeEpochConfiguration}; use polkadot_node_subsystem_test_helpers::make_subsystem_context; use polkadot_primitives::v2::{ @@ -847,8 +846,7 @@ fn multiple_requests_in_parallel_are_working() { let lock = mutex.lock().unwrap(); let mut receivers = Vec::new(); - - for _ in 0..MAX_PARALLEL_REQUESTS * 10 { + for _ in 0..MAX_PARALLEL_REQUESTS { let (tx, rx) = oneshot::channel(); ctx_handle .send(FromOverseer::Communication { msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx)), }) .await; + receivers.push(rx); + } + // The backpressure from reaching `MAX_PARALLEL_REQUESTS` would make the test block, so we need to drop the lock. 
+ drop(lock); + + for _ in 0..MAX_PARALLEL_REQUESTS * 100 { + let (tx, rx) = oneshot::channel(); + + ctx_handle + .send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx)), + }) + .await; receivers.push(rx); } let join = future::join_all(receivers); - drop(lock); - join.await .into_iter() .for_each(|r| assert_eq!(r.unwrap().unwrap(), runtime_api.availability_cores));