Fix stalling dispute coordinator. (#7125)

* Fix stalling dispute coordinator.

* Initialization.

---------

Co-authored-by: eskimor <eskimor@no-such-url.com>
This commit is contained in:
eskimor
2023-04-25 16:42:45 +02:00
committed by GitHub
parent 634b2f6ad2
commit 33dd2584df
@@ -16,7 +16,10 @@
//! Dispute coordinator subsystem in initialized state (after first active leaf is received).
use std::{collections::BTreeMap, sync::Arc};
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
use futures::{
channel::{mpsc, oneshot},
@@ -65,6 +68,12 @@ use super::{
OverlayedBackend,
};
/// How many blocks we import votes from per leaf update.
///
/// Since vote import is relatively slow, we have to limit the maximum amount of work we do on leaf
/// updates (and especially on startup) so the dispute coordinator won't be considered stalling.
const CHAIN_IMPORT_MAX_BATCH_SIZE: usize = 8;
// Initial data for `dispute-coordinator`. It is provided only at first start.
pub struct InitialData {
pub participations: Vec<(ParticipationPriority, ParticipationRequest)>,
@@ -89,6 +98,17 @@ pub(crate) struct Initialized {
participation: Participation,
scraper: ChainScraper,
participation_receiver: WorkerMessageReceiver,
/// Backlog of still to be imported votes from chain.
///
/// For some reason importing votes is relatively slow, if there is a large finality lag (~50
/// blocks) we will be too slow importing all votes from unfinalized chains on startup
/// (dispute-coordinator gets killed because of unresponsiveness).
///
/// https://github.com/paritytech/polkadot/issues/6912
///
/// To resolve this, we limit the amount of votes imported at once to
/// `CHAIN_IMPORT_MAX_BATCH_SIZE` and put the rest here for later processing.
chain_import_backlog: VecDeque<ScrapedOnChainVotes>,
metrics: Metrics,
}
@@ -117,6 +137,7 @@ impl Initialized {
scraper,
participation,
participation_receiver,
chain_import_backlog: VecDeque::new(),
metrics,
}
}
@@ -168,24 +189,16 @@ impl Initialized {
}
let mut overlay_db = OverlayedBackend::new(backend);
for votes in on_chain_votes {
let _ = self
.process_on_chain_votes(
ctx,
&mut overlay_db,
votes,
clock.now(),
first_leaf.hash,
)
.await
.map_err(|error| {
gum::warn!(
target: LOG_TARGET,
?error,
"Skipping scraping block due to error",
);
});
}
self.process_chain_import_backlog(
ctx,
&mut overlay_db,
on_chain_votes,
clock.now(),
first_leaf.hash,
)
.await;
if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
@@ -344,26 +357,49 @@ impl Initialized {
scraped_updates.on_chain_votes.len()
);
// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel
for votes in scraped_updates.on_chain_votes {
let _ = self
.process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash)
.await
.map_err(|error| {
gum::warn!(
target: LOG_TARGET,
?error,
"Skipping scraping block due to error",
);
});
}
self.process_chain_import_backlog(
ctx,
overlay_db,
scraped_updates.on_chain_votes,
now,
new_leaf.hash,
)
.await;
}
gum::trace!(target: LOG_TARGET, timestamp = now, "Done processing ActiveLeavesUpdate");
Ok(())
}
/// Process one batch of our `chain_import_backlog`.
///
/// `new_votes` will be appended beforehand.
async fn process_chain_import_backlog<Context>(
&mut self,
ctx: &mut Context,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
new_votes: Vec<ScrapedOnChainVotes>,
now: u64,
block_hash: Hash,
) {
let mut chain_import_backlog = std::mem::take(&mut self.chain_import_backlog);
chain_import_backlog.extend(new_votes);
let import_range =
0..std::cmp::min(CHAIN_IMPORT_MAX_BATCH_SIZE, chain_import_backlog.len());
// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel
for votes in chain_import_backlog.drain(import_range) {
let res = self.process_on_chain_votes(ctx, overlay_db, votes, now, block_hash).await;
match res {
Ok(()) => {},
Err(error) => {
gum::warn!(target: LOG_TARGET, ?error, "Skipping scraping block due to error",);
},
};
}
self.chain_import_backlog = chain_import_backlog;
}
/// Scrapes on-chain votes (backing votes and concluded disputes) for a active leaf of the
/// relay chain.
async fn process_on_chain_votes<Context>(