Mirror of https://github.com/pezkuwichain/pezkuwi-subxt.git, synced 2026-04-26 15:47:58 +00:00
runtime-api: remove internal queue to make ToFs relevant again (#5545)
* Make back pressure great again
* Test commit - parallel vs caching
* Increase concurrency
* Fixups
* fix comment
* update todo issue
* another doc change
* More comments

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
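The change removes the subsystem's internal `waiting_requests` buffer: once `MAX_PARALLEL_REQUESTS` runtime API calls are in flight, the main loop simply stops reading from the overseer channel until one of them finishes, so back pressure (and the time-of-flight measurement) lands on the bounded channel instead of being hidden in a private queue. Below is a minimal, standalone sketch of that pattern, assuming only the `futures` crate; the names `MAX_PARALLEL`, `handle_request` and `run` are illustrative and not part of the subsystem code.

use futures::{channel::mpsc, executor::block_on, stream::FuturesUnordered, SinkExt, StreamExt};

const MAX_PARALLEL: usize = 4;

async fn handle_request(id: u32) -> u32 {
    // Stand-in for the real (blocking) runtime API call.
    id * 2
}

async fn run(mut rx: mpsc::Receiver<u32>) {
    let mut active = FuturesUnordered::new();
    loop {
        // Back pressure: while at capacity, do not touch `rx`; wait for exactly
        // one in-flight request to finish instead of buffering internally.
        if active.len() >= MAX_PARALLEL {
            let _ = active.next().await;
            continue;
        }
        // Only now read the next request; a bounded `rx` makes senders wait.
        match rx.next().await {
            Some(id) => active.push(handle_request(id)),
            None => break, // channel closed, no more requests
        }
    }
    // Drain whatever is still in flight.
    while active.next().await.is_some() {}
}

fn main() {
    // Bounded channel: once it and the parallel slots are full, senders block.
    let (mut tx, rx) = mpsc::channel::<u32>(MAX_PARALLEL);
    block_on(async {
        let producer = async move {
            for id in 0..16 {
                tx.send(id).await.expect("receiver alive");
            }
        };
        futures::join!(producer, run(rx));
    });
}

With no internal queue, a request spends its whole life either waiting in the bounded channel or actually executing, which is what makes the queue and time-of-flight measurements on that channel meaningful again, per the commit title.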
@@ -39,7 +39,7 @@ use sp_core::traits::SpawnNamed;
 
 use cache::{RequestResult, RequestResultCache};
 use futures::{channel::oneshot, prelude::*, select, stream::FuturesUnordered};
-use std::{collections::VecDeque, pin::Pin, sync::Arc};
+use std::sync::Arc;
 
 mod cache;
 
@@ -51,7 +51,8 @@ mod tests;
 
 const LOG_TARGET: &str = "parachain::runtime-api";
 
-/// The number of maximum runtime API requests can be executed in parallel. Further requests will be buffered.
+/// The number of maximum runtime API requests can be executed in parallel.
+/// Further requests will backpressure the bounded channel.
 const MAX_PARALLEL_REQUESTS: usize = 4;
 
 /// The name of the blocking task that executes a runtime API request.
@@ -62,11 +63,6 @@ pub struct RuntimeApiSubsystem<Client> {
     client: Arc<Client>,
     metrics: Metrics,
     spawn_handle: Box<dyn SpawnNamed>,
-    /// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed.
-    waiting_requests: VecDeque<(
-        Pin<Box<dyn Future<Output = ()> + Send>>,
-        oneshot::Receiver<Option<RequestResult>>,
-    )>,
     /// All the active runtime API requests that are currently being executed.
     active_requests: FuturesUnordered<oneshot::Receiver<Option<RequestResult>>>,
     /// Requests results cache
@@ -84,7 +80,6 @@ impl<Client> RuntimeApiSubsystem<Client> {
             client,
             metrics,
             spawn_handle: Box::new(spawn_handle),
-            waiting_requests: Default::default(),
             active_requests: Default::default(),
             requests_cache: RequestResultCache::default(),
         }
@@ -276,13 +271,12 @@ where
     }
 
     /// Spawn a runtime API request.
-    ///
-    /// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered.
     fn spawn_request(&mut self, relay_parent: Hash, request: Request) {
         let client = self.client.clone();
         let metrics = self.metrics.clone();
         let (sender, receiver) = oneshot::channel();
 
+        // TODO: make the cache great again https://github.com/paritytech/polkadot/issues/5546
         let request = match self.query_cache(relay_parent.clone(), request) {
             Some(request) => request,
             None => return,
@@ -294,21 +288,9 @@ where
         }
         .boxed();
 
-        if self.active_requests.len() >= MAX_PARALLEL_REQUESTS {
-            self.waiting_requests.push_back((request, receiver));
-
-            if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 {
-                gum::warn!(
-                    target: LOG_TARGET,
-                    "{} runtime API requests waiting to be executed.",
-                    self.waiting_requests.len(),
-                )
-            }
-        } else {
-            self.spawn_handle
-                .spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request);
-            self.active_requests.push(receiver);
-        }
+        self.spawn_handle
+            .spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request);
+        self.active_requests.push(receiver);
     }
 
     /// Poll the active runtime API requests.
@@ -322,12 +304,11 @@ where
         if let Some(Ok(Some(result))) = self.active_requests.next().await {
             self.store_cache(result);
         }
     }
 
-        if let Some((req, recv)) = self.waiting_requests.pop_front() {
-            self.spawn_handle
-                .spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), req);
-            self.active_requests.push(recv);
-        }
+    /// Returns true if our `active_requests` queue is full.
+    fn is_busy(&self) -> bool {
+        self.active_requests.len() >= MAX_PARALLEL_REQUESTS
+    }
 }
 
@@ -341,6 +322,17 @@ where
     Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
 {
     loop {
+        // Let's add some back pressure when the subsystem is running at `MAX_PARALLEL_REQUESTS`.
+        // This can never block forever, because `active_requests` is owned by this task and any mutations
+        // happen either in `poll_requests` or `spawn_request` - so if `is_busy` returns true, then
+        // even if all of the requests finish before us calling `poll_requests` the `active_requests` length
+        // remains invariant.
+        if subsystem.is_busy() {
+            // Since we are not using any internal waiting queues, we need to wait for exactly
+            // one request to complete before we can read the next one from the overseer channel.
+            let _ = subsystem.poll_requests().await;
+        }
+
         select! {
             req = ctx.recv().fuse() => match req? {
                 FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),