Process runtime api requests in the background (#2035)

This pr changes how the runtime api subsystem processes runtime api
requests. Instead of answering all of them in the subsystem task and
thus, making all requests sequential, we now answer them in a background
task. This enables us to serve multiple requests at once.
This commit is contained in:
Bastian Köcher
2020-11-30 11:41:47 +01:00
committed by GitHub
parent f2606dbd4b
commit 9ce186227c
5 changed files with 64 additions and 44 deletions
@@ -9,6 +9,7 @@ futures = "0.3.8"
tracing = "0.1.22"
tracing-futures = "0.2.4"
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
+62 -38
View File
@@ -30,15 +30,14 @@ use polkadot_subsystem::{
},
errors::RuntimeApiError,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
use std::sync::Arc;
use sp_api::{ProvideRuntimeApi};
use sp_api::ProvideRuntimeApi;
use sp_core::traits::SpawnNamed;
use futures::prelude::*;
use std::sync::Arc;
const LOG_TARGET: &str = "runtime_api";
@@ -46,12 +45,13 @@ const LOG_TARGET: &str = "runtime_api";
pub struct RuntimeApiSubsystem<Client> {
client: Arc<Client>,
metrics: Metrics,
spawn_handle: Box<dyn SpawnNamed>,
}
impl<Client> RuntimeApiSubsystem<Client> {
/// Create a new Runtime API subsystem wrapping the given client and metrics.
pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
RuntimeApiSubsystem { client, metrics }
pub fn new(client: Arc<Client>, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self {
RuntimeApiSubsystem { client, metrics, spawn_handle: Box::new(spawn_handle) }
}
}
@@ -73,7 +73,7 @@ async fn run<Client>(
mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
subsystem: RuntimeApiSubsystem<Client>,
) -> SubsystemResult<()> where
Client: ProvideRuntimeApi<Block>,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block>,
{
loop {
@@ -82,12 +82,19 @@ async fn run<Client>(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request(
&*subsystem.client,
&subsystem.metrics,
relay_parent,
request,
),
RuntimeApiMessage::Request(relay_parent, request) => {
let client = subsystem.client.clone();
let metrics = subsystem.metrics.clone();
subsystem.spawn_handle.spawn_blocking("polkadot-runtime-api-request", async move {
make_runtime_api_request(
client,
metrics,
relay_parent,
request,
)
}.boxed())
},
}
}
}
@@ -95,8 +102,8 @@ async fn run<Client>(
#[tracing::instrument(level = "trace", skip(client, metrics), fields(subsystem = LOG_TARGET))]
fn make_runtime_api_request<Client>(
client: &Client,
metrics: &Metrics,
client: Arc<Client>,
metrics: Metrics,
relay_parent: Hash,
request: Request,
) where
@@ -347,8 +354,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -370,8 +378,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -393,8 +402,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -414,14 +424,16 @@ mod tests {
#[test]
fn requests_persisted_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_data.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -454,14 +466,16 @@ mod tests {
#[test]
fn requests_full_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_data.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -499,13 +513,14 @@ mod tests {
let para_a = 5.into();
let para_b = 6.into();
let commitments = polkadot_primitives::v1::CandidateCommitments::default();
let spawner = sp_core::testing::TaskExecutor::new();
runtime_api.validation_outputs_results.insert(para_a, false);
runtime_api.validation_outputs_results.insert(para_b, true);
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -552,8 +567,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -577,10 +593,11 @@ mod tests {
let session_index = 1;
runtime_api.session_info.insert(session_index, Default::default());
let runtime_api = Arc::new(runtime_api);
let spawner = sp_core::testing::TaskExecutor::new();
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -600,14 +617,17 @@ mod tests {
#[test]
fn requests_validation_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
Arc::get_mut(&mut runtime_api).unwrap().validation_code.insert(para_a, Default::default());
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_code.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -640,16 +660,16 @@ mod tests {
#[test]
fn requests_candidate_pending_availability() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let mut runtime_api = MockRuntimeApi::default();
runtime_api.candidate_pending_availability.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -685,8 +705,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -710,6 +731,7 @@ mod tests {
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
@@ -726,7 +748,7 @@ mod tests {
runtime_api
});
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -766,6 +788,7 @@ mod tests {
let para_a = 99.into();
let para_b = 66.into();
let para_c = 33.into();
let spawner = sp_core::testing::TaskExecutor::new();
let para_b_inbound_channels = [
(para_a, vec![]),
@@ -792,7 +815,7 @@ mod tests {
runtime_api
});
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -830,6 +853,7 @@ mod tests {
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
@@ -848,7 +872,7 @@ mod tests {
});
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
{