diff --git a/polkadot/node/core/dispute-coordinator/src/real/backend.rs b/polkadot/node/core/dispute-coordinator/src/backend.rs similarity index 100% rename from polkadot/node/core/dispute-coordinator/src/real/backend.rs rename to polkadot/node/core/dispute-coordinator/src/backend.rs diff --git a/polkadot/node/core/dispute-coordinator/src/real/db/mod.rs b/polkadot/node/core/dispute-coordinator/src/db/mod.rs similarity index 100% rename from polkadot/node/core/dispute-coordinator/src/real/db/mod.rs rename to polkadot/node/core/dispute-coordinator/src/db/mod.rs diff --git a/polkadot/node/core/dispute-coordinator/src/real/db/v1.rs b/polkadot/node/core/dispute-coordinator/src/db/v1.rs similarity index 99% rename from polkadot/node/core/dispute-coordinator/src/real/db/v1.rs rename to polkadot/node/core/dispute-coordinator/src/db/v1.rs index a9b98cb7b6..9c7f4d9ac8 100644 --- a/polkadot/node/core/dispute-coordinator/src/real/db/v1.rs +++ b/polkadot/node/core/dispute-coordinator/src/db/v1.rs @@ -28,12 +28,10 @@ use std::sync::Arc; use parity_scale_codec::{Decode, Encode}; use crate::{ + backend::{Backend, BackendWriteOp, OverlayedBackend}, error::{FatalError, FatalResult}, - real::{ - backend::{Backend, BackendWriteOp, OverlayedBackend}, - DISPUTE_WINDOW, - }, status::DisputeStatus, + DISPUTE_WINDOW, }; const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes"; diff --git a/polkadot/node/core/dispute-coordinator/src/error.rs b/polkadot/node/core/dispute-coordinator/src/error.rs index e50a550bc6..dbef9c2916 100644 --- a/polkadot/node/core/dispute-coordinator/src/error.rs +++ b/polkadot/node/core/dispute-coordinator/src/error.rs @@ -20,7 +20,7 @@ use futures::channel::oneshot; use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError}; use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime}; -use crate::{real::participation, LOG_TARGET}; +use crate::{participation, LOG_TARGET}; use parity_scale_codec::Error as CodecError; pub type Result = std::result::Result; diff --git a/polkadot/node/core/dispute-coordinator/src/real/initialized.rs b/polkadot/node/core/dispute-coordinator/src/initialized.rs similarity index 99% rename from polkadot/node/core/dispute-coordinator/src/real/initialized.rs rename to polkadot/node/core/dispute-coordinator/src/initialized.rs index b1b4e1c484..40f9ed8211 100644 --- a/polkadot/node/core/dispute-coordinator/src/real/initialized.rs +++ b/polkadot/node/core/dispute-coordinator/src/initialized.rs @@ -48,9 +48,8 @@ use polkadot_primitives::v2::{ use crate::{ error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, metrics::Metrics, - real::DisputeCoordinatorSubsystem, status::{get_active_with_status, Clock, DisputeStatus, Timestamp}, - LOG_TARGET, + DisputeCoordinatorSubsystem, LOG_TARGET, }; use super::{ diff --git a/polkadot/node/core/dispute-coordinator/src/lib.rs b/polkadot/node/core/dispute-coordinator/src/lib.rs index 2cf0d9dcd8..16b89c06ae 100644 --- a/polkadot/node/core/dispute-coordinator/src/lib.rs +++ b/polkadot/node/core/dispute-coordinator/src/lib.rs @@ -22,21 +22,391 @@ //! //! This subsystem will be the point which produce dispute votes, either positive or negative, based on locally-observed //! validation results as well as a sink for votes received by other subsystems. When importing a dispute vote from -//! another node, this will trigger the dispute participation subsystem to recover and validate the block and call -//! back to this subsystem. +//! another node, this will trigger dispute participation to recover and validate the block. + +use std::{collections::HashSet, sync::Arc}; + +use futures::FutureExt; + +use sc_keystore::LocalKeystore; + +use polkadot_node_primitives::{CandidateVotes, DISPUTE_WINDOW}; +use polkadot_node_subsystem::{ + messages::DisputeCoordinatorMessage, overseer, ActivatedLeaf, FromOverseer, OverseerSignal, + SpawnedSubsystem, SubsystemContext, SubsystemError, +}; +use polkadot_node_subsystem_util::{ + database::Database, rolling_session_window::RollingSessionWindow, +}; +use polkadot_primitives::v2::{ScrapedOnChainVotes, ValidatorIndex, ValidatorPair}; + +use crate::{ + error::{FatalResult, JfyiError, Result}, + metrics::Metrics, + status::{get_active_with_status, SystemClock}, +}; +use backend::{Backend, OverlayedBackend}; +use db::v1::DbBackend; +use fatality::Split; + +use self::{ + participation::{ParticipationPriority, ParticipationRequest}, + spam_slots::{SpamSlots, UnconfirmedDisputes}, +}; + +pub(crate) mod backend; +pub(crate) mod db; +pub(crate) mod error; + +/// Subsystem after receiving the first active leaf. +mod initialized; +use initialized::Initialized; + +/// Provider of data scraped from chain. +/// +/// If we have seen a candidate included somewhere, we should treat it as priority and will be able +/// to provide an ordering for participation. Thus a dispute for a candidate where we can get some +/// ordering is high-priority (we know it is a valid dispute) and those can be ordered by +/// `participation` based on `relay_parent` block number and other metrics, so each validator will +/// participate in disputes in a similar order, which ensures we will be resolving disputes, even +/// under heavy load. +mod scraping; +use scraping::ChainScraper; + +/// When importing votes we will check via the `ordering` module, whether or not we know of the +/// candidate to be included somewhere. If not, the votes might be spam, in this case we want to +/// limit the amount of locally imported votes, to prevent DoS attacks/resource exhaustion. The +/// `spam_slots` module helps keeping track of unconfirmed disputes per validators, if a spam slot +/// gets full, we will drop any further potential spam votes from that validator and report back +/// that the import failed. Which will lead to any honest validator to retry, thus the spam slots +/// can be relatively small, as a drop is not fatal. +mod spam_slots; + +/// Handling of participation requests via `Participation`. +/// +/// `Participation` provides an API (`Participation::queue_participation`) for queuing of dispute participations and will process those +/// participation requests, such that most important/urgent disputes will be resolved and processed +/// first and more importantly it will order requests in a way so disputes will get resolved, even +/// if there are lots of them. +pub(crate) mod participation; /// Metrics types. mod metrics; -/// Common error types for this subsystem. -mod error; - /// Status tracking of disputes (`DisputeStatus`). mod status; -/// The real implementation. -mod real; +use crate::status::Clock; + +#[cfg(test)] +mod tests; pub(crate) const LOG_TARGET: &str = "parachain::dispute-coordinator"; -pub use self::real::{Config, DisputeCoordinatorSubsystem}; +/// An implementation of the dispute coordinator subsystem. +pub struct DisputeCoordinatorSubsystem { + config: Config, + store: Arc, + keystore: Arc, + metrics: Metrics, +} + +/// Configuration for the dispute coordinator subsystem. +#[derive(Debug, Clone, Copy)] +pub struct Config { + /// The data column in the store to use for dispute data. + pub col_data: u32, +} + +impl Config { + fn column_config(&self) -> db::v1::ColumnConfiguration { + db::v1::ColumnConfiguration { col_data: self.col_data } + } +} + +impl overseer::Subsystem for DisputeCoordinatorSubsystem +where + Context: SubsystemContext, + Context: overseer::SubsystemContext, +{ + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = async { + let backend = DbBackend::new(self.store.clone(), self.config.column_config()); + self.run(ctx, backend, Box::new(SystemClock)) + .await + .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) + } + .boxed(); + + SpawnedSubsystem { name: "dispute-coordinator-subsystem", future } + } +} + +impl DisputeCoordinatorSubsystem { + /// Create a new instance of the subsystem. + pub fn new( + store: Arc, + config: Config, + keystore: Arc, + metrics: Metrics, + ) -> Self { + Self { store, config, keystore, metrics } + } + + /// Initialize and afterwards run `Initialized::run`. + async fn run( + self, + mut ctx: Context, + backend: B, + clock: Box, + ) -> FatalResult<()> + where + Context: overseer::SubsystemContext, + Context: SubsystemContext, + B: Backend + 'static, + { + let res = self.initialize(&mut ctx, backend, &*clock).await?; + + let (participations, votes, first_leaf, initialized, backend) = match res { + // Concluded: + None => return Ok(()), + Some(r) => r, + }; + + initialized + .run(ctx, backend, participations, votes, Some(first_leaf), clock) + .await + } + + /// Make sure to recover participations properly on startup. + async fn initialize( + self, + ctx: &mut Context, + mut backend: B, + clock: &(dyn Clock), + ) -> FatalResult< + Option<( + Vec<(ParticipationPriority, ParticipationRequest)>, + Vec, + ActivatedLeaf, + Initialized, + B, + )>, + > + where + Context: overseer::SubsystemContext, + Context: SubsystemContext, + B: Backend + 'static, + { + loop { + let (first_leaf, rolling_session_window) = match get_rolling_session_window(ctx).await { + Ok(Some(update)) => update, + Ok(None) => { + gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); + return Ok(None) + }, + Err(e) => { + e.split()?.log(); + continue + }, + }; + + let mut overlay_db = OverlayedBackend::new(&mut backend); + let (participations, votes, spam_slots, ordering_provider) = match self + .handle_startup( + ctx, + first_leaf.clone(), + &rolling_session_window, + &mut overlay_db, + clock, + ) + .await + { + Ok(v) => v, + Err(e) => { + e.split()?.log(); + continue + }, + }; + if !overlay_db.is_empty() { + let ops = overlay_db.into_write_ops(); + backend.write(ops)?; + } + + return Ok(Some(( + participations, + votes, + first_leaf, + Initialized::new(self, rolling_session_window, spam_slots, ordering_provider), + backend, + ))) + } + } + + // Restores the subsystem's state before proceeding with the main event loop. + // + // - Prune any old disputes. + // - Find disputes we need to participate in. + // - Initialize spam slots & OrderingProvider. + async fn handle_startup( + &self, + ctx: &mut Context, + initial_head: ActivatedLeaf, + rolling_session_window: &RollingSessionWindow, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, + clock: &dyn Clock, + ) -> Result<( + Vec<(ParticipationPriority, ParticipationRequest)>, + Vec, + SpamSlots, + ChainScraper, + )> + where + Context: overseer::SubsystemContext, + Context: SubsystemContext, + { + // Prune obsolete disputes: + db::v1::note_current_session(overlay_db, rolling_session_window.latest_session())?; + + let active_disputes = match overlay_db.load_recent_disputes() { + Ok(Some(disputes)) => + get_active_with_status(disputes.into_iter(), clock.now()).collect(), + Ok(None) => Vec::new(), + Err(e) => { + gum::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e); + return Err(e.into()) + }, + }; + + let mut participation_requests = Vec::new(); + let mut unconfirmed_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); + let (mut scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?; + for ((session, ref candidate_hash), status) in active_disputes { + let votes: CandidateVotes = + match overlay_db.load_candidate_votes(session, candidate_hash) { + Ok(Some(votes)) => votes.into(), + Ok(None) => continue, + Err(e) => { + gum::error!( + target: LOG_TARGET, + "Failed initial load of candidate votes: {:?}", + e + ); + continue + }, + }; + + let validators = match rolling_session_window.session_info(session) { + None => { + gum::warn!( + target: LOG_TARGET, + session, + "Missing info for session which has an active dispute", + ); + continue + }, + Some(info) => info.validators.clone(), + }; + + let n_validators = validators.len(); + let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect(); + + // Determine if there are any missing local statements for this dispute. Validators are + // filtered if: + // 1) their statement already exists, or + // 2) the validator key is not in the local keystore (i.e. the validator is remote). + // The remaining set only contains local validators that are also missing statements. + let missing_local_statement = validators + .iter() + .enumerate() + .map(|(index, validator)| (ValidatorIndex(index as _), validator)) + .any(|(index, validator)| { + !voted_indices.contains(&index) && + self.keystore + .key_pair::(validator) + .ok() + .map_or(false, |v| v.is_some()) + }); + + let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash()); + + if !status.is_confirmed_concluded() && !is_included { + unconfirmed_disputes.insert((session, *candidate_hash), voted_indices); + } + + // Participate for all non-concluded disputes which do not have a + // recorded local statement. + if missing_local_statement { + participation_requests.push(( + ParticipationPriority::with_priority_if(is_included), + ParticipationRequest::new( + votes.candidate_receipt.clone(), + session, + n_validators, + ), + )); + } + } + + Ok(( + participation_requests, + votes, + SpamSlots::recover_from_state(unconfirmed_disputes), + scraper, + )) + } +} + +/// Wait for `ActiveLeavesUpdate` on startup, returns `None` if `Conclude` signal came first. +async fn get_rolling_session_window( + ctx: &mut Context, +) -> Result> +where + Context: overseer::SubsystemContext, + Context: SubsystemContext, +{ + if let Some(leaf) = wait_for_first_leaf(ctx).await? { + Ok(Some(( + leaf.clone(), + RollingSessionWindow::new(ctx, DISPUTE_WINDOW, leaf.hash) + .await + .map_err(JfyiError::RollingSessionWindow)?, + ))) + } else { + Ok(None) + } +} + +/// Wait for `ActiveLeavesUpdate`, returns `None` if `Conclude` signal came first. +async fn wait_for_first_leaf(ctx: &mut Context) -> Result> +where + Context: overseer::SubsystemContext, + Context: SubsystemContext, +{ + loop { + match ctx.recv().await? { + FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(None), + FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { + if let Some(activated) = update.activated { + return Ok(Some(activated)) + } + }, + FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {}, + FromOverseer::Communication { msg } => + // NOTE: We could technically actually handle a couple of message types, even if + // not initialized (e.g. all requests that only query the database). The problem + // is, we would deliver potentially outdated information, especially in the event + // of bugs where initialization fails for a while (e.g. `SessionInfo`s are not + // available). So instead of telling subsystems, everything is fine, because of an + // hour old database state, we should rather cancel contained oneshots and delay + // finality until we are fully functional. + { + gum::warn!( + target: LOG_TARGET, + ?msg, + "Received msg before first active leaves update. This is not expected - message will be dropped." + ) + }, + } + } +} diff --git a/polkadot/node/core/dispute-coordinator/src/real/participation/mod.rs b/polkadot/node/core/dispute-coordinator/src/participation/mod.rs similarity index 99% rename from polkadot/node/core/dispute-coordinator/src/real/participation/mod.rs rename to polkadot/node/core/dispute-coordinator/src/participation/mod.rs index 3ed91836b8..6c47f41cc6 100644 --- a/polkadot/node/core/dispute-coordinator/src/real/participation/mod.rs +++ b/polkadot/node/core/dispute-coordinator/src/participation/mod.rs @@ -33,7 +33,7 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; -use crate::real::LOG_TARGET; +use crate::LOG_TARGET; use crate::error::{FatalError, FatalResult, Result}; diff --git a/polkadot/node/core/dispute-coordinator/src/real/participation/queues/mod.rs b/polkadot/node/core/dispute-coordinator/src/participation/queues/mod.rs similarity index 100% rename from polkadot/node/core/dispute-coordinator/src/real/participation/queues/mod.rs rename to polkadot/node/core/dispute-coordinator/src/participation/queues/mod.rs diff --git a/polkadot/node/core/dispute-coordinator/src/real/participation/queues/tests.rs b/polkadot/node/core/dispute-coordinator/src/participation/queues/tests.rs similarity index 100% rename from polkadot/node/core/dispute-coordinator/src/real/participation/queues/tests.rs rename to polkadot/node/core/dispute-coordinator/src/participation/queues/tests.rs diff --git a/polkadot/node/core/dispute-coordinator/src/real/participation/tests.rs b/polkadot/node/core/dispute-coordinator/src/participation/tests.rs similarity index 100% rename from polkadot/node/core/dispute-coordinator/src/real/participation/tests.rs rename to polkadot/node/core/dispute-coordinator/src/participation/tests.rs diff --git a/polkadot/node/core/dispute-coordinator/src/real/mod.rs b/polkadot/node/core/dispute-coordinator/src/real/mod.rs deleted file mode 100644 index 139760b641..0000000000 --- a/polkadot/node/core/dispute-coordinator/src/real/mod.rs +++ /dev/null @@ -1,405 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Implements the dispute coordinator subsystem. -//! -//! This is the central subsystem of the node-side components which participate in disputes. -//! This subsystem wraps a database which tracks all statements observed by all validators over some window of sessions. -//! Votes older than this session window are pruned. -//! -//! This subsystem will be the point which produce dispute votes, either positive or negative, based on locally-observed -//! validation results as well as a sink for votes received by other subsystems. When importing a dispute vote from -//! another node, this will trigger dispute participation to recover and validate the block. - -use std::{collections::HashSet, sync::Arc}; - -use futures::FutureExt; - -use sc_keystore::LocalKeystore; - -use polkadot_node_primitives::{CandidateVotes, DISPUTE_WINDOW}; -use polkadot_node_subsystem::{ - messages::DisputeCoordinatorMessage, overseer, ActivatedLeaf, FromOverseer, OverseerSignal, - SpawnedSubsystem, SubsystemContext, SubsystemError, -}; -use polkadot_node_subsystem_util::{ - database::Database, rolling_session_window::RollingSessionWindow, -}; -use polkadot_primitives::v2::{ScrapedOnChainVotes, ValidatorIndex, ValidatorPair}; - -use crate::{ - error::{FatalResult, JfyiError, Result}, - metrics::Metrics, - status::{get_active_with_status, SystemClock}, -}; -use backend::{Backend, OverlayedBackend}; -use db::v1::DbBackend; -use fatality::Split; - -use self::{ - participation::{ParticipationPriority, ParticipationRequest}, - spam_slots::{SpamSlots, UnconfirmedDisputes}, -}; - -pub(crate) mod backend; -pub(crate) mod db; - -/// Subsystem after receiving the first active leaf. -mod initialized; -use initialized::Initialized; - -/// Provider of data scraped from chain. -/// -/// If we have seen a candidate included somewhere, we should treat it as priority and will be able -/// to provide an ordering for participation. Thus a dispute for a candidate where we can get some -/// ordering is high-priority (we know it is a valid dispute) and those can be ordered by -/// `participation` based on `relay_parent` block number and other metrics, so each validator will -/// participate in disputes in a similar order, which ensures we will be resolving disputes, even -/// under heavy load. -mod scraping; -use scraping::ChainScraper; - -/// When importing votes we will check via the `ordering` module, whether or not we know of the -/// candidate to be included somewhere. If not, the votes might be spam, in this case we want to -/// limit the amount of locally imported votes, to prevent DoS attacks/resource exhaustion. The -/// `spam_slots` module helps keeping track of unconfirmed disputes per validators, if a spam slot -/// gets full, we will drop any further potential spam votes from that validator and report back -/// that the import failed. Which will lead to any honest validator to retry, thus the spam slots -/// can be relatively small, as a drop is not fatal. -mod spam_slots; - -/// Handling of participation requests via `Participation`. -/// -/// `Participation` provides an API (`Participation::queue_participation`) for queuing of dispute participations and will process those -/// participation requests, such that most important/urgent disputes will be resolved and processed -/// first and more importantly it will order requests in a way so disputes will get resolved, even -/// if there are lots of them. -pub(crate) mod participation; - -use crate::status::Clock; - -use crate::LOG_TARGET; - -#[cfg(test)] -mod tests; - -/// An implementation of the dispute coordinator subsystem. -pub struct DisputeCoordinatorSubsystem { - config: Config, - store: Arc, - keystore: Arc, - metrics: Metrics, -} - -/// Configuration for the dispute coordinator subsystem. -#[derive(Debug, Clone, Copy)] -pub struct Config { - /// The data column in the store to use for dispute data. - pub col_data: u32, -} - -impl Config { - fn column_config(&self) -> db::v1::ColumnConfiguration { - db::v1::ColumnConfiguration { col_data: self.col_data } - } -} - -impl overseer::Subsystem for DisputeCoordinatorSubsystem -where - Context: SubsystemContext, - Context: overseer::SubsystemContext, -{ - fn start(self, ctx: Context) -> SpawnedSubsystem { - let future = async { - let backend = DbBackend::new(self.store.clone(), self.config.column_config()); - self.run(ctx, backend, Box::new(SystemClock)) - .await - .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) - } - .boxed(); - - SpawnedSubsystem { name: "dispute-coordinator-subsystem", future } - } -} - -impl DisputeCoordinatorSubsystem { - /// Create a new instance of the subsystem. - pub fn new( - store: Arc, - config: Config, - keystore: Arc, - metrics: Metrics, - ) -> Self { - Self { store, config, keystore, metrics } - } - - /// Initialize and afterwards run `Initialized::run`. - async fn run( - self, - mut ctx: Context, - backend: B, - clock: Box, - ) -> FatalResult<()> - where - Context: overseer::SubsystemContext, - Context: SubsystemContext, - B: Backend + 'static, - { - let res = self.initialize(&mut ctx, backend, &*clock).await?; - - let (participations, votes, first_leaf, initialized, backend) = match res { - // Concluded: - None => return Ok(()), - Some(r) => r, - }; - - initialized - .run(ctx, backend, participations, votes, Some(first_leaf), clock) - .await - } - - /// Make sure to recover participations properly on startup. - async fn initialize( - self, - ctx: &mut Context, - mut backend: B, - clock: &(dyn Clock), - ) -> FatalResult< - Option<( - Vec<(ParticipationPriority, ParticipationRequest)>, - Vec, - ActivatedLeaf, - Initialized, - B, - )>, - > - where - Context: overseer::SubsystemContext, - Context: SubsystemContext, - B: Backend + 'static, - { - loop { - let (first_leaf, rolling_session_window) = match get_rolling_session_window(ctx).await { - Ok(Some(update)) => update, - Ok(None) => { - gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); - return Ok(None) - }, - Err(e) => { - e.split()?.log(); - continue - }, - }; - - let mut overlay_db = OverlayedBackend::new(&mut backend); - let (participations, votes, spam_slots, ordering_provider) = match self - .handle_startup( - ctx, - first_leaf.clone(), - &rolling_session_window, - &mut overlay_db, - clock, - ) - .await - { - Ok(v) => v, - Err(e) => { - e.split()?.log(); - continue - }, - }; - if !overlay_db.is_empty() { - let ops = overlay_db.into_write_ops(); - backend.write(ops)?; - } - - return Ok(Some(( - participations, - votes, - first_leaf, - Initialized::new(self, rolling_session_window, spam_slots, ordering_provider), - backend, - ))) - } - } - - // Restores the subsystem's state before proceeding with the main event loop. - // - // - Prune any old disputes. - // - Find disputes we need to participate in. - // - Initialize spam slots & OrderingProvider. - async fn handle_startup( - &self, - ctx: &mut Context, - initial_head: ActivatedLeaf, - rolling_session_window: &RollingSessionWindow, - overlay_db: &mut OverlayedBackend<'_, impl Backend>, - clock: &dyn Clock, - ) -> Result<( - Vec<(ParticipationPriority, ParticipationRequest)>, - Vec, - SpamSlots, - ChainScraper, - )> - where - Context: overseer::SubsystemContext, - Context: SubsystemContext, - { - // Prune obsolete disputes: - db::v1::note_current_session(overlay_db, rolling_session_window.latest_session())?; - - let active_disputes = match overlay_db.load_recent_disputes() { - Ok(Some(disputes)) => - get_active_with_status(disputes.into_iter(), clock.now()).collect(), - Ok(None) => Vec::new(), - Err(e) => { - gum::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e); - return Err(e.into()) - }, - }; - - let mut participation_requests = Vec::new(); - let mut unconfirmed_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); - let (mut scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?; - for ((session, ref candidate_hash), status) in active_disputes { - let votes: CandidateVotes = - match overlay_db.load_candidate_votes(session, candidate_hash) { - Ok(Some(votes)) => votes.into(), - Ok(None) => continue, - Err(e) => { - gum::error!( - target: LOG_TARGET, - "Failed initial load of candidate votes: {:?}", - e - ); - continue - }, - }; - - let validators = match rolling_session_window.session_info(session) { - None => { - gum::warn!( - target: LOG_TARGET, - session, - "Missing info for session which has an active dispute", - ); - continue - }, - Some(info) => info.validators.clone(), - }; - - let n_validators = validators.len(); - let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect(); - - // Determine if there are any missing local statements for this dispute. Validators are - // filtered if: - // 1) their statement already exists, or - // 2) the validator key is not in the local keystore (i.e. the validator is remote). - // The remaining set only contains local validators that are also missing statements. - let missing_local_statement = validators - .iter() - .enumerate() - .map(|(index, validator)| (ValidatorIndex(index as _), validator)) - .any(|(index, validator)| { - !voted_indices.contains(&index) && - self.keystore - .key_pair::(validator) - .ok() - .map_or(false, |v| v.is_some()) - }); - - let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash()); - - if !status.is_confirmed_concluded() && !is_included { - unconfirmed_disputes.insert((session, *candidate_hash), voted_indices); - } - - // Participate for all non-concluded disputes which do not have a - // recorded local statement. - if missing_local_statement { - participation_requests.push(( - ParticipationPriority::with_priority_if(is_included), - ParticipationRequest::new( - votes.candidate_receipt.clone(), - session, - n_validators, - ), - )); - } - } - - Ok(( - participation_requests, - votes, - SpamSlots::recover_from_state(unconfirmed_disputes), - scraper, - )) - } -} - -/// Wait for `ActiveLeavesUpdate` on startup, returns `None` if `Conclude` signal came first. -async fn get_rolling_session_window( - ctx: &mut Context, -) -> Result> -where - Context: overseer::SubsystemContext, - Context: SubsystemContext, -{ - if let Some(leaf) = wait_for_first_leaf(ctx).await? { - Ok(Some(( - leaf.clone(), - RollingSessionWindow::new(ctx, DISPUTE_WINDOW, leaf.hash) - .await - .map_err(JfyiError::RollingSessionWindow)?, - ))) - } else { - Ok(None) - } -} - -/// Wait for `ActiveLeavesUpdate`, returns `None` if `Conclude` signal came first. -async fn wait_for_first_leaf(ctx: &mut Context) -> Result> -where - Context: overseer::SubsystemContext, - Context: SubsystemContext, -{ - loop { - match ctx.recv().await? { - FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(None), - FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { - if let Some(activated) = update.activated { - return Ok(Some(activated)) - } - }, - FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {}, - FromOverseer::Communication { msg } => - // NOTE: We could technically actually handle a couple of message types, even if - // not initialized (e.g. all requests that only query the database). The problem - // is, we would deliver potentially outdated information, especially in the event - // of bugs where initialization fails for a while (e.g. `SessionInfo`s are not - // available). So instead of telling subsystems, everything is fine, because of an - // hour old database state, we should rather cancel contained oneshots and delay - // finality until we are fully functional. - { - gum::warn!( - target: LOG_TARGET, - ?msg, - "Received msg before first active leaves update. This is not expected - message will be dropped." - ) - }, - } - } -} diff --git a/polkadot/node/core/dispute-coordinator/src/real/scraping/mod.rs b/polkadot/node/core/dispute-coordinator/src/scraping/mod.rs similarity index 100% rename from polkadot/node/core/dispute-coordinator/src/real/scraping/mod.rs rename to polkadot/node/core/dispute-coordinator/src/scraping/mod.rs diff --git a/polkadot/node/core/dispute-coordinator/src/real/scraping/tests.rs b/polkadot/node/core/dispute-coordinator/src/scraping/tests.rs similarity index 100% rename from polkadot/node/core/dispute-coordinator/src/real/scraping/tests.rs rename to polkadot/node/core/dispute-coordinator/src/scraping/tests.rs diff --git a/polkadot/node/core/dispute-coordinator/src/real/spam_slots.rs b/polkadot/node/core/dispute-coordinator/src/spam_slots.rs similarity index 99% rename from polkadot/node/core/dispute-coordinator/src/real/spam_slots.rs rename to polkadot/node/core/dispute-coordinator/src/spam_slots.rs index 5b44351215..76cae0a721 100644 --- a/polkadot/node/core/dispute-coordinator/src/real/spam_slots.rs +++ b/polkadot/node/core/dispute-coordinator/src/spam_slots.rs @@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet}; use polkadot_primitives::v2::{CandidateHash, SessionIndex, ValidatorIndex}; -use crate::real::LOG_TARGET; +use crate::LOG_TARGET; /// Type used for counting potential spam votes. type SpamCount = u32; diff --git a/polkadot/node/core/dispute-coordinator/src/real/tests.rs b/polkadot/node/core/dispute-coordinator/src/tests.rs similarity index 99% rename from polkadot/node/core/dispute-coordinator/src/real/tests.rs rename to polkadot/node/core/dispute-coordinator/src/tests.rs index ff95631034..d74a1e0e46 100644 --- a/polkadot/node/core/dispute-coordinator/src/real/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/tests.rs @@ -60,13 +60,11 @@ use polkadot_primitives::v2::{ }; use crate::{ + backend::Backend, metrics::Metrics, - real::{ - backend::Backend, - participation::{participation_full_happy_path, participation_missing_availability}, - Config, DisputeCoordinatorSubsystem, - }, + participation::{participation_full_happy_path, participation_missing_availability}, status::{Clock, Timestamp, ACTIVE_DURATION_SECS}, + Config, DisputeCoordinatorSubsystem, }; use super::db::v1::DbBackend;