Don't cache unavailable results. (#4509)

This commit is contained in:
Robert Klotzner
2021-12-10 23:52:20 +01:00
committed by GitHub
parent 77d70ac0d3
commit 34339c6805
@@ -20,6 +20,7 @@
use std::{
collections::{HashMap, VecDeque},
convert::TryFrom,
pin::Pin,
time::Duration,
};
@@ -686,6 +687,38 @@ impl Future for RecoveryHandle {
}
}
/// Cached result of an availability recovery operation.
#[derive(Debug, Clone)]
enum CachedRecovery {
/// Availability was successfully retrieved before.
Valid(AvailableData),
/// Availability was successfully retrieved before, but was found to be invalid.
Invalid,
}
impl CachedRecovery {
/// Convert back to `Result` to deliver responses.
fn into_result(self) -> Result<AvailableData, RecoveryError> {
match self {
Self::Valid(d) => Ok(d),
Self::Invalid => Err(RecoveryError::Invalid),
}
}
}
impl TryFrom<Result<AvailableData, RecoveryError>> for CachedRecovery {
type Error = ();
fn try_from(o: Result<AvailableData, RecoveryError>) -> Result<CachedRecovery, Self::Error> {
match o {
Ok(d) => Ok(Self::Valid(d)),
Err(RecoveryError::Invalid) => Ok(Self::Invalid),
// We don't want to cache unavailable state, as that state might change, so if
// requested again we want to try again!
Err(RecoveryError::Unavailable) => Err(()),
}
}
}
struct State {
/// Each recovery task is implemented as its own async task,
/// and these handles are for communicating with them.
@@ -695,7 +728,7 @@ struct State {
live_block: (BlockNumber, Hash),
/// An LRU cache of recently recovered data.
availability_lru: LruCache<CandidateHash, Result<AvailableData, RecoveryError>>,
availability_lru: LruCache<CandidateHash, CachedRecovery>,
}
impl Default for State {
@@ -812,8 +845,10 @@ where
let span = jaeger::Span::new(candidate_hash, "availbility-recovery")
.with_stage(jaeger::Stage::AvailabilityRecovery);
if let Some(result) = state.availability_lru.get(&candidate_hash) {
if let Err(e) = response_sender.send(result.clone()) {
if let Some(result) =
state.availability_lru.get(&candidate_hash).cloned().map(|v| v.into_result())
{
if let Err(e) = response_sender.send(result) {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
@@ -972,7 +1007,9 @@ impl AvailabilityRecoverySubsystem {
}
output = state.ongoing_recoveries.select_next_some() => {
if let Some((candidate_hash, result)) = output {
state.availability_lru.put(candidate_hash, result);
if let Ok(recovery) = CachedRecovery::try_from(result) {
state.availability_lru.put(candidate_hash, recovery);
}
}
}
}