mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 18:07:58 +00:00
runtime-api subsystem lru cache (#2309)
* Add memory-lru cache to runtime-api * Add cache.rs * Adds MallocSizeOf * Review nits * Add a cached requests metric * More review nits * Some more review nits
This commit is contained in:
@@ -38,6 +38,9 @@ use sp_core::traits::SpawnNamed;
|
||||
|
||||
use futures::{prelude::*, stream::FuturesUnordered, channel::oneshot, select};
|
||||
use std::{sync::Arc, collections::VecDeque, pin::Pin};
|
||||
use cache::{RequestResult, RequestResultCache};
|
||||
|
||||
mod cache;
|
||||
|
||||
const LOG_TARGET: &str = "runtime_api";
|
||||
|
||||
@@ -53,9 +56,14 @@ pub struct RuntimeApiSubsystem<Client> {
|
||||
metrics: Metrics,
|
||||
spawn_handle: Box<dyn SpawnNamed>,
|
||||
/// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed.
|
||||
waiting_requests: VecDeque<(Pin<Box<dyn Future<Output = ()> + Send>>, oneshot::Receiver<()>)>,
|
||||
waiting_requests: VecDeque<(
|
||||
Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
oneshot::Receiver<Option<RequestResult>>,
|
||||
)>,
|
||||
/// All the active runtime api requests that are currently being executed.
|
||||
active_requests: FuturesUnordered<oneshot::Receiver<()>>,
|
||||
active_requests: FuturesUnordered<oneshot::Receiver<Option<RequestResult>>>,
|
||||
/// Requests results cache
|
||||
requests_cache: RequestResultCache,
|
||||
}
|
||||
|
||||
impl<Client> RuntimeApiSubsystem<Client> {
|
||||
@@ -67,6 +75,7 @@ impl<Client> RuntimeApiSubsystem<Client> {
|
||||
spawn_handle: Box::new(spawn_handle),
|
||||
waiting_requests: Default::default(),
|
||||
active_requests: Default::default(),
|
||||
requests_cache: RequestResultCache::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -88,6 +97,102 @@ impl<Client> RuntimeApiSubsystem<Client> where
|
||||
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
|
||||
Client::Api: ParachainHost<Block>,
|
||||
{
|
||||
fn store_cache(&mut self, result: RequestResult) {
|
||||
use RequestResult::*;
|
||||
|
||||
match result {
|
||||
Validators(relay_parent, validators) =>
|
||||
self.requests_cache.cache_validators(relay_parent, validators),
|
||||
ValidatorGroups(relay_parent, groups) =>
|
||||
self.requests_cache.cache_validator_groups(relay_parent, groups),
|
||||
AvailabilityCores(relay_parent, cores) =>
|
||||
self.requests_cache.cache_availability_cores(relay_parent, cores),
|
||||
PersistedValidationData(relay_parent, para_id, assumption, data) =>
|
||||
self.requests_cache.cache_persisted_validation_data((relay_parent, para_id, assumption), data),
|
||||
CheckValidationOutputs(relay_parent, para_id, commitments, b) =>
|
||||
self.requests_cache.cache_check_validation_outputs((relay_parent, para_id, commitments), b),
|
||||
SessionIndexForChild(relay_parent, session_index) =>
|
||||
self.requests_cache.cache_session_index_for_child(relay_parent, session_index),
|
||||
ValidationCode(relay_parent, para_id, assumption, code) =>
|
||||
self.requests_cache.cache_validation_code((relay_parent, para_id, assumption), code),
|
||||
HistoricalValidationCode(relay_parent, para_id, n, code) =>
|
||||
self.requests_cache.cache_historical_validation_code((relay_parent, para_id, n), code),
|
||||
CandidatePendingAvailability(relay_parent, para_id, candidate) =>
|
||||
self.requests_cache.cache_candidate_pending_availability((relay_parent, para_id), candidate),
|
||||
CandidateEvents(relay_parent, events) =>
|
||||
self.requests_cache.cache_candidate_events(relay_parent, events),
|
||||
SessionInfo(relay_parent, session_index, info) =>
|
||||
self.requests_cache.cache_session_info((relay_parent, session_index), info),
|
||||
DmqContents(relay_parent, para_id, messages) =>
|
||||
self.requests_cache.cache_dmq_contents((relay_parent, para_id), messages),
|
||||
InboundHrmpChannelsContents(relay_parent, para_id, contents) =>
|
||||
self.requests_cache.cache_inbound_hrmp_channel_contents((relay_parent, para_id), contents),
|
||||
}
|
||||
}
|
||||
|
||||
fn query_cache(&mut self, relay_parent: Hash, request: Request) -> Option<Request> {
|
||||
macro_rules! query {
|
||||
// Just query by relay parent
|
||||
($cache_api_name:ident (), $sender:expr) => {{
|
||||
let sender = $sender;
|
||||
if let Some(value) = self.requests_cache.$cache_api_name(&relay_parent) {
|
||||
let _ = sender.send(Ok(value.clone()));
|
||||
self.metrics.on_cached_request();
|
||||
None
|
||||
} else {
|
||||
Some(sender)
|
||||
}
|
||||
}};
|
||||
// Query by relay parent + additional parameters
|
||||
($cache_api_name:ident ($($param:expr),+), $sender:expr) => {{
|
||||
let sender = $sender;
|
||||
if let Some(value) = self.requests_cache.$cache_api_name((relay_parent.clone(), $($param.clone()),+)) {
|
||||
self.metrics.on_cached_request();
|
||||
let _ = sender.send(Ok(value.clone()));
|
||||
None
|
||||
} else {
|
||||
Some(sender)
|
||||
}
|
||||
}}
|
||||
}
|
||||
|
||||
match request {
|
||||
Request::Validators(sender) => query!(validators(), sender)
|
||||
.map(|sender| Request::Validators(sender)),
|
||||
Request::ValidatorGroups(sender) => query!(validator_groups(), sender)
|
||||
.map(|sender| Request::ValidatorGroups(sender)),
|
||||
Request::AvailabilityCores(sender) => query!(availability_cores(), sender)
|
||||
.map(|sender| Request::AvailabilityCores(sender)),
|
||||
Request::PersistedValidationData(para, assumption, sender) =>
|
||||
query!(persisted_validation_data(para, assumption), sender)
|
||||
.map(|sender| Request::PersistedValidationData(para, assumption, sender)),
|
||||
Request::CheckValidationOutputs(para, commitments, sender) =>
|
||||
query!(check_validation_outputs(para, commitments), sender)
|
||||
.map(|sender| Request::CheckValidationOutputs(para, commitments, sender)),
|
||||
Request::SessionIndexForChild(sender) =>
|
||||
query!(session_index_for_child(), sender)
|
||||
.map(|sender| Request::SessionIndexForChild(sender)),
|
||||
Request::ValidationCode(para, assumption, sender) =>
|
||||
query!(validation_code(para, assumption), sender)
|
||||
.map(|sender| Request::ValidationCode(para, assumption, sender)),
|
||||
Request::HistoricalValidationCode(para, at, sender) =>
|
||||
query!(historical_validation_code(para, at), sender)
|
||||
.map(|sender| Request::HistoricalValidationCode(para, at, sender)),
|
||||
Request::CandidatePendingAvailability(para, sender) =>
|
||||
query!(candidate_pending_availability(para), sender)
|
||||
.map(|sender| Request::CandidatePendingAvailability(para, sender)),
|
||||
Request::CandidateEvents(sender) => query!(candidate_events(), sender)
|
||||
.map(|sender| Request::CandidateEvents(sender)),
|
||||
Request::SessionInfo(index, sender) => query!(session_info(index), sender)
|
||||
.map(|sender| Request::SessionInfo(index, sender)),
|
||||
Request::DmqContents(id, sender) => query!(dmq_contents(id), sender)
|
||||
.map(|sender| Request::DmqContents(id, sender)),
|
||||
Request::InboundHrmpChannelsContents(id, sender) =>
|
||||
query!(inbound_hrmp_channels_contents(id), sender)
|
||||
.map(|sender| Request::InboundHrmpChannelsContents(id, sender))
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a runtime api request.
|
||||
///
|
||||
/// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered.
|
||||
@@ -96,14 +201,19 @@ impl<Client> RuntimeApiSubsystem<Client> where
|
||||
let metrics = self.metrics.clone();
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
let request = match self.query_cache(relay_parent.clone(), request) {
|
||||
Some(request) => request,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let request = async move {
|
||||
make_runtime_api_request(
|
||||
let result = make_runtime_api_request(
|
||||
client,
|
||||
metrics,
|
||||
relay_parent,
|
||||
request,
|
||||
);
|
||||
let _ = sender.send(());
|
||||
let _ = sender.send(result);
|
||||
}.boxed();
|
||||
|
||||
if self.active_requests.len() >= MAX_PARALLEL_REQUESTS {
|
||||
@@ -130,7 +240,9 @@ impl<Client> RuntimeApiSubsystem<Client> where
|
||||
}
|
||||
|
||||
// If there are active requests, this will always resolve to `Some(_)` when a request is finished.
|
||||
let _ = self.active_requests.next().await;
|
||||
if let Some(Ok(Some(result))) = self.active_requests.next().await {
|
||||
self.store_cache(result);
|
||||
}
|
||||
|
||||
if let Some((req, recv)) = self.waiting_requests.pop_front() {
|
||||
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
|
||||
@@ -170,42 +282,63 @@ fn make_runtime_api_request<Client>(
|
||||
metrics: Metrics,
|
||||
relay_parent: Hash,
|
||||
request: Request,
|
||||
) where
|
||||
) -> Option<RequestResult>
|
||||
where
|
||||
Client: ProvideRuntimeApi<Block>,
|
||||
Client::Api: ParachainHost<Block>,
|
||||
{
|
||||
let _timer = metrics.time_make_runtime_api_request();
|
||||
|
||||
macro_rules! query {
|
||||
($api_name:ident ($($param:expr),*), $sender:expr) => {{
|
||||
($req_variant:ident, $api_name:ident (), $sender:expr) => {{
|
||||
let sender = $sender;
|
||||
let api = client.runtime_api();
|
||||
let res = api.$api_name(&BlockId::Hash(relay_parent), $($param),*)
|
||||
let res = api.$api_name(&BlockId::Hash(relay_parent))
|
||||
.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
|
||||
metrics.on_request(res.is_ok());
|
||||
let _ = sender.send(res);
|
||||
let _ = sender.send(res.clone());
|
||||
|
||||
if let Ok(res) = res {
|
||||
Some(RequestResult::$req_variant(relay_parent, res.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}};
|
||||
($req_variant:ident, $api_name:ident ($($param:expr),+), $sender:expr) => {{
|
||||
let sender = $sender;
|
||||
let api = client.runtime_api();
|
||||
let res = api.$api_name(&BlockId::Hash(relay_parent), $($param.clone()),*)
|
||||
.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
|
||||
metrics.on_request(res.is_ok());
|
||||
let _ = sender.send(res.clone());
|
||||
|
||||
if let Ok(res) = res {
|
||||
Some(RequestResult::$req_variant(relay_parent, $($param),+, res.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}}
|
||||
}
|
||||
|
||||
match request {
|
||||
Request::Validators(sender) => query!(validators(), sender),
|
||||
Request::ValidatorGroups(sender) => query!(validator_groups(), sender),
|
||||
Request::AvailabilityCores(sender) => query!(availability_cores(), sender),
|
||||
Request::Validators(sender) => query!(Validators, validators(), sender),
|
||||
Request::ValidatorGroups(sender) => query!(ValidatorGroups, validator_groups(), sender),
|
||||
Request::AvailabilityCores(sender) => query!(AvailabilityCores, availability_cores(), sender),
|
||||
Request::PersistedValidationData(para, assumption, sender) =>
|
||||
query!(persisted_validation_data(para, assumption), sender),
|
||||
query!(PersistedValidationData, persisted_validation_data(para, assumption), sender),
|
||||
Request::CheckValidationOutputs(para, commitments, sender) =>
|
||||
query!(check_validation_outputs(para, commitments), sender),
|
||||
Request::SessionIndexForChild(sender) => query!(session_index_for_child(), sender),
|
||||
query!(CheckValidationOutputs, check_validation_outputs(para, commitments), sender),
|
||||
Request::SessionIndexForChild(sender) => query!(SessionIndexForChild, session_index_for_child(), sender),
|
||||
Request::ValidationCode(para, assumption, sender) =>
|
||||
query!(validation_code(para, assumption), sender),
|
||||
query!(ValidationCode, validation_code(para, assumption), sender),
|
||||
Request::HistoricalValidationCode(para, at, sender) =>
|
||||
query!(historical_validation_code(para, at), sender),
|
||||
query!(HistoricalValidationCode, historical_validation_code(para, at), sender),
|
||||
Request::CandidatePendingAvailability(para, sender) =>
|
||||
query!(candidate_pending_availability(para), sender),
|
||||
Request::CandidateEvents(sender) => query!(candidate_events(), sender),
|
||||
Request::SessionInfo(index, sender) => query!(session_info(index), sender),
|
||||
Request::DmqContents(id, sender) => query!(dmq_contents(id), sender),
|
||||
Request::InboundHrmpChannelsContents(id, sender) => query!(inbound_hrmp_channels_contents(id), sender),
|
||||
query!(CandidatePendingAvailability, candidate_pending_availability(para), sender),
|
||||
Request::CandidateEvents(sender) => query!(CandidateEvents, candidate_events(), sender),
|
||||
Request::SessionInfo(index, sender) => query!(SessionInfo, session_info(index), sender),
|
||||
Request::DmqContents(id, sender) => query!(DmqContents, dmq_contents(id), sender),
|
||||
Request::InboundHrmpChannelsContents(id, sender) => query!(InboundHrmpChannelsContents, inbound_hrmp_channels_contents(id), sender),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,6 +363,11 @@ impl Metrics {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_cached_request(&self) {
|
||||
self.0.as_ref()
|
||||
.map(|metrics| metrics.chain_api_requests.with_label_values(&["cached"]).inc());
|
||||
}
|
||||
|
||||
/// Provide a timer for `make_runtime_api_request` which observes on drop.
|
||||
fn time_make_runtime_api_request(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.make_runtime_api_request.start_timer())
|
||||
|
||||
Reference in New Issue
Block a user