mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 15:47:58 +00:00
Add an upper number of maximum parallel runtime api requests (#2069)
* Add an upper number of maximum parallel runtime api requests Instead of spawning all runtime api requests in the background and using all wasm instances. This pr adds a maximum number of parallel requests. * Update node/core/runtime-api/src/lib.rs Co-authored-by: Sergei Shulepov <sergei@parity.io> * Review feedback * Increase instances * Add warning * Update node/core/runtime-api/src/lib.rs Co-authored-by: Sergei Shulepov <sergei@parity.io> Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
@@ -36,22 +36,38 @@ use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_core::traits::SpawnNamed;
|
||||
|
||||
use futures::prelude::*;
|
||||
use std::sync::Arc;
|
||||
use futures::{prelude::*, stream::FuturesUnordered, channel::oneshot, select};
|
||||
use std::{sync::Arc, collections::VecDeque, pin::Pin};
|
||||
|
||||
const LOG_TARGET: &str = "runtime_api";
|
||||
|
||||
/// The number of maximum runtime api requests can be executed in parallel. Further requests will be buffered.
|
||||
const MAX_PARALLEL_REQUESTS: usize = 4;
|
||||
|
||||
/// The name of the blocking task that executes a runtime api request.
|
||||
const API_REQUEST_TASK_NAME: &str = "polkadot-runtime-api-request";
|
||||
|
||||
/// The `RuntimeApiSubsystem`. See module docs for more details.
|
||||
pub struct RuntimeApiSubsystem<Client> {
|
||||
client: Arc<Client>,
|
||||
metrics: Metrics,
|
||||
spawn_handle: Box<dyn SpawnNamed>,
|
||||
/// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed.
|
||||
waiting_requests: VecDeque<(Pin<Box<dyn Future<Output = ()> + Send>>, oneshot::Receiver<()>)>,
|
||||
/// All the active runtime api requests that are currently being executed.
|
||||
active_requests: FuturesUnordered<oneshot::Receiver<()>>,
|
||||
}
|
||||
|
||||
impl<Client> RuntimeApiSubsystem<Client> {
|
||||
/// Create a new Runtime API subsystem wrapping the given client and metrics.
|
||||
pub fn new(client: Arc<Client>, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self {
|
||||
RuntimeApiSubsystem { client, metrics, spawn_handle: Box::new(spawn_handle) }
|
||||
RuntimeApiSubsystem {
|
||||
client,
|
||||
metrics,
|
||||
spawn_handle: Box::new(spawn_handle),
|
||||
waiting_requests: Default::default(),
|
||||
active_requests: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,34 +84,82 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Client> RuntimeApiSubsystem<Client> where
|
||||
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
|
||||
Client::Api: ParachainHost<Block>,
|
||||
{
|
||||
/// Spawn a runtime api request.
|
||||
///
|
||||
/// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered.
|
||||
fn spawn_request(&mut self, relay_parent: Hash, request: Request) {
|
||||
let client = self.client.clone();
|
||||
let metrics = self.metrics.clone();
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
let request = async move {
|
||||
make_runtime_api_request(
|
||||
client,
|
||||
metrics,
|
||||
relay_parent,
|
||||
request,
|
||||
);
|
||||
let _ = sender.send(());
|
||||
}.boxed();
|
||||
|
||||
if self.active_requests.len() >= MAX_PARALLEL_REQUESTS {
|
||||
self.waiting_requests.push_back((request, receiver));
|
||||
|
||||
if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"{} runtime api requests waiting to be executed.",
|
||||
self.waiting_requests.len(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request);
|
||||
self.active_requests.push(receiver);
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll the active runtime api requests.
|
||||
async fn poll_requests(&mut self) {
|
||||
// If there are no active requests, this future should be pending forever.
|
||||
if self.active_requests.len() == 0 {
|
||||
return futures::pending!()
|
||||
}
|
||||
|
||||
// If there are active requests, this will always resolve to `Some(_)` when a request is finished.
|
||||
let _ = self.active_requests.next().await;
|
||||
|
||||
if let Some((req, recv)) = self.waiting_requests.pop_front() {
|
||||
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
|
||||
self.active_requests.push(recv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
|
||||
async fn run<Client>(
|
||||
mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
|
||||
subsystem: RuntimeApiSubsystem<Client>,
|
||||
mut subsystem: RuntimeApiSubsystem<Client>,
|
||||
) -> SubsystemResult<()> where
|
||||
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||
Client::Api: ParachainHost<Block>,
|
||||
{
|
||||
loop {
|
||||
match ctx.recv().await? {
|
||||
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
|
||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
|
||||
FromOverseer::Communication { msg } => match msg {
|
||||
RuntimeApiMessage::Request(relay_parent, request) => {
|
||||
let client = subsystem.client.clone();
|
||||
let metrics = subsystem.metrics.clone();
|
||||
|
||||
subsystem.spawn_handle.spawn_blocking("polkadot-runtime-api-request", async move {
|
||||
make_runtime_api_request(
|
||||
client,
|
||||
metrics,
|
||||
relay_parent,
|
||||
request,
|
||||
)
|
||||
}.boxed())
|
||||
},
|
||||
}
|
||||
select! {
|
||||
req = ctx.recv().fuse() => match req? {
|
||||
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
|
||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
|
||||
FromOverseer::Communication { msg } => match msg {
|
||||
RuntimeApiMessage::Request(relay_parent, request) => {
|
||||
subsystem.spawn_request(relay_parent, request);
|
||||
},
|
||||
}
|
||||
},
|
||||
_ = subsystem.poll_requests().fuse() => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -213,7 +277,7 @@ mod tests {
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||
use sp_core::testing::TaskExecutor;
|
||||
use std::collections::{HashMap, BTreeMap};
|
||||
use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}};
|
||||
use futures::channel::oneshot;
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
@@ -221,6 +285,7 @@ mod tests {
|
||||
validators: Vec<ValidatorId>,
|
||||
validator_groups: Vec<Vec<ValidatorIndex>>,
|
||||
availability_cores: Vec<CoreState>,
|
||||
availability_cores_wait: Arc<Mutex<()>>,
|
||||
validation_data: HashMap<ParaId, ValidationData>,
|
||||
session_index_for_child: SessionIndex,
|
||||
session_info: HashMap<SessionIndex, SessionInfo>,
|
||||
@@ -261,6 +326,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn availability_cores(&self) -> Vec<CoreState> {
|
||||
let _ = self.availability_cores_wait.lock().unwrap();
|
||||
self.availability_cores.clone()
|
||||
}
|
||||
|
||||
@@ -916,4 +982,44 @@ mod tests {
|
||||
|
||||
futures::executor::block_on(future::join(subsystem_task, test_task));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_requests_in_parallel_are_working() {
|
||||
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
|
||||
let runtime_api = Arc::new(MockRuntimeApi::default());
|
||||
let relay_parent = [1; 32].into();
|
||||
let spawner = sp_core::testing::TaskExecutor::new();
|
||||
let mutex = runtime_api.availability_cores_wait.clone();
|
||||
|
||||
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
|
||||
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
|
||||
let test_task = async move {
|
||||
// Make all requests block until we release this mutex.
|
||||
let lock = mutex.lock().unwrap();
|
||||
|
||||
let mut receivers = Vec::new();
|
||||
|
||||
for _ in 0..MAX_PARALLEL_REQUESTS * 10 {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
ctx_handle.send(FromOverseer::Communication {
|
||||
msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx))
|
||||
}).await;
|
||||
|
||||
receivers.push(rx);
|
||||
}
|
||||
|
||||
let join = future::join_all(receivers);
|
||||
|
||||
drop(lock);
|
||||
|
||||
join.await
|
||||
.into_iter()
|
||||
.for_each(|r| assert_eq!(r.unwrap().unwrap(), runtime_api.availability_cores));
|
||||
|
||||
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
|
||||
};
|
||||
|
||||
futures::executor::block_on(future::join(subsystem_task, test_task));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user