From 9ce186227ced123e0ce3516858bd9c790c3ea4e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 30 Nov 2020 11:41:47 +0100 Subject: [PATCH] Process runtime api requests in the background (#2035) This pr changes how the runtime api subsystem processes runtime api requests. Instead of answering all of them in the subsystem task and thus, making all requests sequential, we now answer them in a background task. This enables us to serve multiple requests at once. --- polkadot/Cargo.lock | 3 - polkadot/node/core/runtime-api/Cargo.toml | 1 + polkadot/node/core/runtime-api/src/lib.rs | 100 ++++++++++++++-------- polkadot/node/service/Cargo.toml | 3 - polkadot/node/service/src/lib.rs | 1 + 5 files changed, 64 insertions(+), 44 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 1300be292e..7a53fea791 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5450,12 +5450,10 @@ dependencies = [ "futures 0.3.8", "hex-literal", "kusama-runtime", - "lazy_static", "pallet-babe", "pallet-im-online", "pallet-staking", "pallet-transaction-payment-rpc-runtime-api", - "parking_lot 0.11.1", "polkadot-availability-bitfield-distribution", "polkadot-availability-distribution", "polkadot-collator-protocol", @@ -5497,7 +5495,6 @@ dependencies = [ "sc-telemetry", "sc-transaction-pool", "serde", - "slog", "sp-api", "sp-authority-discovery", "sp-block-builder", diff --git a/polkadot/node/core/runtime-api/Cargo.toml b/polkadot/node/core/runtime-api/Cargo.toml index abf88e0634..3fd5a7be01 100644 --- a/polkadot/node/core/runtime-api/Cargo.toml +++ b/polkadot/node/core/runtime-api/Cargo.toml @@ -9,6 +9,7 @@ futures = "0.3.8" tracing = "0.1.22" tracing-futures = "0.2.4" sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-primitives = { path = "../../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 5fb8ac73cb..38b78648f1 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -30,15 +30,14 @@ use polkadot_subsystem::{ }, errors::RuntimeApiError, }; -use polkadot_node_subsystem_util::{ - metrics::{self, prometheus}, -}; +use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost}; -use std::sync::Arc; -use sp_api::{ProvideRuntimeApi}; +use sp_api::ProvideRuntimeApi; +use sp_core::traits::SpawnNamed; use futures::prelude::*; +use std::sync::Arc; const LOG_TARGET: &str = "runtime_api"; @@ -46,12 +45,13 @@ const LOG_TARGET: &str = "runtime_api"; pub struct RuntimeApiSubsystem { client: Arc, metrics: Metrics, + spawn_handle: Box, } impl RuntimeApiSubsystem { /// Create a new Runtime API subsystem wrapping the given client and metrics. - pub fn new(client: Arc, metrics: Metrics) -> Self { - RuntimeApiSubsystem { client, metrics } + pub fn new(client: Arc, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self { + RuntimeApiSubsystem { client, metrics, spawn_handle: Box::new(spawn_handle) } } } @@ -73,7 +73,7 @@ async fn run( mut ctx: impl SubsystemContext, subsystem: RuntimeApiSubsystem, ) -> SubsystemResult<()> where - Client: ProvideRuntimeApi, + Client: ProvideRuntimeApi + Send + Sync + 'static, Client::Api: ParachainHost, { loop { @@ -82,12 +82,19 @@ async fn run( FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}, FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, FromOverseer::Communication { msg } => match msg { - RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request( - &*subsystem.client, - &subsystem.metrics, - relay_parent, - request, - ), + RuntimeApiMessage::Request(relay_parent, request) => { + let client = subsystem.client.clone(); + let metrics = subsystem.metrics.clone(); + + subsystem.spawn_handle.spawn_blocking("polkadot-runtime-api-request", async move { + make_runtime_api_request( + client, + metrics, + relay_parent, + request, + ) + }.boxed()) + }, } } } @@ -95,8 +102,8 @@ async fn run( #[tracing::instrument(level = "trace", skip(client, metrics), fields(subsystem = LOG_TARGET))] fn make_runtime_api_request( - client: &Client, - metrics: &Metrics, + client: Arc, + metrics: Metrics, relay_parent: Hash, request: Request, ) where @@ -347,8 +354,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -370,8 +378,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -393,8 +402,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -414,14 +424,16 @@ mod tests { #[test] fn requests_persisted_validation_data() { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); - let mut runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); - Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default()); + let mut runtime_api = MockRuntimeApi::default(); + runtime_api.validation_data.insert(para_a, Default::default()); + let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -454,14 +466,16 @@ mod tests { #[test] fn requests_full_validation_data() { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); - let mut runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); - Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default()); + let mut runtime_api = MockRuntimeApi::default(); + runtime_api.validation_data.insert(para_a, Default::default()); + let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -499,13 +513,14 @@ mod tests { let para_a = 5.into(); let para_b = 6.into(); let commitments = polkadot_primitives::v1::CandidateCommitments::default(); + let spawner = sp_core::testing::TaskExecutor::new(); runtime_api.validation_outputs_results.insert(para_a, false); runtime_api.validation_outputs_results.insert(para_b, true); let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -552,8 +567,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -577,10 +593,11 @@ mod tests { let session_index = 1; runtime_api.session_info.insert(session_index, Default::default()); let runtime_api = Arc::new(runtime_api); + let spawner = sp_core::testing::TaskExecutor::new(); let relay_parent = [1; 32].into(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -600,14 +617,17 @@ mod tests { #[test] fn requests_validation_code() { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); - let mut runtime_api = Arc::new(MockRuntimeApi::default()); + let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); - Arc::get_mut(&mut runtime_api).unwrap().validation_code.insert(para_a, Default::default()); + let mut runtime_api = MockRuntimeApi::default(); + runtime_api.validation_code.insert(para_a, Default::default()); + let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -640,16 +660,16 @@ mod tests { #[test] fn requests_candidate_pending_availability() { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); - let mut runtime_api = MockRuntimeApi::default(); let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); + let mut runtime_api = MockRuntimeApi::default(); runtime_api.candidate_pending_availability.insert(para_a, Default::default()); - let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -685,8 +705,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -710,6 +731,7 @@ mod tests { let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); let runtime_api = Arc::new({ let mut runtime_api = MockRuntimeApi::default(); @@ -726,7 +748,7 @@ mod tests { runtime_api }); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -766,6 +788,7 @@ mod tests { let para_a = 99.into(); let para_b = 66.into(); let para_c = 33.into(); + let spawner = sp_core::testing::TaskExecutor::new(); let para_b_inbound_channels = [ (para_a, vec![]), @@ -792,7 +815,7 @@ mod tests { runtime_api }); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -830,6 +853,7 @@ mod tests { let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); let runtime_api = Arc::new({ let mut runtime_api = MockRuntimeApi::default(); @@ -848,7 +872,7 @@ mod tests { }); let relay_parent = [1; 32].into(); - let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { { diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index 10f89d66ef..74a55445ef 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -54,12 +54,9 @@ prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https: # External Crates futures = "0.3.8" hex-literal = "0.3.1" -lazy_static = "1.4.0" tracing = "0.1.22" tracing-futures = "0.2.4" -parking_lot = "0.11.1" serde = { version = "1.0.117", features = ["derive"] } -slog = "2.5.2" # Polkadot polkadot-node-core-proposer = { path = "../core/proposer" } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 7c882eb458..e59994cdcc 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -406,6 +406,7 @@ where runtime_api: RuntimeApiSubsystem::new( runtime_client, Metrics::register(registry)?, + spawner.clone(), ), statement_distribution: StatementDistributionSubsystem::new( Metrics::register(registry)?,