diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 1300be292e..7a53fea791 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5450,12 +5450,10 @@ dependencies = [ "futures 0.3.8", "hex-literal", "kusama-runtime", - "lazy_static", "pallet-babe", "pallet-im-online", "pallet-staking", "pallet-transaction-payment-rpc-runtime-api", - "parking_lot 0.11.1", "polkadot-availability-bitfield-distribution", "polkadot-availability-distribution", "polkadot-collator-protocol", @@ -5497,7 +5495,6 @@ dependencies = [ "sc-telemetry", "sc-transaction-pool", "serde", - "slog", "sp-api", "sp-authority-discovery", "sp-block-builder", diff --git a/polkadot/node/core/runtime-api/Cargo.toml b/polkadot/node/core/runtime-api/Cargo.toml index abf88e0634..3fd5a7be01 100644 --- a/polkadot/node/core/runtime-api/Cargo.toml +++ b/polkadot/node/core/runtime-api/Cargo.toml @@ -9,6 +9,7 @@ futures = "0.3.8" tracing = "0.1.22" tracing-futures = "0.2.4" sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-primitives = { path = "../../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 5fb8ac73cb..38b78648f1 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -30,15 +30,14 @@ use polkadot_subsystem::{ }, errors::RuntimeApiError, }; -use polkadot_node_subsystem_util::{ - metrics::{self, prometheus}, -}; +use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost}; -use std::sync::Arc; -use sp_api::{ProvideRuntimeApi}; +use sp_api::ProvideRuntimeApi; +use sp_core::traits::SpawnNamed; use futures::prelude::*; +use std::sync::Arc; const LOG_TARGET: &str = "runtime_api"; @@ -46,12 +45,13 @@ const LOG_TARGET: &str = "runtime_api"; pub struct RuntimeApiSubsystem { client: Arc, metrics: Metrics, + spawn_handle: Box, } impl RuntimeApiSubsystem { /// Create a new Runtime API subsystem wrapping the given client and metrics. - pub fn new(client: Arc, metrics: Metrics) -> Self { - RuntimeApiSubsystem { client, metrics } + pub fn new(client: Arc, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self { + RuntimeApiSubsystem { client, metrics, spawn_handle: Box::new(spawn_handle) } } } @@ -73,7 +73,7 @@ async fn run( mut ctx: impl SubsystemContext, subsystem: RuntimeApiSubsystem, ) -> SubsystemResult<()> where - Client: ProvideRuntimeApi, + Client: ProvideRuntimeApi + Send + Sync + 'static, Client::Api: ParachainHost, { loop { @@ -82,12 +82,19 @@ async fn run( FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}, FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, FromOverseer::Communication { msg } => match msg { - RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request( - &*subsystem.client, - &subsystem.metrics, - relay_parent, - request, - ), + RuntimeApiMessage::Request(relay_parent, request) => { + let client = subsystem.client.clone(); + let metrics = subsystem.metrics.clone(); + + subsystem.spawn_handle.spawn_blocking("polkadot-runtime-api-request", async move { + make_runtime_api_request( + client, + metrics, + relay_parent, + request, + ) + }.boxed()) + }, } } } @@ -95,8 +102,8 @@ async fn run( #[tracing::instrument(level = "trace", skip(client, metrics), fields(subsystem = LOG_TARGET))] fn make_runtime_api_request( - client: &Client, - metrics: &Metrics, + client: Arc, + metrics: Metrics, relay_parent: Hash, request: Request, ) where @@ -347,8 +354,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -370,8 +378,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -393,8 +402,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -414,14 +424,16 @@ mod tests { #[test] fn requests_persisted_validation_data() { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); - let mut runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); - Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default()); + let mut runtime_api = MockRuntimeApi::default(); + runtime_api.validation_data.insert(para_a, Default::default()); + let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -454,14 +466,16 @@ mod tests { #[test] fn requests_full_validation_data() { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); - let mut runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); - Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default()); + let mut runtime_api = MockRuntimeApi::default(); + runtime_api.validation_data.insert(para_a, Default::default()); + let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -499,13 +513,14 @@ mod tests { let para_a = 5.into(); let para_b = 6.into(); let commitments = polkadot_primitives::v1::CandidateCommitments::default(); + let spawner = sp_core::testing::TaskExecutor::new(); runtime_api.validation_outputs_results.insert(para_a, false); runtime_api.validation_outputs_results.insert(para_b, true); let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -552,8 +567,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -577,10 +593,11 @@ mod tests { let session_index = 1; runtime_api.session_info.insert(session_index, Default::default()); let runtime_api = Arc::new(runtime_api); + let spawner = sp_core::testing::TaskExecutor::new(); let relay_parent = [1; 32].into(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -600,14 +617,17 @@ mod tests { #[test] fn requests_validation_code() { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); - let mut runtime_api = Arc::new(MockRuntimeApi::default()); + let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); - Arc::get_mut(&mut runtime_api).unwrap().validation_code.insert(para_a, Default::default()); + let mut runtime_api = MockRuntimeApi::default(); + runtime_api.validation_code.insert(para_a, Default::default()); + let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -640,16 +660,16 @@ mod tests { #[test] fn requests_candidate_pending_availability() { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); - let mut runtime_api = MockRuntimeApi::default(); let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); + let mut runtime_api = MockRuntimeApi::default(); runtime_api.candidate_pending_availability.insert(para_a, Default::default()); - let runtime_api = Arc::new(runtime_api); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -685,8 +705,9 @@ mod tests { let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); let runtime_api = Arc::new(MockRuntimeApi::default()); let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -710,6 +731,7 @@ mod tests { let relay_parent = [1; 32].into(); let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); let runtime_api = Arc::new({ let mut runtime_api = MockRuntimeApi::default(); @@ -726,7 +748,7 @@ mod tests { runtime_api }); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -766,6 +788,7 @@ mod tests { let para_a = 99.into(); let para_b = 66.into(); let para_c = 33.into(); + let spawner = sp_core::testing::TaskExecutor::new(); let para_b_inbound_channels = [ (para_a, vec![]), @@ -792,7 +815,7 @@ mod tests { runtime_api }); - let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -830,6 +853,7 @@ mod tests { let para_a = 5.into(); let para_b = 6.into(); + let spawner = sp_core::testing::TaskExecutor::new(); let runtime_api = Arc::new({ let mut runtime_api = MockRuntimeApi::default(); @@ -848,7 +872,7 @@ mod tests { }); let relay_parent = [1; 32].into(); - let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None)); + let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner); let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { { diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index 10f89d66ef..74a55445ef 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -54,12 +54,9 @@ prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https: # External Crates futures = "0.3.8" hex-literal = "0.3.1" -lazy_static = "1.4.0" tracing = "0.1.22" tracing-futures = "0.2.4" -parking_lot = "0.11.1" serde = { version = "1.0.117", features = ["derive"] } -slog = "2.5.2" # Polkadot polkadot-node-core-proposer = { path = "../core/proposer" } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 7c882eb458..e59994cdcc 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -406,6 +406,7 @@ where runtime_api: RuntimeApiSubsystem::new( runtime_client, Metrics::register(registry)?, + spawner.clone(), ), statement_distribution: StatementDistributionSubsystem::new( Metrics::register(registry)?,