Rework dispute-coordinator to use RuntimeInfo for obtaining session information instead of RollingSessionWindow (#6968)

* Pass `SessionInfo` directly to `CandidateEnvironment::new` otherwise it should be an async function

* Replace calls to `RollingSessionWindow` with `RuntimeInfo`

Adjust `dispute-coordinator` initialization to use `RuntimeInfo`

* Modify `dispute-coordinator` initialization

* Pass `Hash` to `process_on_chain_votes` so that `RuntimeInfo` calls can be made

Remove some fixmes

* Pass `Hash` to `handle_import_statements` to perform `RuntimeInfo` calls

* remove todo comments

* Remove `error` from `Initialized`

Rework new session handling code

* Remove db code which is no longer used

* Update stale comment and remove unneeded type specification

* Cache SessionInfo on startup

* Use `DISPUTE_WINDOW` from primitives

* Fix caching in `process_active_leaves_update`

* handle_import_statements: leaf_hash -> block_hash

* Restore `ensure_available_session_info`

* Don't interrupt `process_on_chain_votes` if SessionInfo can't be fetched

* Small style improvements in logging

* process_on_chain_votes: leaf_hash -> block_hash

* Restore `note_earliest_session` - it is required to prune disputes and votes

* Cache new sessions only when there is an actual session change

* Fix tests

* `CandidateEnvironment::new` gets `session_idx` and fetches `SessionInfo` by itself, to avoid the situation where the input `SessionIndex` and `SessionInfo` parameters don't match

* Fix handling of missing session info

* Move sessions caching in `handle_startup` and fix tests

* Load `relay_parent` from db in `handle_import_statements` instead of passing it as a parameter via two functions

* Don't do two db reads

* Fix the twisted logic in `handle_import_statements`

* fixup

* Small style fix

* Decrease log levels for caching errors to debug and fix a typo

* Update outdated comment

* Remove `ensure_available_session_info`

* Load relay parent from db in `process_on_chain_votes`

* Revert "Load relay parent from db in `process_on_chain_votes`"

This reverts commit 978ad4f223d517faa7a7fbad96e3f8de4fa17501.

* Keep track of highest seen session and last session cached without gaps.

* Apply suggestions from code review

Co-authored-by: ordian <write@reusable.software>

* Handle session caching failure on startup correctly

* Update node/core/dispute-coordinator/src/initialized.rs

Co-authored-by: ordian <write@reusable.software>

* Simplify session caching retries

* Update stale comment

* Fix lower bound calculation for session caching

---------

Co-authored-by: ordian <write@reusable.software>
This commit is contained in:
Tsvetomir Dimitrov
2023-04-24 15:27:44 +03:00
committed by GitHub
parent a8d80532ce
commit 3f4ce6326f
5 changed files with 308 additions and 268 deletions
@@ -18,7 +18,7 @@ use fatality::Nested;
use futures::channel::oneshot;
use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError};
use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime};
use polkadot_node_subsystem_util::runtime;
use crate::{db, participation, LOG_TARGET};
use parity_scale_codec::Error as CodecError;
@@ -96,8 +96,8 @@ pub enum Error {
Codec(#[from] CodecError),
/// `RollingSessionWindow` was not able to retrieve `SessionInfo`s.
#[error("Sessions unavailable in `RollingSessionWindow`: {0}")]
RollingSessionWindow(#[from] SessionsUnavailable),
#[error("Session can't be fetched via `RuntimeInfo`")]
SessionInfo,
#[error(transparent)]
QueueError(#[from] participation::QueueError),
@@ -31,9 +31,10 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use polkadot_node_primitives::{
disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp,
};
use polkadot_node_subsystem_util::rolling_session_window::RollingSessionWindow;
use polkadot_node_subsystem::overseer;
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::{
CandidateReceipt, DisputeStatement, IndexedVec, SessionIndex, SessionInfo,
CandidateReceipt, DisputeStatement, Hash, IndexedVec, SessionIndex, SessionInfo,
ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};
use sc_keystore::LocalKeystore;
@@ -50,18 +51,29 @@ pub struct CandidateEnvironment<'a> {
controlled_indices: HashSet<ValidatorIndex>,
}
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
impl<'a> CandidateEnvironment<'a> {
/// Create `CandidateEnvironment`.
///
/// Return: `None` in case the `SessionInfo` for the session can't be fetched via `RuntimeInfo`.
pub fn new(
pub async fn new<Context>(
keystore: &LocalKeystore,
session_window: &'a RollingSessionWindow,
ctx: &mut Context,
runtime_info: &'a mut RuntimeInfo,
session_index: SessionIndex,
) -> Option<Self> {
let session = session_window.session_info(session_index)?;
let controlled_indices = find_controlled_validator_indices(keystore, &session.validators);
Some(Self { session_index, session, controlled_indices })
relay_parent: Hash,
) -> Option<CandidateEnvironment<'a>> {
let session_info = match runtime_info
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await
{
Ok(extended_session_info) => &extended_session_info.session_info,
Err(_) => return None,
};
let controlled_indices =
find_controlled_validator_indices(keystore, &session_info.validators);
Some(Self { session_index, session: session_info, controlled_indices })
}
/// Validators in the candidate's session.
@@ -26,7 +26,8 @@ use futures::{
use sc_keystore::LocalKeystore;
use polkadot_node_primitives::{
disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp,
disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement,
Timestamp, DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
messages::{
@@ -35,17 +36,16 @@ use polkadot_node_subsystem::{
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal,
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
};
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::{
BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement,
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidDisputeStatementKind, ValidatorId, ValidatorIndex,
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind,
ValidatorId, ValidatorIndex,
};
use crate::{
error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result},
db,
error::{log_error, FatalError, FatalResult, JfyiError, JfyiResult, Result},
import::{CandidateEnvironment, CandidateVoteState},
is_potential_spam,
metrics::Metrics,
@@ -55,7 +55,7 @@ use crate::{
use super::{
backend::Backend,
db, make_dispute_message,
make_dispute_message,
participation::{
self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement,
WorkerMessageReceiver,
@@ -65,23 +65,31 @@ use super::{
OverlayedBackend,
};
/// Initial data for `dispute-coordinator`. It is provided only at first start.
///
/// It is consumed once by `Initialized::run_until_error`; on subsystem restarts `None` is
/// passed instead.
pub struct InitialData {
/// Participation requests (with their priority) to queue before entering the main loop.
pub participations: Vec<(ParticipationPriority, ParticipationRequest)>,
/// On-chain votes scraped at startup; each entry is fed to `process_on_chain_votes`.
pub votes: Vec<ScrapedOnChainVotes>,
/// The first activated leaf. Its hash is used as the block hash when processing `votes`, and
/// the leaf itself is forwarded to participation as an `ActiveLeavesUpdate`.
pub leaf: ActivatedLeaf,
}
/// After the first active leaves update we transition to `Initialized` state.
///
/// Before the first active leaves update we can't really do much. We cannot check incoming
/// statements for validity, we cannot query orderings, we have no valid `RollingSessionWindow`,
/// statements for validity, we cannot query orderings, we have no valid `SessionInfo`,
/// ...
pub struct Initialized {
pub(crate) struct Initialized {
keystore: Arc<LocalKeystore>,
rolling_session_window: RollingSessionWindow,
highest_session: SessionIndex,
runtime_info: RuntimeInfo,
/// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doesn't matter if it was
/// cached successfully or not. It is used to detect ancient disputes.
highest_session_seen: SessionIndex,
/// Will be set to `true` if an error occurred during the last caching attempt
gaps_in_cache: bool,
spam_slots: SpamSlots,
participation: Participation,
scraper: ChainScraper,
participation_receiver: WorkerMessageReceiver,
metrics: Metrics,
// This tracks only rolling session window failures.
// It can be a `Vec` if the need to track more arises.
error: Option<SessionsUnavailable>,
}
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
@@ -89,55 +97,46 @@ impl Initialized {
/// Make initialized subsystem, ready to `run`.
pub fn new(
subsystem: DisputeCoordinatorSubsystem,
rolling_session_window: RollingSessionWindow,
runtime_info: RuntimeInfo,
spam_slots: SpamSlots,
scraper: ChainScraper,
highest_session_seen: SessionIndex,
gaps_in_cache: bool,
) -> Self {
let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;
let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender, metrics.clone());
let highest_session = rolling_session_window.latest_session();
Self {
keystore,
rolling_session_window,
highest_session,
runtime_info,
highest_session_seen,
gaps_in_cache,
spam_slots,
scraper,
participation,
participation_receiver,
metrics,
error: None,
}
}
/// Run the initialized subsystem.
///
/// Optionally supply initial participations and a first leaf to process.
/// `initial_data` is optional. It is passed on first start and is `None` on subsystem restarts.
pub async fn run<B, Context>(
mut self,
mut ctx: Context,
mut backend: B,
mut participations: Vec<(ParticipationPriority, ParticipationRequest)>,
mut votes: Vec<ScrapedOnChainVotes>,
mut first_leaf: Option<ActivatedLeaf>,
mut initial_data: Option<InitialData>,
clock: Box<dyn Clock>,
) -> FatalResult<()>
where
B: Backend,
{
loop {
let res = self
.run_until_error(
&mut ctx,
&mut backend,
&mut participations,
&mut votes,
&mut first_leaf,
&*clock,
)
.await;
let res =
self.run_until_error(&mut ctx, &mut backend, &mut initial_data, &*clock).await;
if let Ok(()) = res {
gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
return Ok(())
@@ -155,23 +154,29 @@ impl Initialized {
&mut self,
ctx: &mut Context,
backend: &mut B,
participations: &mut Vec<(ParticipationPriority, ParticipationRequest)>,
on_chain_votes: &mut Vec<ScrapedOnChainVotes>,
first_leaf: &mut Option<ActivatedLeaf>,
initial_data: &mut Option<InitialData>,
clock: &dyn Clock,
) -> Result<()>
where
B: Backend,
{
for (priority, request) in participations.drain(..) {
self.participation.queue_participation(ctx, priority, request).await?;
}
if let Some(InitialData { participations, votes: on_chain_votes, leaf: first_leaf }) =
initial_data.take()
{
for (priority, request) in participations {
self.participation.queue_participation(ctx, priority, request).await?;
}
let mut overlay_db = OverlayedBackend::new(backend);
for votes in on_chain_votes.drain(..) {
for votes in on_chain_votes {
let _ = self
.process_on_chain_votes(ctx, &mut overlay_db, votes, clock.now())
.process_on_chain_votes(
ctx,
&mut overlay_db,
votes,
clock.now(),
first_leaf.hash,
)
.await
.map_err(|error| {
gum::warn!(
@@ -185,9 +190,7 @@ impl Initialized {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
}
}
if let Some(first_leaf) = first_leaf.take() {
// Also provide first leaf to participation for good measure.
self.participation
.process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf))
@@ -283,37 +286,56 @@ impl Initialized {
self.participation.process_active_leaves_update(ctx, &update).await?;
if let Some(new_leaf) = update.activated {
match self
.rolling_session_window
.cache_session_info_for_head(ctx.sender(), new_leaf.hash)
.await
{
Err(e) => {
gum::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
);
self.error = Some(e);
},
Ok(SessionWindowUpdate::Advanced {
new_window_end: window_end,
new_window_start,
..
}) => {
self.error = None;
let session = window_end;
if self.highest_session < session {
gum::trace!(target: LOG_TARGET, session, "Observed new session. Pruning");
let session_idx =
self.runtime_info.get_session_index_for_child(ctx.sender(), new_leaf.hash).await;
self.highest_session = session;
match session_idx {
Ok(session_idx)
if self.gaps_in_cache || session_idx > self.highest_session_seen =>
{
// If error has occurred during last session caching - fetch the whole window
// Otherwise - cache only the new sessions
let lower_bound = if self.gaps_in_cache {
session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)
} else {
self.highest_session_seen + 1
};
db::v1::note_earliest_session(overlay_db, new_window_start)?;
self.spam_slots.prune_old(new_window_start);
// There is a new session. Perform a dummy fetch to cache it.
for idx in lower_bound..=session_idx {
if let Err(err) = self
.runtime_info
.get_session_info_by_index(ctx.sender(), new_leaf.hash, idx)
.await
{
gum::debug!(
target: LOG_TARGET,
session_idx,
leaf_hash = ?new_leaf.hash,
?err,
"Error caching SessionInfo on ActiveLeaves update"
);
self.gaps_in_cache = true;
}
}
self.highest_session_seen = session_idx;
db::v1::note_earliest_session(
overlay_db,
session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1),
)?;
self.spam_slots.prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1));
},
Ok(SessionWindowUpdate::Unchanged) => {},
};
Ok(_) => { /* no new session => nothing to cache */ },
Err(err) => {
gum::debug!(
target: LOG_TARGET,
?err,
"Failed to update session cache for disputes - can't fetch session index",
);
},
}
gum::trace!(
target: LOG_TARGET,
@@ -325,15 +347,16 @@ impl Initialized {
// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel
for votes in scraped_updates.on_chain_votes {
let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err(
|error| {
let _ = self
.process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash)
.await
.map_err(|error| {
gum::warn!(
target: LOG_TARGET,
?error,
"Skipping scraping block due to error",
);
},
);
});
}
}
@@ -349,6 +372,7 @@ impl Initialized {
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
votes: ScrapedOnChainVotes,
now: u64,
block_hash: Hash,
) -> Result<()> {
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes;
@@ -360,23 +384,24 @@ impl Initialized {
// the new active leaf as if we received them via gossip.
for (candidate_receipt, backers) in backing_validators_per_candidate {
// Obtain the session info, for sake of `ValidatorId`s
// either from the rolling session window.
// Must be called _after_ `fn cache_session_info_for_head`
// which guarantees that the session info is available
// for the current session.
let session_info: &SessionInfo =
if let Some(session_info) = self.rolling_session_window.session_info(session) {
session_info
} else {
let relay_parent = candidate_receipt.descriptor.relay_parent;
let session_info = match self
.runtime_info
.get_session_info_by_index(ctx.sender(), relay_parent, session)
.await
{
Ok(extended_session_info) => &extended_session_info.session_info,
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?session,
"Could not retrieve session info from rolling session window",
?err,
"Could not retrieve session info from RuntimeInfo",
);
return Ok(())
};
},
};
let relay_parent = candidate_receipt.descriptor.relay_parent;
let candidate_hash = candidate_receipt.hash();
gum::trace!(
target: LOG_TARGET,
@@ -470,18 +495,23 @@ impl Initialized {
?session,
"Importing dispute votes from chain for candidate"
);
let session_info =
if let Some(session_info) = self.rolling_session_window.session_info(session) {
session_info
} else {
let session_info = match self
.runtime_info
.get_session_info_by_index(ctx.sender(), block_hash, session)
.await
{
Ok(extended_session_info) => &extended_session_info.session_info,
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?candidate_hash,
?session,
"Could not retrieve session info from rolling session window for recently concluded dispute"
?err,
"Could not retrieve session info for recently concluded dispute"
);
continue
};
},
};
let statements = statements
.into_iter()
@@ -593,9 +623,6 @@ impl Initialized {
}
},
DisputeCoordinatorMessage::RecentDisputes(tx) => {
// Return error if session information is missing.
self.ensure_available_session_info()?;
gum::trace!(target: LOG_TARGET, "Loading recent disputes from db");
let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
@@ -609,11 +636,7 @@ impl Initialized {
);
},
DisputeCoordinatorMessage::ActiveDisputes(tx) => {
// Return error if session information is missing.
self.ensure_available_session_info()?;
gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes");
let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
@@ -629,11 +652,7 @@ impl Initialized {
);
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
// Return error if session information is missing.
self.ensure_available_session_info()?;
gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::QueryCandidateVotes");
let mut query_output = Vec::new();
for (session_index, candidate_hash) in query {
if let Some(v) =
@@ -673,8 +692,6 @@ impl Initialized {
block_descriptions,
tx,
} => {
// Return error if session information is missing.
self.ensure_available_session_info()?;
gum::trace!(
target: LOG_TARGET,
"DisputeCoordinatorMessage::DetermineUndisputedChain"
@@ -694,15 +711,6 @@ impl Initialized {
Ok(Box::new(|| Ok(())))
}
// Helper function for checking subsystem errors in message processing.
fn ensure_available_session_info(&self) -> Result<()> {
if let Some(subsystem_error) = self.error.clone() {
return Err(Error::RollingSessionWindow(subsystem_error))
}
Ok(())
}
// We use fatal result rather than result here. Reason being, We for example increase
// spam slots in this function. If then the import fails for some non fatal and
// unrelated reason, we should likely actually decrement previously incremented spam
@@ -717,16 +725,39 @@ impl Initialized {
now: Timestamp,
) -> FatalResult<ImportStatementsResult> {
gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements");
if !self.rolling_session_window.contains(session) {
if self.session_is_ancient(session) {
// It is not valid to participate in an ancient dispute (spam?) or too new.
return Ok(ImportStatementsResult::InvalidImport)
}
let candidate_hash = candidate_receipt.hash();
let votes_in_db = overlay_db.load_candidate_votes(session, &candidate_hash)?;
let relay_parent = match &candidate_receipt {
MaybeCandidateReceipt::Provides(candidate_receipt) =>
candidate_receipt.descriptor().relay_parent,
MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash) => match &votes_in_db {
Some(votes) => votes.candidate_receipt.descriptor().relay_parent,
None => {
gum::warn!(
target: LOG_TARGET,
session,
?candidate_hash,
"Cannot obtain relay parent without `CandidateReceipt` available!"
);
return Ok(ImportStatementsResult::InvalidImport)
},
},
};
let env = match CandidateEnvironment::new(
&self.keystore,
&self.rolling_session_window,
ctx,
&mut self.runtime_info,
session,
) {
relay_parent,
)
.await
{
None => {
gum::warn!(
target: LOG_TARGET,
@@ -739,8 +770,6 @@ impl Initialized {
Some(env) => env,
};
let candidate_hash = candidate_receipt.hash();
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
@@ -757,10 +786,7 @@ impl Initialized {
// There is one exception: A sufficiently sophisticated attacker could prevent
// us from seeing the backing votes by withholding arbitrary blocks, and hence we do
// not have a `CandidateReceipt` available.
let old_state = match overlay_db
.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
{
let old_state = match votes_in_db.map(CandidateVotes::from) {
Some(votes) => CandidateVoteState::new(votes, &env, now),
None =>
if let MaybeCandidateReceipt::Provides(candidate_receipt) = candidate_receipt {
@@ -1120,12 +1146,17 @@ impl Initialized {
?now,
"Issuing local statement for candidate!"
);
// Load environment:
let env = match CandidateEnvironment::new(
&self.keystore,
&self.rolling_session_window,
ctx,
&mut self.runtime_info,
session,
) {
candidate_receipt.descriptor.relay_parent,
)
.await
{
None => {
gum::warn!(
target: LOG_TARGET,
@@ -1175,10 +1206,10 @@ impl Initialized {
statements.push((signed_dispute_statement, *index));
},
Ok(None) => {},
Err(e) => {
Err(err) => {
gum::error!(
target: LOG_TARGET,
err = ?e,
?err,
"Encountered keystore error while signing dispute statement",
);
},
@@ -1233,6 +1264,10 @@ impl Initialized {
Ok(())
}
/// Returns `true` if `session_idx` is older than the dispute window.
///
/// The window covers the last `DISPUTE_WINDOW` sessions, ending at (and including)
/// `highest_session_seen`. The lower bound is computed with `saturating_sub`, so while
/// fewer than `DISPUTE_WINDOW` sessions have been seen it clamps to session 0 and no
/// session is considered ancient.
fn session_is_ancient(&self, session_idx: SessionIndex) -> bool {
	let earliest_session_in_window =
		self.highest_session_seen.saturating_sub(DISPUTE_WINDOW.get() - 1);
	session_idx < earliest_session_in_window
}
}
/// Messages to be handled in this subsystem.
+104 -66
View File
@@ -24,7 +24,7 @@
//! validation results as well as a sink for votes received by other subsystems. When importing a dispute vote from
//! another node, this will trigger dispute participation to recover and validate the block.
use std::sync::Arc;
use std::{num::NonZeroUsize, sync::Arc};
use futures::FutureExt;
@@ -33,6 +33,7 @@ use sc_keystore::LocalKeystore;
use polkadot_node_primitives::{
CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal,
@@ -40,12 +41,14 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_util::{
database::Database,
rolling_session_window::{DatabaseParams, RollingSessionWindow},
runtime::{Config as RuntimeInfoConfig, RuntimeInfo},
};
use polkadot_primitives::{
DisputeStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorIndex,
};
use polkadot_primitives::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex};
use crate::{
error::{FatalResult, JfyiError, Result},
error::{FatalResult, Result},
metrics::Metrics,
status::{get_active_with_status, SystemClock},
};
@@ -65,7 +68,7 @@ pub(crate) mod error;
/// Subsystem after receiving the first active leaf.
mod initialized;
use initialized::Initialized;
use initialized::{InitialData, Initialized};
/// Provider of data scraped from chain.
///
@@ -187,7 +190,7 @@ impl DisputeCoordinatorSubsystem {
};
initialized
.run(ctx, backend, participations, votes, Some(first_leaf), clock)
.run(ctx, backend, Some(InitialData { participations, votes, leaf: first_leaf }), clock)
.await
}
@@ -210,31 +213,32 @@ impl DisputeCoordinatorSubsystem {
B: Backend + 'static,
{
loop {
let db_params =
DatabaseParams { db: self.store.clone(), db_column: self.config.col_session_data };
let (first_leaf, rolling_session_window) =
match get_rolling_session_window(ctx, db_params).await {
Ok(Some(update)) => update,
Ok(None) => {
gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
return Ok(None)
},
Err(e) => {
e.split()?.log();
continue
},
};
let first_leaf = match wait_for_first_leaf(ctx).await {
Ok(Some(activated_leaf)) => activated_leaf,
Ok(None) => continue,
Err(e) => {
e.split()?.log();
continue
},
};
// `RuntimeInfo` cache should match `DISPUTE_WINDOW` so that we can
// keep all sessions for a dispute window
let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig {
keystore: None,
session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize)
.expect("DISPUTE_WINDOW can't be 0; qed."),
});
let mut overlay_db = OverlayedBackend::new(&mut backend);
let (participations, votes, spam_slots, ordering_provider) = match self
.handle_startup(
ctx,
first_leaf.clone(),
&rolling_session_window,
&mut overlay_db,
clock,
)
let (
participations,
votes,
spam_slots,
ordering_provider,
highest_session_seen,
gaps_in_cache,
) = match self
.handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock)
.await
{
Ok(v) => v,
@@ -252,7 +256,14 @@ impl DisputeCoordinatorSubsystem {
participations,
votes,
first_leaf,
Initialized::new(self, rolling_session_window, spam_slots, ordering_provider),
Initialized::new(
self,
runtime_info,
spam_slots,
ordering_provider,
highest_session_seen,
gaps_in_cache,
),
backend,
)))
}
@@ -267,7 +278,7 @@ impl DisputeCoordinatorSubsystem {
&self,
ctx: &mut Context,
initial_head: ActivatedLeaf,
rolling_session_window: &RollingSessionWindow,
runtime_info: &mut RuntimeInfo,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
clock: &dyn Clock,
) -> Result<(
@@ -275,10 +286,9 @@ impl DisputeCoordinatorSubsystem {
Vec<ScrapedOnChainVotes>,
SpamSlots,
ChainScraper,
SessionIndex,
bool,
)> {
// Prune obsolete disputes:
db::v1::note_earliest_session(overlay_db, rolling_session_window.earliest_session())?;
let now = clock.now();
let active_disputes = match overlay_db.load_recent_disputes() {
@@ -292,23 +302,63 @@ impl DisputeCoordinatorSubsystem {
},
};
// We assume the highest session is the passed leaf. If we can't get the session index
// we can't initialize the subsystem so we'll wait for a new leaf
let highest_session = runtime_info
.get_session_index_for_child(ctx.sender(), initial_head.hash)
.await?;
let mut gap_in_cache = false;
// Cache the sessions. A failure to fetch a session here is not that critical so we
// won't abort the initialization
for idx in highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1)..=highest_session {
if let Err(e) = runtime_info
.get_session_info_by_index(ctx.sender(), initial_head.hash, idx)
.await
{
gum::debug!(
target: LOG_TARGET,
leaf_hash = ?initial_head.hash,
session_idx = idx,
err = ?e,
"Can't cache SessionInfo during subsystem initialization. Skipping session."
);
gap_in_cache = true;
continue
};
}
// Prune obsolete disputes:
db::v1::note_earliest_session(
overlay_db,
highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1),
)?;
let mut participation_requests = Vec::new();
let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new();
let leaf_hash = initial_head.hash;
let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?;
for ((session, ref candidate_hash), _) in active_disputes {
let env =
match CandidateEnvironment::new(&self.keystore, &rolling_session_window, session) {
None => {
gum::warn!(
target: LOG_TARGET,
session,
"We are lacking a `SessionInfo` for handling db votes on startup."
);
let env = match CandidateEnvironment::new(
&self.keystore,
ctx,
runtime_info,
highest_session,
leaf_hash,
)
.await
{
None => {
gum::warn!(
target: LOG_TARGET,
session,
"We are lacking a `SessionInfo` for handling db votes on startup."
);
continue
},
Some(env) => env,
};
continue
},
Some(env) => env,
};
let votes: CandidateVotes =
match overlay_db.load_candidate_votes(session, candidate_hash) {
@@ -370,26 +420,14 @@ impl DisputeCoordinatorSubsystem {
}
}
Ok((participation_requests, votes, SpamSlots::recover_from_state(spam_disputes), scraper))
}
}
/// Wait for `ActiveLeavesUpdate` on startup, returns `None` if `Conclude` signal came first.
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
async fn get_rolling_session_window<Context>(
ctx: &mut Context,
db_params: DatabaseParams,
) -> Result<Option<(ActivatedLeaf, RollingSessionWindow)>> {
if let Some(leaf) = { wait_for_first_leaf(ctx) }.await? {
let sender = ctx.sender().clone();
Ok(Some((
leaf.clone(),
RollingSessionWindow::new(sender, leaf.hash, db_params)
.await
.map_err(JfyiError::RollingSessionWindow)?,
)))
} else {
Ok(None)
Ok((
participation_requests,
votes,
SpamSlots::recover_from_state(spam_disputes),
scraper,
highest_session,
gap_in_cache,
))
}
}
@@ -281,14 +281,8 @@ impl TestState {
)))
.await;
self.handle_sync_queries(
virtual_overseer,
block_hash,
block_number,
session,
candidate_events,
)
.await;
self.handle_sync_queries(virtual_overseer, block_hash, session, candidate_events)
.await;
}
/// Returns any sent `DisputeMessage`s.
@@ -296,7 +290,6 @@ impl TestState {
&mut self,
virtual_overseer: &mut VirtualOverseer,
block_hash: Hash,
block_number: BlockNumber,
session: SessionIndex,
candidate_events: Vec<CandidateEvent>,
) -> Vec<DisputeMessage> {
@@ -334,61 +327,24 @@ impl TestState {
assert_eq!(h, block_hash);
let _ = tx.send(Ok(session));
// Queries for fetching earliest unfinalized block session. See `RollingSessionWindow`.
// Queries for session caching - see `handle_startup`
if self.known_session.is_none() {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(block_number));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
number,
s_tx,
)) => {
assert_eq!(block_number, number);
let _ = s_tx.send(Ok(Some(block_hash)));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, block_hash);
let _ = s_tx.send(Ok(session));
}
);
}
// No queries, if subsystem knows about this session already.
if self.known_session == Some(session) {
continue
}
self.known_session = Some(session);
loop {
// answer session info queries until the current session is reached.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(session_index, tx),
for i in 0..=session {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(session_index, tx),
)) => {
assert_eq!(h, block_hash);
let _ = tx.send(Ok(Some(self.session_info())));
if session_index == session { break }
assert_eq!(h, block_hash);
assert_eq!(session_index, i);
let _ = tx.send(Ok(Some(self.session_info())));
}
);
}
);
}
self.known_session = Some(session);
},
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(tx)) => {
assert!(
@@ -481,9 +437,8 @@ impl TestState {
let events = if n == 1 { std::mem::take(&mut initial_events) } else { Vec::new() };
let mut new_messages = self
.handle_sync_queries(virtual_overseer, *leaf, n as BlockNumber, session, events)
.await;
let mut new_messages =
self.handle_sync_queries(virtual_overseer, *leaf, session, events).await;
messages.append(&mut new_messages);
}
messages