diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 16fd080ff2..b3b8092966 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -36,22 +36,38 @@ use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost}; use sp_api::ProvideRuntimeApi; use sp_core::traits::SpawnNamed; -use futures::prelude::*; -use std::sync::Arc; +use futures::{prelude::*, stream::FuturesUnordered, channel::oneshot, select}; +use std::{sync::Arc, collections::VecDeque, pin::Pin}; const LOG_TARGET: &str = "runtime_api"; +/// The number of maximum runtime api requests can be executed in parallel. Further requests will be buffered. +const MAX_PARALLEL_REQUESTS: usize = 4; + +/// The name of the blocking task that executes a runtime api request. +const API_REQUEST_TASK_NAME: &str = "polkadot-runtime-api-request"; + /// The `RuntimeApiSubsystem`. See module docs for more details. pub struct RuntimeApiSubsystem { client: Arc, metrics: Metrics, spawn_handle: Box, + /// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed. + waiting_requests: VecDeque<(Pin + Send>>, oneshot::Receiver<()>)>, + /// All the active runtime api requests that are currently being executed. + active_requests: FuturesUnordered>, } impl RuntimeApiSubsystem { /// Create a new Runtime API subsystem wrapping the given client and metrics. pub fn new(client: Arc, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self { - RuntimeApiSubsystem { client, metrics, spawn_handle: Box::new(spawn_handle) } + RuntimeApiSubsystem { + client, + metrics, + spawn_handle: Box::new(spawn_handle), + waiting_requests: Default::default(), + active_requests: Default::default(), + } } } @@ -68,34 +84,82 @@ impl Subsystem for RuntimeApiSubsystem where } } +impl RuntimeApiSubsystem where + Client: ProvideRuntimeApi + Send + 'static + Sync, + Client::Api: ParachainHost, +{ + /// Spawn a runtime api request. + /// + /// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered. + fn spawn_request(&mut self, relay_parent: Hash, request: Request) { + let client = self.client.clone(); + let metrics = self.metrics.clone(); + let (sender, receiver) = oneshot::channel(); + + let request = async move { + make_runtime_api_request( + client, + metrics, + relay_parent, + request, + ); + let _ = sender.send(()); + }.boxed(); + + if self.active_requests.len() >= MAX_PARALLEL_REQUESTS { + self.waiting_requests.push_back((request, receiver)); + + if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 { + tracing::warn!( + target: LOG_TARGET, + "{} runtime api requests waiting to be executed.", + self.waiting_requests.len(), + ) + } + } else { + self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request); + self.active_requests.push(receiver); + } + } + + /// Poll the active runtime api requests. + async fn poll_requests(&mut self) { + // If there are no active requests, this future should be pending forever. + if self.active_requests.len() == 0 { + return futures::pending!() + } + + // If there are active requests, this will always resolve to `Some(_)` when a request is finished. + let _ = self.active_requests.next().await; + + if let Some((req, recv)) = self.waiting_requests.pop_front() { + self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req); + self.active_requests.push(recv); + } + } +} + #[tracing::instrument(skip(ctx, subsystem), fields(subsystem = LOG_TARGET))] async fn run( mut ctx: impl SubsystemContext, - subsystem: RuntimeApiSubsystem, + mut subsystem: RuntimeApiSubsystem, ) -> SubsystemResult<()> where Client: ProvideRuntimeApi + Send + Sync + 'static, Client::Api: ParachainHost, { loop { - match ctx.recv().await? { - FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), - FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}, - FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, - FromOverseer::Communication { msg } => match msg { - RuntimeApiMessage::Request(relay_parent, request) => { - let client = subsystem.client.clone(); - let metrics = subsystem.metrics.clone(); - - subsystem.spawn_handle.spawn_blocking("polkadot-runtime-api-request", async move { - make_runtime_api_request( - client, - metrics, - relay_parent, - request, - ) - }.boxed()) - }, - } + select! { + req = ctx.recv().fuse() => match req? { + FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), + FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}, + FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, + FromOverseer::Communication { msg } => match msg { + RuntimeApiMessage::Request(relay_parent, request) => { + subsystem.spawn_request(relay_parent, request); + }, + } + }, + _ = subsystem.poll_requests().fuse() => {}, } } } @@ -213,7 +277,7 @@ mod tests { }; use polkadot_node_subsystem_test_helpers as test_helpers; use sp_core::testing::TaskExecutor; - use std::collections::{HashMap, BTreeMap}; + use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}}; use futures::channel::oneshot; #[derive(Default, Clone)] @@ -221,6 +285,7 @@ mod tests { validators: Vec, validator_groups: Vec>, availability_cores: Vec, + availability_cores_wait: Arc>, validation_data: HashMap, session_index_for_child: SessionIndex, session_info: HashMap, @@ -261,6 +326,7 @@ mod tests { } fn availability_cores(&self) -> Vec { + let _ = self.availability_cores_wait.lock().unwrap(); self.availability_cores.clone() } @@ -916,4 +982,44 @@ mod tests { futures::executor::block_on(future::join(subsystem_task, test_task)); } + + #[test] + fn multiple_requests_in_parallel_are_working() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let runtime_api = Arc::new(MockRuntimeApi::default()); + let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); + let mutex = runtime_api.availability_cores_wait.clone(); + + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); + let test_task = async move { + // Make all requests block until we release this mutex. + let lock = mutex.lock().unwrap(); + + let mut receivers = Vec::new(); + + for _ in 0..MAX_PARALLEL_REQUESTS * 10 { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx)) + }).await; + + receivers.push(rx); + } + + let join = future::join_all(receivers); + + drop(lock); + + join.await + .into_iter() + .for_each(|r| assert_eq!(r.unwrap().unwrap(), runtime_api.availability_cores)); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } }