From 1508024a47096cbc6fafa1921a080f5761bc1dbb Mon Sep 17 00:00:00 2001
From: Robert Klotzner
Date: Thu, 6 May 2021 16:15:23 +0200
Subject: [PATCH] Some overdue cleanup (#2989)

* Cleanup obsolete code.

* Move session cache to requester.
---
 .../availability-distribution/src/error.rs    |  37 +-----
 .../availability-distribution/src/lib.rs      |  15 +--
 .../src/requester/fetch_task/mod.rs           |   2 +-
 .../src/requester/mod.rs                      |  26 ++--
 .../src/{ => requester}/session_cache.rs      | 115 ++++--------------
 5 files changed, 47 insertions(+), 148 deletions(-)
 rename polkadot/node/network/availability-distribution/src/{ => requester}/session_cache.rs (67%)

diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs
index 6b6f62ae98..893bb6bc46 100644
--- a/polkadot/node/network/availability-distribution/src/error.rs
+++ b/polkadot/node/network/availability-distribution/src/error.rs
@@ -18,13 +18,12 @@
 //! Error handling related code and Error/Result definitions.
 
 use polkadot_node_network_protocol::request_response::request::RequestError;
-use polkadot_primitives::v1::SessionIndex;
 use thiserror::Error;
 
 use futures::channel::oneshot;
 
-use polkadot_node_subsystem_util::{Fault, Error as UtilError, runtime, unwrap_non_fatal};
-use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
+use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal};
+use polkadot_subsystem::SubsystemError;
 
 use crate::LOG_TARGET;
 
@@ -57,10 +56,6 @@ pub enum Fatal {
 	#[error("Spawning subsystem task failed")]
 	SpawnTask(#[source] SubsystemError),
 
-	/// Runtime API subsystem is down, which means we're shutting down.
-	#[error("Runtime request canceled")]
-	RuntimeRequestCanceled(oneshot::Canceled),
-
 	/// Requester stream exhausted.
 	#[error("Erasure chunk requester stream exhausted")]
 	RequesterExhausted,
@@ -88,24 +83,10 @@ pub enum NonFatal {
 	#[error("Session is not cached.")]
 	NoSuchCachedSession,
 
-	/// We tried reporting bad validators, although we are not a validator ourselves.
-	#[error("Not a validator.")]
-	NotAValidator,
-
 	/// Sending request response failed (Can happen on timeouts for example).
 	#[error("Sending a request's response failed.")]
 	SendResponse,
 
-	/// Some request to utility functions failed.
-	/// This can be either `RuntimeRequestCanceled` or `RuntimeApiError`.
-	#[error("Utility request failed")]
-	UtilRequest(UtilError),
-
-	/// Some request to the runtime failed.
-	/// For example if we prune a block we're requesting info about.
-	#[error("Runtime API error")]
-	RuntimeRequest(RuntimeApiError),
-
 	/// Fetching PoV failed with `RequestError`.
 	#[error("FetchPoV request error")]
 	FetchPoV(#[source] RequestError),
@@ -121,10 +102,6 @@ pub enum NonFatal {
 	#[error("Given validator index could not be found")]
 	InvalidValidatorIndex,
 
-	/// We tried fetching a session info which was not available.
-	#[error("There was no session with the given index")]
-	NoSuchSession(SessionIndex),
-
 	/// Errors coming from runtime::Runtime.
 	#[error("Error while accessing runtime information")]
 	Runtime(#[from] #[source] runtime::NonFatal),
@@ -144,13 +121,3 @@ pub fn log_error(result: Result<()>, ctx: &'static str)
 	}
 	Ok(())
 }
-
-/// Receive a response from a runtime request and convert errors.
-pub(crate) async fn recv_runtime<V>(
-	r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
-) -> Result<V> {
-	let result = r.await
-		.map_err(Fatal::RuntimeRequestCanceled)?
-		.map_err(NonFatal::RuntimeRequest)?;
-	Ok(result)
-}
diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs
index 9ebc0af130..bb2341ab3f 100644
--- a/polkadot/node/network/availability-distribution/src/lib.rs
+++ b/polkadot/node/network/availability-distribution/src/lib.rs
@@ -25,7 +25,7 @@ use polkadot_subsystem::{
 
 /// Error and [`Result`] type for this subsystem.
 mod error;
-pub use error::{Fatal, NonFatal};
+use error::Fatal;
 use error::{Result, log_error};
 
 use polkadot_node_subsystem_util::runtime::RuntimeInfo;
@@ -42,9 +42,6 @@ use pov_requester::PoVRequester;
 
 mod responder;
 use responder::{answer_chunk_request_log, answer_pov_request_log};
 
-/// Cache for session information.
-mod session_cache;
-
 mod metrics;
 /// Prometheus `Metrics` for availability distribution.
 pub use metrics::Metrics;
@@ -56,8 +53,6 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
 
 /// The availability distribution subsystem.
 pub struct AvailabilityDistributionSubsystem {
-	/// Pointer to a keystore, which is required for determining this nodes validator index.
-	keystore: SyncCryptoStorePtr,
 	/// Easy and efficient runtime access for this subsystem.
 	runtime: RuntimeInfo,
 	/// Prometheus metrics.
@@ -85,8 +80,8 @@ impl AvailabilityDistributionSubsystem {
 
 	/// Create a new instance of the availability distribution.
 	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
-		let runtime = RuntimeInfo::new(Some(keystore.clone()));
-		Self { keystore, runtime, metrics }
+		let runtime = RuntimeInfo::new(Some(keystore));
+		Self { runtime, metrics }
 	}
 
 	/// Start processing work as passed on from the Overseer.
@@ -94,7 +89,7 @@
 	where
 		Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
 	{
-		let mut requester = Requester::new(self.keystore.clone(), self.metrics.clone()).fuse();
+		let mut requester = Requester::new(self.metrics.clone()).fuse();
 		let mut pov_requester = PoVRequester::new();
 		loop {
 			let action = {
@@ -131,7 +126,7 @@
 				);
 			}
 			log_error(
-				requester.get_mut().update_fetching_heads(&mut ctx, update).await,
+				requester.get_mut().update_fetching_heads(&mut ctx, &mut self.runtime, update).await,
 				"Error in Requester::update_fetching_heads"
 			)?;
 		}
diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
index 8fe099770f..bdb51bdd2a 100644
--- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
@@ -35,7 +35,7 @@ use polkadot_subsystem::{SubsystemContext, jaeger};
 
 use crate::{
 	error::{Fatal, Result},
-	session_cache::{BadValidators, SessionInfo},
+	requester::session_cache::{BadValidators, SessionInfo},
 	LOG_TARGET,
 	metrics::{Metrics, SUCCEEDED, FAILED},
 };
diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs
index 052b7cded4..b91297987b 100644
--- a/polkadot/node/network/availability-distribution/src/requester/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs
@@ -30,15 +30,17 @@ use futures::{
 	Stream,
 };
 
-use sp_keystore::SyncCryptoStorePtr;
-
-use polkadot_node_subsystem_util::runtime::get_occupied_cores;
+use polkadot_node_subsystem_util::runtime::{RuntimeInfo, get_occupied_cores};
 use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
 use polkadot_subsystem::{
 	messages::AllMessages,
 	ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
 };
 
-use super::{session_cache::SessionCache, LOG_TARGET, Metrics};
+use super::{LOG_TARGET, Metrics};
+
+/// Cache for session information.
+mod session_cache;
+use session_cache::SessionCache;
 
 /// A task fetching a particular chunk.
@@ -76,12 +78,12 @@ impl Requester {
 	///
 	/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
 	/// by advancing the stream.
-	#[tracing::instrument(level = "trace", skip(keystore, metrics), fields(subsystem = LOG_TARGET))]
-	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
+	#[tracing::instrument(level = "trace", skip(metrics), fields(subsystem = LOG_TARGET))]
+	pub fn new(metrics: Metrics) -> Self {
 		let (tx, rx) = mpsc::channel(1);
 		Requester {
 			fetches: HashMap::new(),
-			session_cache: SessionCache::new(keystore),
+			session_cache: SessionCache::new(),
 			tx,
 			rx,
 			metrics,
@@ -90,10 +92,11 @@ impl Requester {
 	/// Update heads that need availability distribution.
 	///
 	/// For all active heads we will be fetching our chunks for availability distribution.
-	#[tracing::instrument(level = "trace", skip(self, ctx, update), fields(subsystem = LOG_TARGET))]
+	#[tracing::instrument(level = "trace", skip(self, ctx, runtime, update), fields(subsystem = LOG_TARGET))]
 	pub async fn update_fetching_heads(
 		&mut self,
 		ctx: &mut Context,
+		runtime: &mut RuntimeInfo,
 		update: ActiveLeavesUpdate,
 	) -> super::Result<()>
 	where
@@ -110,7 +113,7 @@
 		} = update;
 		// Order important! We need to handle activated, prior to deactivated, otherwise we might
 		// cancel still needed jobs.
-		self.start_requesting_chunks(ctx, activated.into_iter()).await?;
+		self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
 		self.stop_requesting_chunks(deactivated.into_iter());
 		Ok(())
 	}
@@ -119,6 +122,7 @@
 	async fn start_requesting_chunks(
 		&mut self,
 		ctx: &mut Context,
+		runtime: &mut RuntimeInfo,
 		new_heads: impl Iterator<Item = ActivatedLeaf>,
 	) -> super::Result<()>
 	where
@@ -131,7 +135,7 @@
 				occupied_cores = ?cores,
 				"Query occupied core"
 			);
-			self.add_cores(ctx, leaf, cores).await?;
+			self.add_cores(ctx, runtime, leaf, cores).await?;
 		}
 		Ok(())
 	}
@@ -156,6 +160,7 @@
 	async fn add_cores(
 		&mut self,
 		ctx: &mut Context,
+		runtime: &mut RuntimeInfo,
 		leaf: Hash,
 		cores: impl IntoIterator<Item = OccupiedCore>,
 	) -> super::Result<()>
@@ -177,6 +182,7 @@
 					.session_cache
 					.with_session_info(
 						ctx,
+						runtime,
 						// We use leaf here, as relay_parent must be in the same session as the
 						// leaf. (Cores are dropped at session boundaries.) At the same time,
 						// only leaves are guaranteed to be fetchable by the state trie.
diff --git a/polkadot/node/network/availability-distribution/src/session_cache.rs b/polkadot/node/network/availability-distribution/src/requester/session_cache.rs
similarity index 67%
rename from polkadot/node/network/availability-distribution/src/session_cache.rs
rename to polkadot/node/network/availability-distribution/src/requester/session_cache.rs
index bbd40e2a5d..380471566f 100644
--- a/polkadot/node/network/availability-distribution/src/session_cache.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/session_cache.rs
@@ -19,47 +19,28 @@ use std::collections::HashSet;
 
 use lru::LruCache;
 use rand::{seq::SliceRandom, thread_rng};
 
-use sp_application_crypto::AppKey;
-use sp_core::crypto::Public;
-use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
-
-use polkadot_node_subsystem_util::{
-	request_session_index_for_child, request_session_info,
-};
-use polkadot_primitives::v1::SessionInfo as GlobalSessionInfo;
+use polkadot_node_subsystem_util::runtime::RuntimeInfo;
 use polkadot_primitives::v1::{
-	AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex, ValidatorId, ValidatorIndex,
+	AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex, ValidatorIndex,
 };
 use polkadot_subsystem::SubsystemContext;
 
-use super::{
-	error::{recv_runtime, Error, NonFatal},
+use crate::{
+	error::{Error, NonFatal},
 	LOG_TARGET,
 };
 
-/// Caching of session info as needed by availability distribution.
+/// Caching of session info as needed by availability chunk distribution.
 ///
 /// It should be ensured that a cached session stays live in the cache as long as we might need it.
 pub struct SessionCache {
-	/// Get the session index for a given relay parent.
-	///
-	/// We query this up to a 100 times per block, so caching it here without roundtrips over the
-	/// overseer seems sensible.
-	session_index_cache: LruCache<Hash, SessionIndex>,
-
 	/// Look up cached sessions by SessionIndex.
 	///
 	/// Note: Performance of fetching is really secondary here, but we need to ensure we are going
 	/// to get any existing cache entry, before fetching new information, as we should not mess up
-	/// the order of validators in `SessionInfo::validator_groups`. (We want live TCP connections
-	/// wherever possible.)
-	///
-	/// We store `None` in case we are not a validator, so we won't do needless fetches for non
-	/// validator nodes.
-	session_info_cache: LruCache<SessionIndex, Option<SessionInfo>>,
-
-	/// Key store for determining whether we are a validator and what `ValidatorIndex` we have.
-	keystore: SyncCryptoStorePtr,
+	/// the order of validators in `SessionInfo::validator_groups`.
+	session_info_cache: LruCache<SessionIndex, SessionInfo>,
 }
 
 /// Localized session information, tailored for the needs of availability distribution.
@@ -101,13 +82,10 @@ pub struct BadValidators {
 
 impl SessionCache {
 	/// Create a new `SessionCache`.
-	pub fn new(keystore: SyncCryptoStorePtr) -> Self {
+	pub fn new() -> Self {
 		SessionCache {
-			// 5 relatively conservative, 1 to 2 should suffice:
-			session_index_cache: LruCache::new(5),
 			// We need to cache the current and the last session the most:
 			session_info_cache: LruCache::new(2),
-			keystore,
 		}
 	}
 
@@ -117,10 +95,11 @@
 	///
 	/// Use this function over any `fetch_session_info` if all you need is a reference to
 	/// `SessionInfo`, as it avoids an expensive clone.
-	#[tracing::instrument(level = "trace", skip(self, ctx, with_info), fields(subsystem = LOG_TARGET))]
+	#[tracing::instrument(level = "trace", skip(self, ctx, runtime, with_info), fields(subsystem = LOG_TARGET))]
 	pub async fn with_session_info<Context, F, R>(
 		&mut self,
 		ctx: &mut Context,
+		runtime: &mut RuntimeInfo,
 		parent: Hash,
 		with_info: F,
 	) -> Result<Option<R>, Error>
@@ -128,40 +107,23 @@
 	where
 		Context: SubsystemContext,
 		F: FnOnce(&SessionInfo) -> R,
 	{
-		let session_index = match self.session_index_cache.get(&parent) {
-			Some(index) => *index,
-			None => {
-				let index =
-					recv_runtime(request_session_index_for_child(parent, ctx.sender()).await)
-						.await?;
-				self.session_index_cache.put(parent, index);
-				index
-			}
-		};
+		let session_index = runtime.get_session_index(ctx, parent).await?;
 
 		if let Some(o_info) = self.session_info_cache.get(&session_index) {
 			tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
-			if let Some(info) = o_info {
-				return Ok(Some(with_info(info)));
-			} else {
-				// Info was cached - we are not a validator: return early:
-				return Ok(None)
-			}
+			return Ok(Some(with_info(o_info)));
 		}
 
 		if let Some(info) = self
-			.query_info_from_runtime(ctx, parent, session_index)
+			.query_info_from_runtime(ctx, runtime, parent, session_index)
 			.await?
 		{
 			tracing::trace!(target: LOG_TARGET, session_index, "Calling `with_info`");
 			let r = with_info(&info);
 			tracing::trace!(target: LOG_TARGET, session_index, "Storing session info in lru!");
-			self.session_info_cache.put(session_index, Some(info));
+			self.session_info_cache.put(session_index, info);
 			Ok(Some(r))
 		} else {
-			// Avoid needless fetches if we are not a validator:
-			self.session_info_cache.put(session_index, None);
-
 			tracing::trace!(target: LOG_TARGET, session_index, "No session info found!");
 			Ok(None)
 		}
 	}
@@ -185,13 +147,11 @@
 	/// We assume validators in a group are tried in reverse order, so the reported bad validators
 	/// will be put at the beginning of the group.
 	#[tracing::instrument(level = "trace", skip(self, report), fields(subsystem = LOG_TARGET))]
-	pub fn report_bad(&mut self, report: BadValidators) -> super::Result<()> {
+	pub fn report_bad(&mut self, report: BadValidators) -> crate::Result<()> {
 		let session = self
 			.session_info_cache
 			.get_mut(&report.session_index)
-			.ok_or(NonFatal::NoSuchCachedSession)?
-			.as_mut()
-			.ok_or(NonFatal::NotAValidator)?;
+			.ok_or(NonFatal::NoSuchCachedSession)?;
 		let group = session
 			.validator_groups
 			.get_mut(report.group_index.0 as usize)
@@ -218,36 +178,21 @@
 	async fn query_info_from_runtime<Context>(
 		&self,
 		ctx: &mut Context,
+		runtime: &mut RuntimeInfo,
 		parent: Hash,
 		session_index: SessionIndex,
 	) -> Result<Option<SessionInfo>, Error>
 	where
 		Context: SubsystemContext,
 	{
-		let GlobalSessionInfo {
-			validators,
-			discovery_keys,
-			mut validator_groups,
-			..
-		} = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
-			.await?
-			.ok_or(NonFatal::NoSuchSession(session_index))?;
+		let info = runtime.get_session_info_by_index(ctx, parent, session_index).await?;
 
-		if let Some(our_index) = self.get_our_index(validators).await {
+		let discovery_keys = info.session_info.discovery_keys.clone();
+		let mut validator_groups = info.session_info.validator_groups.clone();
+
+		if let Some(our_index) = info.validator_info.our_index {
 			// Get our group index:
-			let our_group = validator_groups
-				.iter()
-				.enumerate()
-				.find_map(|(i, g)| {
-					g.iter().find_map(|v| {
-						if *v == our_index {
-							Some(GroupIndex(i as u32))
-						} else {
-							None
-						}
-					})
-				}
-			);
+			let our_group = info.validator_info.our_group;
 
 			// Shuffle validators in groups:
 			let mut rng = thread_rng();
@@ -279,18 +224,4 @@
 		}
 		return Ok(None)
 	}
-
-	/// Get our `ValidatorIndex`.
-	///
-	/// Returns: None if we are not a validator.
-	async fn get_our_index(&self, validators: Vec<ValidatorId>) -> Option<ValidatorIndex> {
-		for (i, v) in validators.iter().enumerate() {
-			if CryptoStore::has_keys(&*self.keystore, &[(v.to_raw_vec(), ValidatorId::ID)])
-				.await
-			{
-				return Some(ValidatorIndex(i as u32));
-			}
-		}
-		None
-	}
 }
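
Reviewer note (not part of the patch): below is a minimal sketch of the call pattern this cleanup leaves behind. It uses only signatures that appear in the diff above — `RuntimeInfo::new`, `RuntimeInfo::get_session_index`, `RuntimeInfo::get_session_info_by_index` — while the wrapper function name, the bare `Context: SubsystemContext` bound, and the `Error` conversions behind the `?` operators are assumed scaffolding for illustration only.

use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::v1::Hash;
use polkadot_subsystem::SubsystemContext;
use sp_keystore::SyncCryptoStorePtr;

// Hypothetical helper: after this patch the subsystem owns the single
// `RuntimeInfo` (seeded with the keystore once, in `new`) and lends it out by
// `&mut` to `Requester::update_fetching_heads` and, through it, to
// `SessionCache::with_session_info`. `Error` stands in for the subsystem's
// error type.
async fn session_lookup_sketch<Context>(
	ctx: &mut Context,
	keystore: SyncCryptoStorePtr,
	parent: Hash,
) -> Result<(), Error>
where
	Context: SubsystemContext,
{
	let mut runtime = RuntimeInfo::new(Some(keystore));
	// These two calls replace the removed `recv_runtime` +
	// `request_session_index_for_child`/`request_session_info` plumbing and
	// the `session_index_cache` LRU that `SessionCache` maintained for them:
	let session_index = runtime.get_session_index(ctx, parent).await?;
	let info = runtime.get_session_info_by_index(ctx, parent, session_index).await?;
	// `info.validator_info.our_index`/`our_group` replace the manual keystore
	// scan done by the removed `SessionCache::get_our_index`:
	let _maybe_our_index = info.validator_info.our_index;
	Ok(())
}

The net effect: session data is fetched and cached in exactly one place (`RuntimeInfo`), so `SessionCache` can drop both its keystore handle and the `Option<SessionInfo>` encoding of "we are not a validator".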