mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-07 20:08:02 +00:00
Limit stagnant checks to a certain amount of entries (#5742)
* Limit number of elements loaded from the stagnant key This will likely be required if we enable stagnant prunning as currently database has way too many entries to be prunned in a single iteration * Fmt run * Slightly improve logging * Some more debug nits * Fmt pass
This commit is contained in:
@@ -46,10 +46,11 @@ pub(super) trait Backend {
|
||||
/// Load the stagnant list at the given timestamp.
|
||||
fn load_stagnant_at(&self, timestamp: Timestamp) -> Result<Vec<Hash>, Error>;
|
||||
/// Load all stagnant lists up to and including the given Unix timestamp
|
||||
/// in ascending order.
|
||||
/// in ascending order. Stop fetching stagnant entries upon reaching `max_elements`.
|
||||
fn load_stagnant_at_up_to(
|
||||
&self,
|
||||
up_to: Timestamp,
|
||||
max_elements: usize,
|
||||
) -> Result<Vec<(Timestamp, Vec<Hash>)>, Error>;
|
||||
/// Load the earliest kept block number.
|
||||
fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error>;
|
||||
|
||||
@@ -229,6 +229,7 @@ impl Backend for DbBackend {
|
||||
fn load_stagnant_at_up_to(
|
||||
&self,
|
||||
up_to: crate::Timestamp,
|
||||
max_elements: usize,
|
||||
) -> Result<Vec<(crate::Timestamp, Vec<Hash>)>, Error> {
|
||||
let stagnant_at_iter =
|
||||
self.inner.iter_with_prefix(self.config.col_data, &STAGNANT_AT_PREFIX[..]);
|
||||
@@ -240,7 +241,9 @@ impl Backend for DbBackend {
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.take_while(|(at, _)| *at <= up_to.into())
|
||||
.enumerate()
|
||||
.take_while(|(idx, (at, _))| *at <= up_to.into() && *idx < max_elements)
|
||||
.map(|(_, v)| v)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(val)
|
||||
@@ -528,7 +531,10 @@ mod tests {
|
||||
let mut backend = DbBackend::new(db, config);
|
||||
|
||||
// Prove that it's cheap
|
||||
assert!(backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap().is_empty());
|
||||
assert!(backend
|
||||
.load_stagnant_at_up_to(Timestamp::max_value(), usize::MAX)
|
||||
.unwrap()
|
||||
.is_empty());
|
||||
|
||||
backend
|
||||
.write(vec![
|
||||
@@ -539,7 +545,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap(),
|
||||
backend.load_stagnant_at_up_to(Timestamp::max_value(), usize::MAX).unwrap(),
|
||||
vec![
|
||||
(2, vec![Hash::repeat_byte(1)]),
|
||||
(5, vec![Hash::repeat_byte(2)]),
|
||||
@@ -548,7 +554,7 @@ mod tests {
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
backend.load_stagnant_at_up_to(10).unwrap(),
|
||||
backend.load_stagnant_at_up_to(10, usize::MAX).unwrap(),
|
||||
vec![
|
||||
(2, vec![Hash::repeat_byte(1)]),
|
||||
(5, vec![Hash::repeat_byte(2)]),
|
||||
@@ -557,21 +563,26 @@ mod tests {
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
backend.load_stagnant_at_up_to(9).unwrap(),
|
||||
backend.load_stagnant_at_up_to(9, usize::MAX).unwrap(),
|
||||
vec![(2, vec![Hash::repeat_byte(1)]), (5, vec![Hash::repeat_byte(2)]),]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
backend.load_stagnant_at_up_to(9, 1).unwrap(),
|
||||
vec![(2, vec![Hash::repeat_byte(1)]),]
|
||||
);
|
||||
|
||||
backend.write(vec![BackendWriteOp::DeleteStagnantAt(2)]).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
backend.load_stagnant_at_up_to(5).unwrap(),
|
||||
backend.load_stagnant_at_up_to(5, usize::MAX).unwrap(),
|
||||
vec![(5, vec![Hash::repeat_byte(2)]),]
|
||||
);
|
||||
|
||||
backend.write(vec![BackendWriteOp::WriteStagnantAt(5, vec![])]).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
backend.load_stagnant_at_up_to(10).unwrap(),
|
||||
backend.load_stagnant_at_up_to(10, usize::MAX).unwrap(),
|
||||
vec![(10, vec![Hash::repeat_byte(3)]),]
|
||||
);
|
||||
}
|
||||
|
||||
@@ -50,6 +50,8 @@ type Timestamp = u64;
|
||||
// If a block isn't approved in 120 seconds, nodes will abandon it
|
||||
// and begin building on another chain.
|
||||
const STAGNANT_TIMEOUT: Timestamp = 120;
|
||||
// Maximum number of stagnant entries cleaned during one `STAGNANT_TIMEOUT` iteration
|
||||
const MAX_STAGNANT_ENTRIES: usize = 1000;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum Approval {
|
||||
@@ -435,7 +437,7 @@ where
|
||||
}
|
||||
}
|
||||
_ = stagnant_check_stream.next().fuse() => {
|
||||
detect_stagnant(backend, clock.timestamp_now())?;
|
||||
detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -637,9 +639,13 @@ fn handle_approved_block(backend: &mut impl Backend, approved_block: Hash) -> Re
|
||||
backend.write(ops)
|
||||
}
|
||||
|
||||
fn detect_stagnant(backend: &mut impl Backend, now: Timestamp) -> Result<(), Error> {
|
||||
fn detect_stagnant(
|
||||
backend: &mut impl Backend,
|
||||
now: Timestamp,
|
||||
max_elements: usize,
|
||||
) -> Result<(), Error> {
|
||||
let ops = {
|
||||
let overlay = tree::detect_stagnant(&*backend, now)?;
|
||||
let overlay = tree::detect_stagnant(&*backend, now, max_elements)?;
|
||||
|
||||
overlay.into_write_ops()
|
||||
};
|
||||
|
||||
@@ -139,13 +139,16 @@ impl Backend for TestBackend {
|
||||
fn load_stagnant_at_up_to(
|
||||
&self,
|
||||
up_to: Timestamp,
|
||||
max_elements: usize,
|
||||
) -> Result<Vec<(Timestamp, Vec<Hash>)>, Error> {
|
||||
Ok(self
|
||||
.inner
|
||||
.lock()
|
||||
.stagnant_at
|
||||
.range(..=up_to)
|
||||
.map(|(t, v)| (*t, v.clone()))
|
||||
.enumerate()
|
||||
.take_while(|(idx, _)| *idx < max_elements)
|
||||
.map(|(_, (t, v))| (*t, v.clone()))
|
||||
.collect())
|
||||
}
|
||||
fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error> {
|
||||
|
||||
@@ -534,12 +534,28 @@ pub(super) fn approve_block(
|
||||
pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
|
||||
backend: &'a B,
|
||||
up_to: Timestamp,
|
||||
max_elements: usize,
|
||||
) -> Result<OverlayedBackend<'a, B>, Error> {
|
||||
let stagnant_up_to = backend.load_stagnant_at_up_to(up_to)?;
|
||||
let stagnant_up_to = backend.load_stagnant_at_up_to(up_to, max_elements)?;
|
||||
let mut backend = OverlayedBackend::new(backend);
|
||||
|
||||
let (min_ts, max_ts) = match stagnant_up_to.len() {
|
||||
0 => (0 as Timestamp, 0 as Timestamp),
|
||||
1 => (stagnant_up_to[0].0, stagnant_up_to[0].0),
|
||||
n => (stagnant_up_to[0].0, stagnant_up_to[n - 1].0),
|
||||
};
|
||||
|
||||
// As this is in ascending order, only the earliest stagnant
|
||||
// blocks will involve heavy viability propagations.
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
?up_to,
|
||||
?min_ts,
|
||||
?max_ts,
|
||||
"Prepared {} stagnant entries for pruning",
|
||||
stagnant_up_to.len()
|
||||
);
|
||||
|
||||
for (timestamp, maybe_stagnant) in stagnant_up_to {
|
||||
backend.delete_stagnant_at(timestamp);
|
||||
|
||||
@@ -550,12 +566,27 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
|
||||
entry.viability.approval = Approval::Stagnant;
|
||||
}
|
||||
let is_viable = entry.viability.is_viable();
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?block_hash,
|
||||
?timestamp,
|
||||
?was_viable,
|
||||
?is_viable,
|
||||
"Found existing stagnant entry"
|
||||
);
|
||||
|
||||
if was_viable && !is_viable {
|
||||
propagate_viability_update(&mut backend, entry)?;
|
||||
} else {
|
||||
backend.write_block_entry(entry);
|
||||
}
|
||||
} else {
|
||||
gum::trace!(
|
||||
target: LOG_TARGET,
|
||||
?block_hash,
|
||||
?timestamp,
|
||||
"Found non-existing stagnant entry"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user