diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 3957789453..38400d4999 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -1317,6 +1317,33 @@ dependencies = [ "libc", ] +[[package]] +name = "ethbloom" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22a621dcebea74f2a6f2002d0a885c81ccf6cbdf86760183316a7722b5707ca4" +dependencies = [ + "crunchy", + "fixed-hash", + "impl-rlp", + "impl-serde", + "tiny-keccak", +] + +[[package]] +name = "ethereum-types" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05dc5f0df4915fa6dff7f975a8366ecfaaa8959c74235469495153e7bb1b280e" +dependencies = [ + "ethbloom", + "fixed-hash", + "impl-rlp", + "impl-serde", + "primitive-types", + "uint", +] + [[package]] name = "event-listener" version = "2.5.1" @@ -2350,6 +2377,15 @@ dependencies = [ "parity-scale-codec", ] +[[package]] +name = "impl-rlp" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28220f89297a075ddc7245cd538076ee98b01f2a9c23a53a4f1105d5a322808" +dependencies = [ + "rlp", +] + [[package]] name = "impl-serde" version = "0.3.1" @@ -3438,6 +3474,15 @@ dependencies = [ "parity-util-mem", ] +[[package]] +name = "memory-lru" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "beeb98b3d1ed2c0054bd81b5ba949a0243c3ccad751d45ea898fa8059fa2860a" +dependencies = [ + "lru", +] + [[package]] name = "memory_units" version = "0.3.0" @@ -4548,9 +4593,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f17f15cb05897127bf36a240085a1f0bbef7bce3024849eccf7f93f6171bc27" dependencies = [ "cfg-if 1.0.0", + "ethereum-types", "hashbrown", "impl-trait-for-tuples 0.2.0", "jemallocator", + "lru", "parity-util-mem-derive", "parking_lot 0.11.1", "primitive-types", @@ -5028,6 +5075,7 @@ name = "polkadot-core-primitives" version = "0.7.30" dependencies = [ "parity-scale-codec", + "parity-util-mem", "sp-core", "sp-runtime", "sp-std", @@ -5263,6 +5311,8 @@ name = "polkadot-node-core-runtime-api" version = "0.1.0" dependencies = [ "futures 0.3.12", + "memory-lru", + "parity-util-mem", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", @@ -5429,6 +5479,7 @@ dependencies = [ "futures 0.3.12", "log", "parity-scale-codec", + "parity-util-mem", "parking_lot 0.11.1", "polkadot-core-primitives", "sc-executor", @@ -5471,6 +5522,7 @@ dependencies = [ "frame-system", "hex-literal", "parity-scale-codec", + "parity-util-mem", "polkadot-core-primitives", "polkadot-parachain", "pretty_assertions", @@ -6029,6 +6081,7 @@ checksum = "b3824ae2c5e27160113b9e029a10ec9e3f0237bad8029f69c7724393c9fdefd8" dependencies = [ "fixed-hash", "impl-codec", + "impl-rlp", "impl-serde", "uint", ] @@ -6567,6 +6620,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "rlp" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e54369147e3e7796c9b885c7304db87ca3d09a0a98f72843d532868675bbfba8" +dependencies = [ + "bytes 1.0.1", + "rustc-hex", +] + [[package]] name = "rocksdb" version = "0.15.0" diff --git a/polkadot/core-primitives/Cargo.toml b/polkadot/core-primitives/Cargo.toml index 1f4e599f8e..88d1afcca9 100644 --- a/polkadot/core-primitives/Cargo.toml +++ b/polkadot/core-primitives/Cargo.toml @@ -9,6 +9,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", sp-std = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } parity-scale-codec = { version = "1.3.6", default-features = false, features = [ "derive" ] } +parity-util-mem = { version = "0.8.0", default-features = false, optional = true } [features] default = [ "std" ] @@ -17,4 +18,5 @@ std = [ "sp-runtime/std", "sp-std/std", "parity-scale-codec/std", + "parity-util-mem", ] diff --git a/polkadot/core-primitives/src/lib.rs b/polkadot/core-primitives/src/lib.rs index e96d0b4a5f..d72322cf3f 100644 --- a/polkadot/core-primitives/src/lib.rs +++ b/polkadot/core-primitives/src/lib.rs @@ -22,6 +22,8 @@ use sp_runtime::{generic, MultiSignature, traits::{Verify, BlakeTwo256, IdentifyAccount}}; use parity_scale_codec::{Encode, Decode}; +#[cfg(feature = "std")] +use parity_util_mem::MallocSizeOf; /// The block number type used by Polkadot. /// 32-bits will allow for 136 years of blocks assuming 1 block per second. @@ -57,6 +59,7 @@ pub type Hash = sp_core::H256; /// /// This type makes it easy to enforce that a hash is a candidate hash on the type level. #[derive(Clone, Copy, Encode, Decode, Hash, Eq, PartialEq, Debug, Default)] +#[cfg_attr(feature = "std", derive(MallocSizeOf))] pub struct CandidateHash(pub Hash); #[cfg(feature="std")] @@ -103,6 +106,7 @@ pub type DownwardMessage = sp_std::vec::Vec; /// A wrapped version of `DownwardMessage`. The difference is that it has attached the block number when /// the message was sent. #[derive(Encode, Decode, Clone, sp_runtime::RuntimeDebug, PartialEq)] +#[cfg_attr(feature = "std", derive(MallocSizeOf))] pub struct InboundDownwardMessage { /// The block number at which this messages was put into the downward message queue. pub sent_at: BlockNumber, @@ -112,6 +116,7 @@ pub struct InboundDownwardMessage { /// An HRMP message seen from the perspective of a recipient. #[derive(Encode, Decode, Clone, sp_runtime::RuntimeDebug, PartialEq)] +#[cfg_attr(feature = "std", derive(MallocSizeOf))] pub struct InboundHrmpMessage { /// The block number at which this message was sent. /// Specifically, it is the block number at which the candidate that sends this message was @@ -123,6 +128,7 @@ pub struct InboundHrmpMessage { /// An HRMP message seen from the perspective of a sender. #[derive(Encode, Decode, Clone, sp_runtime::RuntimeDebug, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "std", derive(MallocSizeOf))] pub struct OutboundHrmpMessage { /// The para that will get this message in its downward message queue. pub recipient: Id, diff --git a/polkadot/node/core/runtime-api/Cargo.toml b/polkadot/node/core/runtime-api/Cargo.toml index a7602a5a9e..ca13642769 100644 --- a/polkadot/node/core/runtime-api/Cargo.toml +++ b/polkadot/node/core/runtime-api/Cargo.toml @@ -8,6 +8,9 @@ edition = "2018" futures = "0.3.12" tracing = "0.1.22" tracing-futures = "0.2.4" +memory-lru = "0.1.0" +parity-util-mem = { version = "0.8.0", default-features = false } + sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/core/runtime-api/src/cache.rs b/polkadot/node/core/runtime-api/src/cache.rs new file mode 100644 index 0000000000..ac9e0e844e --- /dev/null +++ b/polkadot/node/core/runtime-api/src/cache.rs @@ -0,0 +1,208 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use polkadot_primitives::v1::{ + BlockNumber, CandidateCommitments, CommittedCandidateReceipt, CandidateEvent, + CoreState, GroupRotationInfo, InboundDownwardMessage, InboundHrmpMessage, Hash, + PersistedValidationData, Id as ParaId, OccupiedCoreAssumption, + SessionIndex, SessionInfo, ValidationCode, ValidatorId, ValidatorIndex, +}; +use parity_util_mem::{MallocSizeOf, MallocSizeOfExt}; + + +use memory_lru::{MemoryLruCache, ResidentSize}; + +use std::collections::btree_map::BTreeMap; + +const VALIDATORS_CACHE_SIZE: usize = 64 * 1024; +const VALIDATOR_GROUPS_CACHE_SIZE: usize = 64 * 1024; +const AVAILABILITY_CORES_CACHE_SIZE: usize = 64 * 1024; +const PERSISTED_VALIDATION_DATA_CACHE_SIZE: usize = 64 * 1024; +const CHECK_VALIDATION_OUTPUTS_CACHE_SIZE: usize = 64 * 1024; +const SESSION_INDEX_FOR_CHILD_CACHE_SIZE: usize = 64 * 1024; +const VALIDATION_CODE_CACHE_SIZE: usize = 10 * 1024 * 1024; +const HISTORICAL_VALIDATION_CODE_CACHE_SIZE: usize = 10 * 1024 * 1024; +const CANDIDATE_PENDING_AVAILABILITY_CACHE_SIZE: usize = 64 * 1024; +const CANDIDATE_EVENTS_CACHE_SIZE: usize = 64 * 1024; +const SESSION_INFO_CACHE_SIZE: usize = 64 * 1024; +const DMQ_CONTENTS_CACHE_SIZE: usize = 64 * 1024; +const INBOUND_HRMP_CHANNELS_CACHE_SIZE: usize = 64 * 1024; + +struct ResidentSizeOf(T); + +impl ResidentSize for ResidentSizeOf { + fn resident_size(&self) -> usize { + std::mem::size_of::() + self.0.malloc_size_of() + } +} + +pub(crate) struct RequestResultCache { + validators: MemoryLruCache>>, + validator_groups: MemoryLruCache>, GroupRotationInfo)>>, + availability_cores: MemoryLruCache>>, + persisted_validation_data: MemoryLruCache<(Hash, ParaId, OccupiedCoreAssumption), ResidentSizeOf>>, + check_validation_outputs: MemoryLruCache<(Hash, ParaId, CandidateCommitments), ResidentSizeOf>, + session_index_for_child: MemoryLruCache>, + validation_code: MemoryLruCache<(Hash, ParaId, OccupiedCoreAssumption), ResidentSizeOf>>, + historical_validation_code: MemoryLruCache<(Hash, ParaId, BlockNumber), ResidentSizeOf>>, + candidate_pending_availability: MemoryLruCache<(Hash, ParaId), ResidentSizeOf>>, + candidate_events: MemoryLruCache>>, + session_info: MemoryLruCache<(Hash, SessionIndex), ResidentSizeOf>>, + dmq_contents: MemoryLruCache<(Hash, ParaId), ResidentSizeOf>>>, + inbound_hrmp_channels_contents: MemoryLruCache<(Hash, ParaId), ResidentSizeOf>>>>, +} + +impl Default for RequestResultCache { + fn default() -> Self { + Self { + validators: MemoryLruCache::new(VALIDATORS_CACHE_SIZE), + validator_groups: MemoryLruCache::new(VALIDATOR_GROUPS_CACHE_SIZE), + availability_cores: MemoryLruCache::new(AVAILABILITY_CORES_CACHE_SIZE), + persisted_validation_data: MemoryLruCache::new(PERSISTED_VALIDATION_DATA_CACHE_SIZE), + check_validation_outputs: MemoryLruCache::new(CHECK_VALIDATION_OUTPUTS_CACHE_SIZE), + session_index_for_child: MemoryLruCache::new(SESSION_INDEX_FOR_CHILD_CACHE_SIZE), + validation_code: MemoryLruCache::new(VALIDATION_CODE_CACHE_SIZE), + historical_validation_code: MemoryLruCache::new(HISTORICAL_VALIDATION_CODE_CACHE_SIZE), + candidate_pending_availability: MemoryLruCache::new(CANDIDATE_PENDING_AVAILABILITY_CACHE_SIZE), + candidate_events: MemoryLruCache::new(CANDIDATE_EVENTS_CACHE_SIZE), + session_info: MemoryLruCache::new(SESSION_INFO_CACHE_SIZE), + dmq_contents: MemoryLruCache::new(DMQ_CONTENTS_CACHE_SIZE), + inbound_hrmp_channels_contents: MemoryLruCache::new(INBOUND_HRMP_CHANNELS_CACHE_SIZE), + } + } +} + +impl RequestResultCache { + pub(crate) fn validators(&mut self, relay_parent: &Hash) -> Option<&Vec> { + self.validators.get(relay_parent).map(|v| &v.0) + } + + pub(crate) fn cache_validators(&mut self, relay_parent: Hash, validators: Vec) { + self.validators.insert(relay_parent, ResidentSizeOf(validators)); + } + + pub(crate) fn validator_groups(&mut self, relay_parent: &Hash) -> Option<&(Vec>, GroupRotationInfo)> { + self.validator_groups.get(relay_parent).map(|v| &v.0) + } + + pub(crate) fn cache_validator_groups(&mut self, relay_parent: Hash, groups: (Vec>, GroupRotationInfo)) { + self.validator_groups.insert(relay_parent, ResidentSizeOf(groups)); + } + + pub(crate) fn availability_cores(&mut self, relay_parent: &Hash) -> Option<&Vec> { + self.availability_cores.get(relay_parent).map(|v| &v.0) + } + + pub(crate) fn cache_availability_cores(&mut self, relay_parent: Hash, cores: Vec) { + self.availability_cores.insert(relay_parent, ResidentSizeOf(cores)); + } + + pub(crate) fn persisted_validation_data(&mut self, key: (Hash, ParaId, OccupiedCoreAssumption)) -> Option<&Option> { + self.persisted_validation_data.get(&key).map(|v| &v.0) + } + + pub(crate) fn cache_persisted_validation_data(&mut self, key: (Hash, ParaId, OccupiedCoreAssumption), data: Option) { + self.persisted_validation_data.insert(key, ResidentSizeOf(data)); + } + + pub(crate) fn check_validation_outputs(&mut self, key: (Hash, ParaId, CandidateCommitments)) -> Option<&bool> { + self.check_validation_outputs.get(&key).map(|v| &v.0) + } + + pub(crate) fn cache_check_validation_outputs(&mut self, key: (Hash, ParaId, CandidateCommitments), value: bool) { + self.check_validation_outputs.insert(key, ResidentSizeOf(value)); + } + + pub(crate) fn session_index_for_child(&mut self, relay_parent: &Hash) -> Option<&SessionIndex> { + self.session_index_for_child.get(relay_parent).map(|v| &v.0) + } + + pub(crate) fn cache_session_index_for_child(&mut self, relay_parent: Hash, index: SessionIndex) { + self.session_index_for_child.insert(relay_parent, ResidentSizeOf(index)); + } + + pub(crate) fn validation_code(&mut self, key: (Hash, ParaId, OccupiedCoreAssumption)) -> Option<&Option> { + self.validation_code.get(&key).map(|v| &v.0) + } + + pub(crate) fn cache_validation_code(&mut self, key: (Hash, ParaId, OccupiedCoreAssumption), value: Option) { + self.validation_code.insert(key, ResidentSizeOf(value)); + } + + pub(crate) fn historical_validation_code(&mut self, key: (Hash, ParaId, BlockNumber)) -> Option<&Option> { + self.historical_validation_code.get(&key).map(|v| &v.0) + } + + pub(crate) fn cache_historical_validation_code(&mut self, key: (Hash, ParaId, BlockNumber), value: Option) { + self.historical_validation_code.insert(key, ResidentSizeOf(value)); + } + + pub(crate) fn candidate_pending_availability(&mut self, key: (Hash, ParaId)) -> Option<&Option> { + self.candidate_pending_availability.get(&key).map(|v| &v.0) + } + + pub(crate) fn cache_candidate_pending_availability(&mut self, key: (Hash, ParaId), value: Option) { + self.candidate_pending_availability.insert(key, ResidentSizeOf(value)); + } + + pub(crate) fn candidate_events(&mut self, relay_parent: &Hash) -> Option<&Vec> { + self.candidate_events.get(relay_parent).map(|v| &v.0) + } + + pub(crate) fn cache_candidate_events(&mut self, relay_parent: Hash, events: Vec) { + self.candidate_events.insert(relay_parent, ResidentSizeOf(events)); + } + + pub(crate) fn session_info(&mut self, key: (Hash, SessionIndex)) -> Option<&Option> { + self.session_info.get(&key).map(|v| &v.0) + } + + pub(crate) fn cache_session_info(&mut self, key: (Hash, SessionIndex), value: Option) { + self.session_info.insert(key, ResidentSizeOf(value)); + } + + pub(crate) fn dmq_contents(&mut self, key: (Hash, ParaId)) -> Option<&Vec>> { + self.dmq_contents.get(&key).map(|v| &v.0) + } + + pub(crate) fn cache_dmq_contents(&mut self, key: (Hash, ParaId), value: Vec>) { + self.dmq_contents.insert(key, ResidentSizeOf(value)); + } + + pub(crate) fn inbound_hrmp_channels_contents(&mut self, key: (Hash, ParaId)) -> Option<&BTreeMap>>> { + self.inbound_hrmp_channels_contents.get(&key).map(|v| &v.0) + } + + pub(crate) fn cache_inbound_hrmp_channel_contents(&mut self, key: (Hash, ParaId), value: BTreeMap>>) { + self.inbound_hrmp_channels_contents.insert(key, ResidentSizeOf(value)); + } +} + +pub(crate) enum RequestResult { + Validators(Hash, Vec), + ValidatorGroups(Hash, (Vec>, GroupRotationInfo)), + AvailabilityCores(Hash, Vec), + PersistedValidationData(Hash, ParaId, OccupiedCoreAssumption, Option), + CheckValidationOutputs(Hash, ParaId, CandidateCommitments, bool), + SessionIndexForChild(Hash, SessionIndex), + ValidationCode(Hash, ParaId, OccupiedCoreAssumption, Option), + HistoricalValidationCode(Hash, ParaId, BlockNumber, Option), + CandidatePendingAvailability(Hash, ParaId, Option), + CandidateEvents(Hash, Vec), + SessionInfo(Hash, SessionIndex, Option), + DmqContents(Hash, ParaId, Vec>), + InboundHrmpChannelsContents(Hash, ParaId, BTreeMap>>), +} diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 24a3a1e101..294b1be919 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -38,6 +38,9 @@ use sp_core::traits::SpawnNamed; use futures::{prelude::*, stream::FuturesUnordered, channel::oneshot, select}; use std::{sync::Arc, collections::VecDeque, pin::Pin}; +use cache::{RequestResult, RequestResultCache}; + +mod cache; const LOG_TARGET: &str = "runtime_api"; @@ -53,9 +56,14 @@ pub struct RuntimeApiSubsystem { metrics: Metrics, spawn_handle: Box, /// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed. - waiting_requests: VecDeque<(Pin + Send>>, oneshot::Receiver<()>)>, + waiting_requests: VecDeque<( + Pin + Send>>, + oneshot::Receiver>, + )>, /// All the active runtime api requests that are currently being executed. - active_requests: FuturesUnordered>, + active_requests: FuturesUnordered>>, + /// Requests results cache + requests_cache: RequestResultCache, } impl RuntimeApiSubsystem { @@ -67,6 +75,7 @@ impl RuntimeApiSubsystem { spawn_handle: Box::new(spawn_handle), waiting_requests: Default::default(), active_requests: Default::default(), + requests_cache: RequestResultCache::default(), } } } @@ -88,6 +97,102 @@ impl RuntimeApiSubsystem where Client: ProvideRuntimeApi + Send + 'static + Sync, Client::Api: ParachainHost, { + fn store_cache(&mut self, result: RequestResult) { + use RequestResult::*; + + match result { + Validators(relay_parent, validators) => + self.requests_cache.cache_validators(relay_parent, validators), + ValidatorGroups(relay_parent, groups) => + self.requests_cache.cache_validator_groups(relay_parent, groups), + AvailabilityCores(relay_parent, cores) => + self.requests_cache.cache_availability_cores(relay_parent, cores), + PersistedValidationData(relay_parent, para_id, assumption, data) => + self.requests_cache.cache_persisted_validation_data((relay_parent, para_id, assumption), data), + CheckValidationOutputs(relay_parent, para_id, commitments, b) => + self.requests_cache.cache_check_validation_outputs((relay_parent, para_id, commitments), b), + SessionIndexForChild(relay_parent, session_index) => + self.requests_cache.cache_session_index_for_child(relay_parent, session_index), + ValidationCode(relay_parent, para_id, assumption, code) => + self.requests_cache.cache_validation_code((relay_parent, para_id, assumption), code), + HistoricalValidationCode(relay_parent, para_id, n, code) => + self.requests_cache.cache_historical_validation_code((relay_parent, para_id, n), code), + CandidatePendingAvailability(relay_parent, para_id, candidate) => + self.requests_cache.cache_candidate_pending_availability((relay_parent, para_id), candidate), + CandidateEvents(relay_parent, events) => + self.requests_cache.cache_candidate_events(relay_parent, events), + SessionInfo(relay_parent, session_index, info) => + self.requests_cache.cache_session_info((relay_parent, session_index), info), + DmqContents(relay_parent, para_id, messages) => + self.requests_cache.cache_dmq_contents((relay_parent, para_id), messages), + InboundHrmpChannelsContents(relay_parent, para_id, contents) => + self.requests_cache.cache_inbound_hrmp_channel_contents((relay_parent, para_id), contents), + } + } + + fn query_cache(&mut self, relay_parent: Hash, request: Request) -> Option { + macro_rules! query { + // Just query by relay parent + ($cache_api_name:ident (), $sender:expr) => {{ + let sender = $sender; + if let Some(value) = self.requests_cache.$cache_api_name(&relay_parent) { + let _ = sender.send(Ok(value.clone())); + self.metrics.on_cached_request(); + None + } else { + Some(sender) + } + }}; + // Query by relay parent + additional parameters + ($cache_api_name:ident ($($param:expr),+), $sender:expr) => {{ + let sender = $sender; + if let Some(value) = self.requests_cache.$cache_api_name((relay_parent.clone(), $($param.clone()),+)) { + self.metrics.on_cached_request(); + let _ = sender.send(Ok(value.clone())); + None + } else { + Some(sender) + } + }} + } + + match request { + Request::Validators(sender) => query!(validators(), sender) + .map(|sender| Request::Validators(sender)), + Request::ValidatorGroups(sender) => query!(validator_groups(), sender) + .map(|sender| Request::ValidatorGroups(sender)), + Request::AvailabilityCores(sender) => query!(availability_cores(), sender) + .map(|sender| Request::AvailabilityCores(sender)), + Request::PersistedValidationData(para, assumption, sender) => + query!(persisted_validation_data(para, assumption), sender) + .map(|sender| Request::PersistedValidationData(para, assumption, sender)), + Request::CheckValidationOutputs(para, commitments, sender) => + query!(check_validation_outputs(para, commitments), sender) + .map(|sender| Request::CheckValidationOutputs(para, commitments, sender)), + Request::SessionIndexForChild(sender) => + query!(session_index_for_child(), sender) + .map(|sender| Request::SessionIndexForChild(sender)), + Request::ValidationCode(para, assumption, sender) => + query!(validation_code(para, assumption), sender) + .map(|sender| Request::ValidationCode(para, assumption, sender)), + Request::HistoricalValidationCode(para, at, sender) => + query!(historical_validation_code(para, at), sender) + .map(|sender| Request::HistoricalValidationCode(para, at, sender)), + Request::CandidatePendingAvailability(para, sender) => + query!(candidate_pending_availability(para), sender) + .map(|sender| Request::CandidatePendingAvailability(para, sender)), + Request::CandidateEvents(sender) => query!(candidate_events(), sender) + .map(|sender| Request::CandidateEvents(sender)), + Request::SessionInfo(index, sender) => query!(session_info(index), sender) + .map(|sender| Request::SessionInfo(index, sender)), + Request::DmqContents(id, sender) => query!(dmq_contents(id), sender) + .map(|sender| Request::DmqContents(id, sender)), + Request::InboundHrmpChannelsContents(id, sender) => + query!(inbound_hrmp_channels_contents(id), sender) + .map(|sender| Request::InboundHrmpChannelsContents(id, sender)) + } + } + /// Spawn a runtime api request. /// /// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered. @@ -96,14 +201,19 @@ impl RuntimeApiSubsystem where let metrics = self.metrics.clone(); let (sender, receiver) = oneshot::channel(); + let request = match self.query_cache(relay_parent.clone(), request) { + Some(request) => request, + None => return, + }; + let request = async move { - make_runtime_api_request( + let result = make_runtime_api_request( client, metrics, relay_parent, request, ); - let _ = sender.send(()); + let _ = sender.send(result); }.boxed(); if self.active_requests.len() >= MAX_PARALLEL_REQUESTS { @@ -130,7 +240,9 @@ impl RuntimeApiSubsystem where } // If there are active requests, this will always resolve to `Some(_)` when a request is finished. - let _ = self.active_requests.next().await; + if let Some(Ok(Some(result))) = self.active_requests.next().await { + self.store_cache(result); + } if let Some((req, recv)) = self.waiting_requests.pop_front() { self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req); @@ -170,42 +282,63 @@ fn make_runtime_api_request( metrics: Metrics, relay_parent: Hash, request: Request, -) where +) -> Option +where Client: ProvideRuntimeApi, Client::Api: ParachainHost, { let _timer = metrics.time_make_runtime_api_request(); macro_rules! query { - ($api_name:ident ($($param:expr),*), $sender:expr) => {{ + ($req_variant:ident, $api_name:ident (), $sender:expr) => {{ let sender = $sender; let api = client.runtime_api(); - let res = api.$api_name(&BlockId::Hash(relay_parent), $($param),*) + let res = api.$api_name(&BlockId::Hash(relay_parent)) .map_err(|e| RuntimeApiError::from(format!("{:?}", e))); metrics.on_request(res.is_ok()); - let _ = sender.send(res); + let _ = sender.send(res.clone()); + + if let Ok(res) = res { + Some(RequestResult::$req_variant(relay_parent, res.clone())) + } else { + None + } + }}; + ($req_variant:ident, $api_name:ident ($($param:expr),+), $sender:expr) => {{ + let sender = $sender; + let api = client.runtime_api(); + let res = api.$api_name(&BlockId::Hash(relay_parent), $($param.clone()),*) + .map_err(|e| RuntimeApiError::from(format!("{:?}", e))); + metrics.on_request(res.is_ok()); + let _ = sender.send(res.clone()); + + if let Ok(res) = res { + Some(RequestResult::$req_variant(relay_parent, $($param),+, res.clone())) + } else { + None + } }} } match request { - Request::Validators(sender) => query!(validators(), sender), - Request::ValidatorGroups(sender) => query!(validator_groups(), sender), - Request::AvailabilityCores(sender) => query!(availability_cores(), sender), + Request::Validators(sender) => query!(Validators, validators(), sender), + Request::ValidatorGroups(sender) => query!(ValidatorGroups, validator_groups(), sender), + Request::AvailabilityCores(sender) => query!(AvailabilityCores, availability_cores(), sender), Request::PersistedValidationData(para, assumption, sender) => - query!(persisted_validation_data(para, assumption), sender), + query!(PersistedValidationData, persisted_validation_data(para, assumption), sender), Request::CheckValidationOutputs(para, commitments, sender) => - query!(check_validation_outputs(para, commitments), sender), - Request::SessionIndexForChild(sender) => query!(session_index_for_child(), sender), + query!(CheckValidationOutputs, check_validation_outputs(para, commitments), sender), + Request::SessionIndexForChild(sender) => query!(SessionIndexForChild, session_index_for_child(), sender), Request::ValidationCode(para, assumption, sender) => - query!(validation_code(para, assumption), sender), + query!(ValidationCode, validation_code(para, assumption), sender), Request::HistoricalValidationCode(para, at, sender) => - query!(historical_validation_code(para, at), sender), + query!(HistoricalValidationCode, historical_validation_code(para, at), sender), Request::CandidatePendingAvailability(para, sender) => - query!(candidate_pending_availability(para), sender), - Request::CandidateEvents(sender) => query!(candidate_events(), sender), - Request::SessionInfo(index, sender) => query!(session_info(index), sender), - Request::DmqContents(id, sender) => query!(dmq_contents(id), sender), - Request::InboundHrmpChannelsContents(id, sender) => query!(inbound_hrmp_channels_contents(id), sender), + query!(CandidatePendingAvailability, candidate_pending_availability(para), sender), + Request::CandidateEvents(sender) => query!(CandidateEvents, candidate_events(), sender), + Request::SessionInfo(index, sender) => query!(SessionInfo, session_info(index), sender), + Request::DmqContents(id, sender) => query!(DmqContents, dmq_contents(id), sender), + Request::InboundHrmpChannelsContents(id, sender) => query!(InboundHrmpChannelsContents, inbound_hrmp_channels_contents(id), sender), } } @@ -230,6 +363,11 @@ impl Metrics { } } + fn on_cached_request(&self) { + self.0.as_ref() + .map(|metrics| metrics.chain_api_requests.with_label_values(&["cached"]).inc()); + } + /// Provide a timer for `make_runtime_api_request` which observes on drop. fn time_make_runtime_api_request(&self) -> Option { self.0.as_ref().map(|metrics| metrics.make_runtime_api_request.start_timer()) diff --git a/polkadot/parachain/Cargo.toml b/polkadot/parachain/Cargo.toml index a1619109ef..901fa2ec01 100644 --- a/polkadot/parachain/Cargo.toml +++ b/polkadot/parachain/Cargo.toml @@ -10,6 +10,7 @@ edition = "2018" # this crate for WASM. This is critical to avoid forcing all parachain WASM into implementing # various unnecessary Substrate-specific endpoints. parity-scale-codec = { version = "1.3.6", default-features = false, features = [ "derive" ] } +parity-util-mem = { version = "0.8.0", optional = true } sp-std = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } @@ -44,6 +45,7 @@ std = [ "sp-core/std", "parking_lot", "log", + "parity-util-mem", "sp-externalities", "sc-executor", "sp-io", diff --git a/polkadot/parachain/src/primitives.rs b/polkadot/parachain/src/primitives.rs index 991bc260fb..0304d59d61 100644 --- a/polkadot/parachain/src/primitives.rs +++ b/polkadot/parachain/src/primitives.rs @@ -28,6 +28,9 @@ use serde::{Serialize, Deserialize}; #[cfg(feature = "std")] use sp_core::bytes; +#[cfg(feature = "std")] +use parity_util_mem::MallocSizeOf; + use polkadot_core_primitives::{Hash, OutboundHrmpMessage}; /// Block number type used by the relay chain. @@ -35,7 +38,7 @@ pub use polkadot_core_primitives::BlockNumber as RelayChainBlockNumber; /// Parachain head data included in the chain. #[derive(PartialEq, Eq, Clone, PartialOrd, Ord, Encode, Decode, RuntimeDebug, derive_more::From)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Default, Hash))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Default, Hash, MallocSizeOf))] pub struct HeadData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); #[cfg(feature = "std")] @@ -49,14 +52,14 @@ impl HeadData { /// Parachain validation code. #[derive(Default, PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug, derive_more::From)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Hash))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Hash, MallocSizeOf))] pub struct ValidationCode(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); /// Parachain block data. /// /// Contains everything required to validate para-block, may contain block and witness data. #[derive(PartialEq, Eq, Clone, Encode, Decode, derive_more::From)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, MallocSizeOf))] pub struct BlockData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); /// Unique identifier of a parachain. @@ -64,7 +67,9 @@ pub struct BlockData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec usize { + 0 + } + fn constant_size() -> Option { + Some(0) + } +} + /// A Parachain collator keypair. #[cfg(feature = "std")] pub type CollatorPair = collator_app::Pair; @@ -67,6 +79,16 @@ pub type CollatorPair = collator_app::Pair; /// Signature on candidate's block data by a collator. pub type CollatorSignature = collator_app::Signature; +#[cfg(feature = "std")] +impl MallocSizeOf for CollatorSignature { + fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { + 0 + } + fn constant_size() -> Option { + Some(0) + } +} + /// The key type ID for a parachain validator key. pub const PARACHAIN_KEY_TYPE_ID: KeyTypeId = KeyTypeId(*b"para"); @@ -81,6 +103,16 @@ mod validator_app { /// so we define it to be the same type as `SessionKey`. In the future it may have different crypto. pub type ValidatorId = validator_app::Public; +#[cfg(feature = "std")] +impl MallocSizeOf for ValidatorId { + fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { + 0 + } + fn constant_size() -> Option { + Some(0) + } +} + /// Index of the validator is used as a lightweight replacement of the `ValidatorId` when appropriate. pub type ValidatorIndex = u32; @@ -95,6 +127,16 @@ application_crypto::with_pair! { /// so we define it to be the same type as `SessionKey`. In the future it may have different crypto. pub type ValidatorSignature = validator_app::Signature; +#[cfg(feature = "std")] +impl MallocSizeOf for ValidatorSignature { + fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { + 0 + } + fn constant_size() -> Option { + Some(0) + } +} + /// Retriability for a given active para. #[derive(Clone, Eq, PartialEq, Encode, Decode)] #[cfg_attr(feature = "std", derive(Debug))] diff --git a/polkadot/primitives/src/v1.rs b/polkadot/primitives/src/v1.rs index f2cbaa7d35..1208e88092 100644 --- a/polkadot/primitives/src/v1.rs +++ b/polkadot/primitives/src/v1.rs @@ -48,6 +48,9 @@ pub use crate::v0::{ CompactStatement, SignedStatement, ErasureChunk, EncodeAs, }; +#[cfg(feature = "std")] +use parity_util_mem::{MallocSizeOf, MallocSizeOfOps}; + // More exports from v0 for std. #[cfg(feature = "std")] pub use crate::v0::{ValidatorPair, CollatorPair}; @@ -150,6 +153,16 @@ mod assigment_app { /// to approve included parachain candidates. pub type AssignmentId = assigment_app::Public; +#[cfg(feature = "std")] +impl MallocSizeOf for AssignmentId { + fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { + 0 + } + fn constant_size() -> Option { + Some(0) + } +} + /// The index of the candidate in the list of candidates fully included as-of the block. pub type CandidateIndex = u32; @@ -195,7 +208,7 @@ fn check_collator_signature>( /// A unique descriptor of the candidate receipt. #[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Default, Hash))] +#[cfg_attr(feature = "std", derive(Debug, Default, Hash, MallocSizeOf))] pub struct CandidateDescriptor { /// The ID of the para this is a candidate for. pub para_id: Id, @@ -234,7 +247,7 @@ impl> CandidateDescriptor { /// A candidate-receipt. #[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Default))] +#[cfg_attr(feature = "std", derive(Debug, Default, MallocSizeOf))] pub struct CandidateReceipt { /// The descriptor of the candidate. pub descriptor: CandidateDescriptor, @@ -269,7 +282,7 @@ pub struct FullCandidateReceipt { /// A candidate-receipt with commitments directly included. #[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Default, Hash))] +#[cfg_attr(feature = "std", derive(Debug, Default, Hash, MallocSizeOf))] pub struct CommittedCandidateReceipt { /// The descriptor of the candidate. pub descriptor: CandidateDescriptor, @@ -342,7 +355,7 @@ impl Ord for CommittedCandidateReceipt { /// The `PersistedValidationData` should be relatively lightweight primarly because it is constructed /// during inclusion for each candidate and therefore lies on the critical path of inclusion. #[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Default))] +#[cfg_attr(feature = "std", derive(Debug, Default, MallocSizeOf))] pub struct PersistedValidationData { /// The parent head-data. pub parent_head: HeadData, @@ -372,7 +385,7 @@ impl PersistedValidationData { /// Commitments made in a `CandidateReceipt`. Many of these are outputs of validation. #[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Default, Hash))] +#[cfg_attr(feature = "std", derive(Debug, Default, Hash, MallocSizeOf))] pub struct CandidateCommitments { /// Messages destined to be interpreted by the Relay chain itself. pub upward_messages: Vec, @@ -519,7 +532,7 @@ impl From for CoreIndex { /// The unique (during session) index of a validator group. #[derive(Encode, Decode, Default, Clone, Copy, Debug)] -#[cfg_attr(feature = "std", derive(Eq, Hash, PartialEq))] +#[cfg_attr(feature = "std", derive(Eq, Hash, PartialEq, MallocSizeOf))] pub struct GroupIndex(pub u32); impl From for GroupIndex { @@ -565,7 +578,7 @@ pub struct AvailableData { /// A helper data-type for tracking validator-group rotations. #[derive(Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(PartialEq, Debug))] +#[cfg_attr(feature = "std", derive(PartialEq, Debug, MallocSizeOf))] pub struct GroupRotationInfo { /// The block number where the session started. pub session_start_block: N, @@ -614,7 +627,7 @@ impl GroupRotationInfo { /// Information about a core which is currently occupied. #[derive(Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, PartialEq))] +#[cfg_attr(feature = "std", derive(Debug, PartialEq, MallocSizeOf))] pub struct OccupiedCore { // NOTE: this has no ParaId as it can be deduced from the candidate descriptor. @@ -632,6 +645,7 @@ pub struct OccupiedCore { /// A bitfield with 1 bit for each validator in the set. `1` bits mean that the corresponding /// validators has attested to availability on-chain. A 2/3+ majority of `1` bits means that /// this will be available. + #[cfg_attr(feature = "std", ignore_malloc_size_of = "outside type")] pub availability: BitVec, /// The group assigned to distribute availability pieces of this candidate. pub group_responsible: GroupIndex, @@ -650,7 +664,7 @@ impl OccupiedCore { /// Information about a core which is currently occupied. #[derive(Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, PartialEq, Default))] +#[cfg_attr(feature = "std", derive(Debug, PartialEq, Default, MallocSizeOf))] pub struct ScheduledCore { /// The ID of a para scheduled. pub para_id: Id, @@ -660,7 +674,7 @@ pub struct ScheduledCore { /// The state of a particular availability core. #[derive(Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, PartialEq))] +#[cfg_attr(feature = "std", derive(Debug, PartialEq, MallocSizeOf))] pub enum CoreState { /// The core is currently occupied. #[codec(index = "0")] @@ -696,7 +710,7 @@ impl CoreState { /// An assumption being made about the state of an occupied core. #[derive(Clone, Copy, Encode, Decode)] -#[cfg_attr(feature = "std", derive(PartialEq, Debug))] +#[cfg_attr(feature = "std", derive(PartialEq, Eq, Hash, Debug))] pub enum OccupiedCoreAssumption { /// The candidate occupying the core was made available and included to free the core. #[codec(index = "0")] @@ -711,7 +725,7 @@ pub enum OccupiedCoreAssumption { /// An even concerning a candidate. #[derive(Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(PartialEq, Debug))] +#[cfg_attr(feature = "std", derive(PartialEq, Debug, MallocSizeOf))] pub enum CandidateEvent { /// This candidate receipt was backed in the most recent block. #[codec(index = "0")] @@ -726,11 +740,12 @@ pub enum CandidateEvent { /// Information about validator sets of a session. #[derive(Clone, Encode, Decode, RuntimeDebug)] -#[cfg_attr(feature = "std", derive(PartialEq, Default))] +#[cfg_attr(feature = "std", derive(PartialEq, Default, MallocSizeOf))] pub struct SessionInfo { /// Validators in canonical ordering. pub validators: Vec, /// Validators' authority discovery keys for the session in canonical ordering. + #[cfg_attr(feature = "std", ignore_malloc_size_of = "outside type")] pub discovery_keys: Vec, /// The assignment keys for validators. pub assignment_keys: Vec,