dispute-coordinator: Cleanup + Bug fixes (#5323)

* Make import confirmation oneshot optional.

* Cleanup for further improvements.

* Queue adoptions.

* Fix fieldname

* Use correct relay parent

* Fix scraper tests.

* Small optimization.

* Fix all tests.

* Fix other tests.

* fmt

* spelling

* Fix warning.
This commit is contained in:
Robert Klotzner
2022-04-19 14:51:02 +02:00
committed by GitHub
parent b3540e76e3
commit f820db494f
22 changed files with 866 additions and 850 deletions
+1
View File
@@ -6814,6 +6814,7 @@ dependencies = [
"assert_matches",
"fatality",
"futures 0.3.21",
"futures-timer",
"kvdb",
"kvdb-memorydb",
"lru 0.7.5",
@@ -950,15 +950,12 @@ async fn handle_actions(
dispute_statement,
validator_index,
} => {
// TODO: Log confirmation results in an efficient way:
// https://github.com/paritytech/polkadot/issues/5156
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
ctx.send_message(DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt,
session,
statements: vec![(dispute_statement, validator_index)],
pending_confirmation,
pending_confirmation: None,
})
.await;
},
@@ -25,7 +25,6 @@ use polkadot_node_primitives::{
use polkadot_node_subsystem::{
messages::{
AllMessages, ApprovalVotingMessage, AssignmentCheckResult, AvailabilityRecoveryMessage,
ImportStatementsResult,
},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
};
@@ -605,11 +604,10 @@ async fn check_and_import_approval(
overseer_recv(overseer).await,
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ImportStatements {
candidate_hash: c_hash,
pending_confirmation,
pending_confirmation: None,
..
}) => {
assert_eq!(c_hash, candidate_hash);
let _ = pending_confirmation.send(ImportStatementsResult::ValidImport);
}
);
}
+1 -4
View File
@@ -887,16 +887,13 @@ impl CandidateBackingJob {
if let (Some(candidate_receipt), Some(dispute_statement)) =
(maybe_candidate_receipt, maybe_signed_dispute_statement)
{
// TODO: Log confirmation results in an efficient way:
// https://github.com/paritytech/polkadot/issues/5156
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
sender
.send_message(DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt,
session: self.session_index,
statements: vec![(dispute_statement, validator_index)],
pending_confirmation,
pending_confirmation: None,
})
.await;
}
+2 -5
View File
@@ -28,9 +28,7 @@ use polkadot_primitives::v2::{
ScheduledCore,
};
use polkadot_subsystem::{
messages::{
CollatorProtocolMessage, ImportStatementsResult, RuntimeApiMessage, RuntimeApiRequest,
},
messages::{CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest},
ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal,
};
use sp_application_crypto::AppKey;
@@ -284,7 +282,7 @@ async fn test_dispute_coordinator_notifications(
candidate_receipt: c_receipt,
session: s,
statements,
pending_confirmation,
pending_confirmation: None,
}
) => {
assert_eq!(c_hash, candidate_hash);
@@ -292,7 +290,6 @@ async fn test_dispute_coordinator_notifications(
assert_eq!(s, session);
assert_eq!(statements.len(), 1);
assert_eq!(statements[0].1, validator_index);
let _ = pending_confirmation.send(ImportStatementsResult::ValidImport);
}
)
}
@@ -29,6 +29,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
assert_matches = "1.4.0"
test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" }
futures-timer = "3.0.2"
[features]
# If not enabled, the dispute coordinator will do nothing.
@@ -82,7 +82,7 @@ pub enum Error {
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
#[error("Dispute import confirmation send failed (receiver canceled)")]
#[error("Could not send import confirmation (receiver canceled)")]
DisputeImportOneshotSend,
#[error(transparent)]
@@ -118,7 +118,7 @@ impl JfyiError {
pub fn log(self) {
match self {
// don't spam the log with spurious errors
Self::Runtime(_) | Self::Oneshot(_) | Self::DisputeImportOneshotSend => {
Self::Runtime(_) | Self::Oneshot(_) => {
gum::debug!(target: LOG_TARGET, error = ?self)
},
// it's worth reporting otherwise
@@ -21,22 +21,18 @@ use std::{
sync::Arc,
};
use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use lru::LruCache;
use futures::{channel::mpsc, FutureExt, StreamExt};
use sc_keystore::LocalKeystore;
use polkadot_node_primitives::{
CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
DISPUTE_WINDOW, MAX_FINALITY_LAG,
DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
messages::{
BlockDescription, DisputeCoordinatorMessage, DisputeDistributionMessage,
ImportStatementsResult, RuntimeApiMessage, RuntimeApiRequest,
ImportStatementsResult,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext,
};
@@ -52,7 +48,7 @@ use polkadot_primitives::v2::{
use crate::{
error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result},
metrics::Metrics,
real::{ordering::get_finalized_block_number, DisputeCoordinatorSubsystem},
real::DisputeCoordinatorSubsystem,
status::{get_active_with_status, Clock, DisputeStatus, Timestamp},
LOG_TARGET,
};
@@ -60,19 +56,15 @@ use crate::{
use super::{
backend::Backend,
db,
ordering::{CandidateComparator, OrderingProvider},
participation::{
self, Participation, ParticipationRequest, ParticipationStatement, WorkerMessageReceiver,
self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement,
WorkerMessageReceiver,
},
scraping::ChainScraper,
spam_slots::SpamSlots,
OverlayedBackend,
};
// The capacity and scrape depth are equal to the maximum allowed unfinalized depth.
const LRU_SCRAPED_BLOCKS_CAPACITY: usize = MAX_FINALITY_LAG as usize;
// This is in sync with `MAX_FINALITY_LAG` in relay chain selection & node primitives.
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = MAX_FINALITY_LAG;
/// After the first active leaves update we transition to `Initialized` state.
///
/// Before the first active leaves update we can't really do much. We cannot check incoming
@@ -84,14 +76,12 @@ pub struct Initialized {
highest_session: SessionIndex,
spam_slots: SpamSlots,
participation: Participation,
ordering_provider: OrderingProvider,
scraper: ChainScraper,
participation_receiver: WorkerMessageReceiver,
metrics: Metrics,
// This tracks only rolling session window failures.
// It can be a `Vec` if the need to track more arises.
error: Option<SessionsUnavailable>,
/// Latest relay blocks that have been successfully scraped.
last_scraped_blocks: LruCache<Hash, ()>,
}
impl Initialized {
@@ -100,7 +90,7 @@ impl Initialized {
subsystem: DisputeCoordinatorSubsystem,
rolling_session_window: RollingSessionWindow,
spam_slots: SpamSlots,
ordering_provider: OrderingProvider,
scraper: ChainScraper,
) -> Self {
let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;
@@ -113,12 +103,11 @@ impl Initialized {
rolling_session_window,
highest_session,
spam_slots,
ordering_provider,
scraper,
participation,
participation_receiver,
metrics,
error: None,
last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY),
}
}
@@ -129,7 +118,8 @@ impl Initialized {
mut self,
mut ctx: Context,
mut backend: B,
mut participations: Vec<(Option<CandidateComparator>, ParticipationRequest)>,
mut participations: Vec<(ParticipationPriority, ParticipationRequest)>,
mut votes: Vec<ScrapedOnChainVotes>,
mut first_leaf: Option<ActivatedLeaf>,
clock: Box<dyn Clock>,
) -> FatalResult<()>
@@ -144,6 +134,7 @@ impl Initialized {
&mut ctx,
&mut backend,
&mut participations,
&mut votes,
&mut first_leaf,
&*clock,
)
@@ -165,7 +156,8 @@ impl Initialized {
&mut self,
ctx: &mut Context,
backend: &mut B,
participations: &mut Vec<(Option<CandidateComparator>, ParticipationRequest)>,
participations: &mut Vec<(ParticipationPriority, ParticipationRequest)>,
on_chain_votes: &mut Vec<ScrapedOnChainVotes>,
first_leaf: &mut Option<ActivatedLeaf>,
clock: &dyn Clock,
) -> Result<()>
@@ -174,17 +166,31 @@ impl Initialized {
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
B: Backend,
{
for (comparator, request) in participations.drain(..) {
self.participation.queue_participation(ctx, comparator, request).await?;
for (priority, request) in participations.drain(..) {
self.participation.queue_participation(ctx, priority, request).await?;
}
if let Some(first_leaf) = first_leaf.take() {
{
let mut overlay_db = OverlayedBackend::new(backend);
self.scrape_on_chain_votes(ctx, &mut overlay_db, first_leaf.hash, clock.now())
.await?;
for votes in on_chain_votes.drain(..) {
let _ = self
.process_on_chain_votes(ctx, &mut overlay_db, votes, clock.now())
.await
.map_err(|error| {
gum::warn!(
target: LOG_TARGET,
?error,
"Skipping scraping block due to error",
);
});
}
if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
}
}
if let Some(first_leaf) = first_leaf.take() {
// Also provide first leaf to participation for good measure.
self.participation
.process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf))
@@ -230,7 +236,7 @@ impl Initialized {
default_confirm
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, n)) => {
self.ordering_provider.process_finalized_block(&n);
self.scraper.process_finalized_block(&n);
default_confirm
},
FromOverseer::Communication { msg } =>
@@ -256,9 +262,8 @@ impl Initialized {
update: ActiveLeavesUpdate,
now: u64,
) -> Result<()> {
self.ordering_provider
.process_active_leaves_update(ctx.sender(), &update)
.await?;
let on_chain_votes =
self.scraper.process_active_leaves_update(ctx.sender(), &update).await?;
self.participation.process_active_leaves_update(ctx, &update).await?;
if let Some(new_leaf) = update.activated {
@@ -294,76 +299,14 @@ impl Initialized {
Ok(SessionWindowUpdate::Unchanged) => {},
};
// Scrape the head if above rolling session update went well.
if self.error.is_none() {
let _ = self
.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now)
.await
.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
"Skipping scraping block #{}({}) due to error: {}",
new_leaf.number,
new_leaf.hash,
err
);
});
}
// Try to scrape any blocks for which we could not get the current session or did not receive an
// active leaves update.
let ancestors = match get_finalized_block_number(ctx.sender()).await {
Ok(block_number) => {
// Limit our search to last finalized block, or up to max finality lag.
let block_number = std::cmp::max(
block_number,
new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS),
);
// Fetch ancestry up to and including the last finalized block.
// `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to
// pass in it's parent.
OrderingProvider::get_block_ancestors(
ctx.sender(),
new_leaf.hash,
new_leaf.number,
block_number.saturating_sub(1),
&mut self.last_scraped_blocks,
)
.await
.unwrap_or_else(|err| {
gum::debug!(
target: LOG_TARGET,
activated_leaf = ?new_leaf,
error = ?err,
"Skipping leaf ancestors due to an error",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
})
},
Err(err) => {
gum::debug!(
target: LOG_TARGET,
activated_leaf = ?new_leaf,
error = ?err,
"Skipping leaf ancestors scraping",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
},
};
// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel.
for ancestor in ancestors {
let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err(
|err| {
for votes in on_chain_votes {
let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err(
|error| {
gum::warn!(
target: LOG_TARGET,
hash = ?ancestor,
error = ?err,
?error,
"Skipping scraping block due to error",
);
},
@@ -376,60 +319,17 @@ impl Initialized {
/// Scrapes on-chain votes (backing votes and concluded disputes) for a active leaf of the
/// relay chain.
async fn scrape_on_chain_votes(
async fn process_on_chain_votes(
&mut self,
ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage>
+ overseer::SubsystemContext<Message = DisputeCoordinatorMessage>),
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
new_leaf: Hash,
votes: ScrapedOnChainVotes,
now: u64,
) -> Result<()> {
// Avoid scraping twice.
if self.last_scraped_blocks.get(&new_leaf).is_some() {
return Ok(())
}
// obtain the concluded disputes as well as the candidate backing votes
// from the new leaf
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
let (tx, rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
new_leaf,
RuntimeApiRequest::FetchOnChainVotes(tx),
))
.await;
match rx.await {
Ok(Ok(Some(val))) => val,
Ok(Ok(None)) => {
gum::trace!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"No on chain votes stored for relay chain leaf");
return Ok(())
},
Ok(Err(e)) => {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
error = ?e,
"Could not retrieve on chain votes due to an API error");
return Ok(())
},
Err(e) => {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
error = ?e,
"Could not retrieve onchain votes due to oneshot cancellation");
return Ok(())
},
}
};
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes;
if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
// This block is not interesting as it doesnt contain any backing votes or disputes. We'll
// mark it here as scraped to prevent further processing.
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}
@@ -444,7 +344,6 @@ impl Initialized {
} else {
gum::warn!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Could not retrieve session info from rolling session window",
);
@@ -454,6 +353,7 @@ impl Initialized {
// Scraped on-chain backing votes for the candidates with
// the new active leaf as if we received them via gossip.
for (candidate_receipt, backers) in backing_validators_per_candidate {
let relay_parent = candidate_receipt.descriptor.relay_parent;
let candidate_hash = candidate_receipt.hash();
let statements = backers
.into_iter()
@@ -463,10 +363,11 @@ impl Initialized {
.get(validator_index.0 as usize)
.or_else(|| {
gum::error!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"Missing public key for validator {:?}",
&validator_index);
target: LOG_TARGET,
?session,
?validator_index,
"Missing public key for validator",
);
None
})
.cloned()?;
@@ -474,9 +375,9 @@ impl Initialized {
let valid_statement_kind =
match attestation.to_compact_statement(candidate_hash) {
CompactStatement::Seconded(_) =>
ValidDisputeStatementKind::BackingSeconded(new_leaf),
ValidDisputeStatementKind::BackingSeconded(relay_parent),
CompactStatement::Valid(_) =>
ValidDisputeStatementKind::BackingValid(new_leaf),
ValidDisputeStatementKind::BackingValid(relay_parent),
};
let signed_dispute_statement =
SignedDisputeStatement::new_unchecked_from_trusted_source(
@@ -502,22 +403,21 @@ impl Initialized {
)
.await?;
match import_result {
ImportStatementsResult::ValidImport => gum::trace!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Imported backing vote from on-chain"),
ImportStatementsResult::InvalidImport => gum::warn!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Attempted import of on-chain backing votes failed"),
ImportStatementsResult::ValidImport => gum::trace!(
target: LOG_TARGET,
?relay_parent,
?session,
"Imported backing votes from chain"
),
ImportStatementsResult::InvalidImport => gum::warn!(
target: LOG_TARGET,
?relay_parent,
?session,
"Attempted import of on-chain backing votes failed"
),
}
}
if disputes.is_empty() {
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}
// Import concluded disputes from on-chain, this already went through a vote so it's assumed
// as verified. This will only be stored, gossiping it is not necessary.
@@ -535,7 +435,7 @@ impl Initialized {
} else {
gum::warn!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
?candidate_hash,
?session,
"Could not retrieve session info from rolling session window for recently concluded dispute");
return None
@@ -547,7 +447,7 @@ impl Initialized {
.or_else(|| {
gum::error!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
?candidate_hash,
?session,
"Missing public key for validator {:?} that participated in concluded dispute",
&validator_index);
@@ -580,20 +480,21 @@ impl Initialized {
)
.await?;
match import_result {
ImportStatementsResult::ValidImport => gum::trace!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?candidate_hash,
?session,
"Imported statement of concluded dispute from on-chain"),
ImportStatementsResult::InvalidImport => gum::warn!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?candidate_hash,
?session,
"Attempted import of on-chain statement of concluded dispute failed"),
ImportStatementsResult::ValidImport => gum::trace!(
target: LOG_TARGET,
?candidate_hash,
?session,
"Imported statement of concluded dispute from on-chain"
),
ImportStatementsResult::InvalidImport => gum::warn!(
target: LOG_TARGET,
?candidate_hash,
?session,
"Attempted import of on-chain statement of concluded dispute failed"
),
}
}
self.last_scraped_blocks.put(new_leaf, ());
Ok(())
}
@@ -623,11 +524,13 @@ impl Initialized {
now,
)
.await?;
let report = move || {
pending_confirmation
let report = move || match pending_confirmation {
Some(pending_confirmation) => pending_confirmation
.send(outcome)
.map_err(|_| JfyiError::DisputeImportOneshotSend)
.map_err(|_| JfyiError::DisputeImportOneshotSend),
None => Ok(()),
};
match outcome {
ImportStatementsResult::InvalidImport => {
report()?;
@@ -811,14 +714,13 @@ impl Initialized {
our_votes.retain(|index| controlled_indices.contains(index));
!our_votes.is_empty()
};
let was_confirmed = recent_disputes
.get(&(session, candidate_hash))
.map_or(false, |s| s.is_confirmed_concluded());
let comparator = self
.ordering_provider
.candidate_comparator(ctx.sender(), &candidate_receipt)
.await?;
let is_included = comparator.is_some();
let is_included = self.scraper.is_candidate_included(&candidate_receipt.hash());
let is_local = statements
.iter()
.find(|(_, index)| controlled_indices.contains(index))
@@ -927,13 +829,14 @@ impl Initialized {
// Participate in dispute if the imported vote was not local, we did not vote before either
// and we actually have keys to issue a local vote.
if !is_local && !voted_already && is_disputed && !controlled_indices.is_empty() {
let priority = ParticipationPriority::with_priority_if(is_included);
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?candidate_receipt.hash(),
priority = ?comparator.is_some(),
?priority,
"Queuing participation for candidate"
);
if comparator.is_some() {
if priority.is_priority() {
self.metrics.on_queued_priority_participation();
} else {
self.metrics.on_queued_best_effort_participation();
@@ -944,7 +847,7 @@ impl Initialized {
.participation
.queue_participation(
ctx,
comparator,
priority,
ParticipationRequest::new(candidate_receipt, session, n_validators),
)
.await;
@@ -38,7 +38,7 @@ use polkadot_node_subsystem::{
use polkadot_node_subsystem_util::{
database::Database, rolling_session_window::RollingSessionWindow,
};
use polkadot_primitives::v2::{ValidatorIndex, ValidatorPair};
use polkadot_primitives::v2::{ScrapedOnChainVotes, ValidatorIndex, ValidatorPair};
use crate::{
error::{FatalResult, JfyiError, Result},
@@ -50,8 +50,7 @@ use db::v1::DbBackend;
use fatality::Split;
use self::{
ordering::CandidateComparator,
participation::ParticipationRequest,
participation::{ParticipationPriority, ParticipationRequest},
spam_slots::{SpamSlots, UnconfirmedDisputes},
};
@@ -62,8 +61,7 @@ pub(crate) mod db;
mod initialized;
use initialized::Initialized;
/// Provider of an ordering for candidates for dispute participation, see
/// [`participation`] below.
/// Provider of data scraped from chain.
///
/// If we have seen a candidate included somewhere, we should treat it as priority and will be able
/// to provide an ordering for participation. Thus a dispute for a candidate where we can get some
@@ -71,8 +69,8 @@ use initialized::Initialized;
/// `participation` based on `relay_parent` block number and other metrics, so each validator will
/// participate in disputes in a similar order, which ensures we will be resolving disputes, even
/// under heavy load.
mod ordering;
use ordering::OrderingProvider;
mod scraping;
use scraping::ChainScraper;
/// When importing votes we will check via the `ordering` module, whether or not we know of the
/// candidate to be included somewhere. If not, the votes might be spam, in this case we want to
@@ -162,13 +160,15 @@ impl DisputeCoordinatorSubsystem {
{
let res = self.initialize(&mut ctx, backend, &*clock).await?;
let (participations, first_leaf, initialized, backend) = match res {
let (participations, votes, first_leaf, initialized, backend) = match res {
// Concluded:
None => return Ok(()),
Some(r) => r,
};
initialized.run(ctx, backend, participations, Some(first_leaf), clock).await
initialized
.run(ctx, backend, participations, votes, Some(first_leaf), clock)
.await
}
/// Make sure to recover participations properly on startup.
@@ -179,7 +179,8 @@ impl DisputeCoordinatorSubsystem {
clock: &(dyn Clock),
) -> FatalResult<
Option<(
Vec<(Option<CandidateComparator>, ParticipationRequest)>,
Vec<(ParticipationPriority, ParticipationRequest)>,
Vec<ScrapedOnChainVotes>,
ActivatedLeaf,
Initialized,
B,
@@ -203,12 +204,8 @@ impl DisputeCoordinatorSubsystem {
},
};
// Before we move to the initialized state we need to check if we got at
// least on finality notification to prevent large ancestry block scraping,
// when the node is syncing.
let mut overlay_db = OverlayedBackend::new(&mut backend);
let (participations, spam_slots, ordering_provider) = match self
let (participations, votes, spam_slots, ordering_provider) = match self
.handle_startup(
ctx,
first_leaf.clone(),
@@ -231,6 +228,7 @@ impl DisputeCoordinatorSubsystem {
return Ok(Some((
participations,
votes,
first_leaf,
Initialized::new(self, rolling_session_window, spam_slots, ordering_provider),
backend,
@@ -251,9 +249,10 @@ impl DisputeCoordinatorSubsystem {
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
clock: &dyn Clock,
) -> Result<(
Vec<(Option<CandidateComparator>, ParticipationRequest)>,
Vec<(ParticipationPriority, ParticipationRequest)>,
Vec<ScrapedOnChainVotes>,
SpamSlots,
OrderingProvider,
ChainScraper,
)>
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
@@ -274,7 +273,7 @@ impl DisputeCoordinatorSubsystem {
let mut participation_requests = Vec::new();
let mut unconfirmed_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new();
let mut ordering_provider = OrderingProvider::new(ctx.sender(), initial_head).await?;
let (mut scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?;
for ((session, ref candidate_hash), status) in active_disputes {
let votes: CandidateVotes =
match overlay_db.load_candidate_votes(session, candidate_hash) {
@@ -322,10 +321,7 @@ impl DisputeCoordinatorSubsystem {
.map_or(false, |v| v.is_some())
});
let candidate_comparator = ordering_provider
.candidate_comparator(ctx.sender(), &votes.candidate_receipt)
.await?;
let is_included = candidate_comparator.is_some();
let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash());
if !status.is_confirmed_concluded() && !is_included {
unconfirmed_disputes.insert((session, *candidate_hash), voted_indices);
@@ -335,7 +331,7 @@ impl DisputeCoordinatorSubsystem {
// recorded local statement.
if missing_local_statement {
participation_requests.push((
candidate_comparator,
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
votes.candidate_receipt.clone(),
session,
@@ -347,8 +343,9 @@ impl DisputeCoordinatorSubsystem {
Ok((
participation_requests,
votes,
SpamSlots::recover_from_state(unconfirmed_disputes),
ordering_provider,
scraper,
))
}
}
@@ -1,378 +0,0 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{
cmp::{Ord, Ordering, PartialOrd},
collections::{BTreeMap, HashSet},
};
use futures::channel::oneshot;
use lru::LruCache;
use polkadot_node_subsystem::{
messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, ChainApiError, SubsystemSender,
};
use polkadot_node_subsystem_util::runtime::get_candidate_events;
use polkadot_primitives::v2::{BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash};
use crate::{
error::{FatalError, FatalResult, Result},
LOG_TARGET,
};
#[cfg(test)]
mod tests;
const LRU_OBSERVED_BLOCKS_CAPACITY: usize = 20;
/// Provider of `CandidateComparator` for candidates.
pub struct OrderingProvider {
/// All candidates we have seen included, which not yet have been finalized.
included_candidates: HashSet<CandidateHash>,
/// including block -> `CandidateHash`
///
/// We need this to clean up `included_candidates` on `ActiveLeavesUpdate`.
candidates_by_block_number: BTreeMap<BlockNumber, HashSet<CandidateHash>>,
/// Latest relay blocks observed by the provider. We assume that ancestors of
/// cached blocks are already processed, i.e. we have saved corresponding
/// included candidates.
last_observed_blocks: LruCache<Hash, ()>,
}
/// `Comparator` for ordering of disputes for candidates.
///
/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
/// between chains in case of equally old disputes.
///
/// Objective ordering between nodes is important in case of lots disputes, so nodes will pull in
/// the same direction and work on resolving the same disputes first. This ensures that we will
/// conclude some disputes, even if there are lots of them. While any objective ordering would
/// suffice for this goal, ordering by age ensures we are not only resolving disputes, but also
/// resolve the oldest one first, which are also the most urgent and important ones to resolve.
///
/// Note: That by `oldest` we mean oldest in terms of relay chain block number, for any block
/// number that has not yet been finalized. If a block has been finalized already it should be
/// treated as low priority when it comes to disputes, as even in the case of a negative outcome,
/// we are already too late. The ordering mechanism here serves to prevent this from happening in
/// the first place.
#[derive(Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
pub struct CandidateComparator {
/// Block number of the relay parent.
///
/// Important, so we will be participating in oldest disputes first.
///
/// Note: In theory it would make more sense to use the `BlockNumber` of the including
/// block, as inclusion time is the actual relevant event when it comes to ordering. The
/// problem is, that a candidate can get included multiple times on forks, so the `BlockNumber`
/// of the including block is not unique. We could theoretically work around that problem, by
/// just using the lowest `BlockNumber` of all available including blocks - the problem is,
/// that is not stable. If a new fork appears after the fact, we would start ordering the same
/// candidate differently, which would result in the same candidate getting queued twice.
relay_parent_block_number: BlockNumber,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates.
candidate_hash: CandidateHash,
}
impl PartialEq for CandidateComparator {
fn eq(&self, other: &CandidateComparator) -> bool {
Ordering::Equal == self.cmp(other)
}
}
impl Eq for CandidateComparator {}
impl PartialOrd for CandidateComparator {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for CandidateComparator {
fn cmp(&self, other: &Self) -> Ordering {
match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) {
Ordering::Equal => (),
o => return o,
}
self.candidate_hash.cmp(&other.candidate_hash)
}
}
impl CandidateComparator {
/// Create a candidate comparator based on given (fake) values.
///
/// Useful for testing.
#[cfg(test)]
pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self {
Self { relay_parent_block_number: block_number, candidate_hash }
}
/// Check whether the given candidate hash belongs to this comparator.
pub fn matches_candidate(&self, candidate_hash: &CandidateHash) -> bool {
&self.candidate_hash == candidate_hash
}
}
impl OrderingProvider {
/// Limits the number of ancestors received for a single request.
pub(crate) const ANCESTRY_CHUNK_SIZE: usize = 10;
/// Limits the overall number of ancestors walked through for a given head.
pub(crate) const ANCESTRY_SIZE_LIMIT: usize = 1000;
/// Create a properly initialized `OrderingProvider`.
pub async fn new<Sender: SubsystemSender>(
sender: &mut Sender,
initial_head: ActivatedLeaf,
) -> Result<Self> {
let mut s = Self {
included_candidates: HashSet::new(),
candidates_by_block_number: BTreeMap::new(),
last_observed_blocks: LruCache::new(LRU_OBSERVED_BLOCKS_CAPACITY),
};
let update =
ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() };
s.process_active_leaves_update(sender, &update).await?;
Ok(s)
}
/// Retrieve a candidate `comparator` if available.
///
/// If not available, we can treat disputes concerning this candidate with low priority and
/// should use spam slots for such disputes.
pub async fn candidate_comparator<'a>(
&mut self,
sender: &mut impl SubsystemSender,
candidate: &CandidateReceipt,
) -> FatalResult<Option<CandidateComparator>> {
let candidate_hash = candidate.hash();
if !self.included_candidates.contains(&candidate_hash) {
return Ok(None)
}
let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? {
None => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API, but we saw candidate included?!"
);
return Ok(None)
},
Some(n) => n,
};
Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash }))
}
/// Query active leaves for any candidate `CandidateEvent::CandidateIncluded` events.
///
/// and updates current heads, so we can query candidates for all non finalized blocks.
pub async fn process_active_leaves_update<Sender: SubsystemSender>(
&mut self,
sender: &mut Sender,
update: &ActiveLeavesUpdate,
) -> crate::error::Result<()> {
if let Some(activated) = update.activated.as_ref() {
// Fetch last finalized block.
let ancestors = match get_finalized_block_number(sender).await {
Ok(block_number) => {
// Fetch ancestry up to last finalized block.
Self::get_block_ancestors(
sender,
activated.hash,
activated.number,
block_number,
&mut self.last_observed_blocks,
)
.await
.unwrap_or_else(|err| {
gum::debug!(
target: LOG_TARGET,
activated_leaf = ?activated,
error = ?err,
"Skipping leaf ancestors due to an error",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
})
},
Err(err) => {
gum::debug!(
target: LOG_TARGET,
activated_leaf = ?activated,
error = ?err,
"Failed to retrieve last finalized block number",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
},
};
// Ancestors block numbers are consecutive in the descending order.
let earliest_block_number = activated.number - ancestors.len() as u32;
let block_numbers = (earliest_block_number..=activated.number).rev();
let block_hashes = std::iter::once(activated.hash).chain(ancestors);
for (block_num, block_hash) in block_numbers.zip(block_hashes) {
// Get included events:
let included = get_candidate_events(sender, block_hash)
.await?
.into_iter()
.filter_map(|ev| match ev {
CandidateEvent::CandidateIncluded(receipt, _, _, _) => Some(receipt),
_ => None,
});
for receipt in included {
let candidate_hash = receipt.hash();
self.included_candidates.insert(candidate_hash);
self.candidates_by_block_number
.entry(block_num)
.or_default()
.insert(candidate_hash);
}
}
self.last_observed_blocks.put(activated.hash, ());
}
Ok(())
}
/// Prune finalized candidates.
///
/// Once a candidate lives in a relay chain block that's behind the finalized chain/got
/// finalized, we can treat it as low priority.
pub fn process_finalized_block(&mut self, finalized: &BlockNumber) {
let not_finalized = self.candidates_by_block_number.split_off(finalized);
let finalized = std::mem::take(&mut self.candidates_by_block_number);
self.candidates_by_block_number = not_finalized;
// Clean up finalized:
for finalized_candidate in finalized.into_values().flatten() {
self.included_candidates.remove(&finalized_candidate);
}
}
/// Returns ancestors of `head` in the descending order, stopping
/// either at the block present in cache or at `target_ancestor`.
///
/// Suited specifically for querying non-finalized chains, thus
/// doesn't rely on block numbers.
///
/// Both `head` and last are **not** included in the result.
pub async fn get_block_ancestors<Sender: SubsystemSender>(
sender: &mut Sender,
mut head: Hash,
mut head_number: BlockNumber,
target_ancestor: BlockNumber,
lookup_cache: &mut LruCache<Hash, ()>,
) -> Result<Vec<Hash>> {
let mut ancestors = Vec::new();
if lookup_cache.get(&head).is_some() || head_number <= target_ancestor {
return Ok(ancestors)
}
loop {
let (tx, rx) = oneshot::channel();
let hashes = {
sender
.send_message(
ChainApiMessage::Ancestors {
hash: head,
k: Self::ANCESTRY_CHUNK_SIZE,
response_channel: tx,
}
.into(),
)
.await;
rx.await
.or(Err(FatalError::ChainApiSenderDropped))?
.map_err(FatalError::ChainApiAncestors)?
};
let earliest_block_number = match head_number.checked_sub(hashes.len() as u32) {
Some(number) => number,
None => {
// It's assumed that it's impossible to retrieve
// more than N ancestors for block number N.
gum::error!(
target: LOG_TARGET,
"Received {} ancestors for block number {} from Chain API",
hashes.len(),
head_number,
);
return Ok(ancestors)
},
};
// The reversed order is parent, grandparent, etc. excluding the head.
let block_numbers = (earliest_block_number..head_number).rev();
for (block_number, hash) in block_numbers.zip(&hashes) {
// Return if we either met target/cached block or
// hit the size limit for the returned ancestry of head.
if lookup_cache.get(hash).is_some() ||
block_number <= target_ancestor ||
ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT
{
return Ok(ancestors)
}
ancestors.push(*hash);
}
match hashes.last() {
Some(last_hash) => {
head = *last_hash;
head_number = earliest_block_number;
},
None => break,
}
}
return Ok(ancestors)
}
}
async fn send_message_fatal<Sender, Response>(
sender: &mut Sender,
message: ChainApiMessage,
receiver: oneshot::Receiver<std::result::Result<Response, ChainApiError>>,
) -> FatalResult<Response>
where
Sender: SubsystemSender,
{
sender.send_message(message.into()).await;
receiver
.await
.map_err(|_| FatalError::ChainApiSenderDropped)?
.map_err(FatalError::ChainApiAncestors)
}
async fn get_block_number(
sender: &mut impl SubsystemSender,
relay_parent: Hash,
) -> FatalResult<Option<BlockNumber>> {
let (tx, rx) = oneshot::channel();
send_message_fatal(sender, ChainApiMessage::BlockNumber(relay_parent, tx), rx).await
}
pub async fn get_finalized_block_number(
sender: &mut impl SubsystemSender,
) -> FatalResult<BlockNumber> {
let (number_tx, number_rx) = oneshot::channel();
send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await
}
@@ -15,11 +15,15 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashSet;
#[cfg(test)]
use std::time::Duration;
use futures::{
channel::{mpsc, oneshot},
FutureExt, SinkExt,
};
#[cfg(test)]
use futures_timer::Delay;
use polkadot_node_primitives::{ValidationResult, APPROVAL_EXECUTION_TIMEOUT};
use polkadot_node_subsystem::{
@@ -31,8 +35,7 @@ use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash
use crate::real::LOG_TARGET;
use super::ordering::CandidateComparator;
use crate::error::{FatalError, FatalResult, JfyiError, Result};
use crate::error::{FatalError, FatalResult, Result};
#[cfg(test)]
mod tests;
@@ -41,7 +44,7 @@ pub use tests::{participation_full_happy_path, participation_missing_availabilit
mod queues;
use queues::Queues;
pub use queues::{ParticipationRequest, QueueError};
pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};
/// How many participation processes do we want to run in parallel the most.
///
@@ -144,7 +147,7 @@ impl Participation {
pub async fn queue_participation<Context: SubsystemContext>(
&mut self,
ctx: &mut Context,
comparator: Option<CandidateComparator>,
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
// Participation already running - we can ignore that request:
@@ -159,7 +162,7 @@ impl Participation {
}
}
// Out of capacity/no recent block yet - queue:
Ok(self.queue.queue(comparator, req).map_err(JfyiError::QueueError)?)
self.queue.queue(ctx.sender(), priority, req).await
}
/// Message from a worker task was received - get the outcome.
@@ -249,6 +252,9 @@ async fn participate(
block_hash: Hash,
req: ParticipationRequest,
) {
#[cfg(test)]
// Hack for tests, so we get recovery messages not too early.
Delay::new(Duration::from_millis(100)).await;
// in order to validate a candidate we need to start by recovering the
// available data
let (recover_available_data_tx, recover_available_data_rx) = oneshot::channel();
@@ -14,11 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{BTreeMap, HashMap};
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
};
use polkadot_primitives::v2::{CandidateHash, CandidateReceipt, SessionIndex};
use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, SubsystemSender};
use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex};
use crate::real::ordering::CandidateComparator;
use crate::{
error::{FatalError, FatalResult, Result},
LOG_TARGET,
};
#[cfg(test)]
mod tests;
@@ -73,11 +81,34 @@ pub struct ParticipationRequest {
n_validators: usize,
}
/// Entry for the best effort queue.
struct BestEffortEntry {
req: ParticipationRequest,
/// How often was the above request added to the queue.
added_count: BestEffortCount,
/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
#[derive(Debug)]
pub enum ParticipationPriority {
BestEffort,
Priority,
}
impl ParticipationPriority {
/// Create `ParticipationPriority` with either `Priority`
///
/// or `BestEffort`.
pub fn with_priority_if(is_priority: bool) -> Self {
if is_priority {
Self::Priority
} else {
Self::BestEffort
}
}
/// Whether or not this is a priority entry.
///
/// If false, it is best effort.
pub fn is_priority(&self) -> bool {
match self {
Self::Priority => true,
Self::BestEffort => false,
}
}
}
/// What can go wrong when queuing a request.
@@ -123,23 +154,47 @@ impl Queues {
Self { best_effort: HashMap::new(), priority: BTreeMap::new() }
}
/// Will put message in queue, either priority or best effort depending on whether a
/// `CandidateComparator` was provided or not.
/// Will put message in queue, either priority or best effort depending on priority.
///
/// If the message was already previously present on best effort, it will be moved to priority
/// if a `CandidateComparator` has been passed now, otherwise the `added_count` on the best
/// effort queue will be bumped.
/// if it considered priority now, otherwise the `added_count` on the best effort queue will be
/// bumped.
///
/// Returns error in case a queue was found full already.
pub fn queue(
pub async fn queue(
&mut self,
sender: &mut impl SubsystemSender,
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
let comparator = match priority {
ParticipationPriority::BestEffort => None,
ParticipationPriority::Priority =>
CandidateComparator::new(sender, &req.candidate_receipt).await?,
};
self.queue_with_comparator(comparator, req)?;
Ok(())
}
/// Get the next best request for dispute participation
///
/// if any. Priority queue is always considered first, then the best effort queue based on
/// `added_count`.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
// In case a candidate became best effort over time, we might have it also queued in
// the best effort queue - get rid of any such entry:
self.best_effort.remove(req.candidate_hash());
return Some(req)
}
self.pop_best_effort()
}
fn queue_with_comparator(
&mut self,
comparator: Option<CandidateComparator>,
req: ParticipationRequest,
) -> Result<(), QueueError> {
debug_assert!(comparator
.map(|c| c.matches_candidate(req.candidate_hash()))
.unwrap_or(true));
) -> std::result::Result<(), QueueError> {
if let Some(comparator) = comparator {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
@@ -161,20 +216,6 @@ impl Queues {
Ok(())
}
/// Get the next best request for dispute participation
///
/// if any. Priority queue is always considered first, then the best effort queue based on
/// `added_count`.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
// In case a candidate became best effort over time, we might have it also queued in
// the best effort queue - get rid of any such entry:
self.best_effort.remove(req.candidate_hash());
return Some(req)
}
self.pop_best_effort()
}
/// Get the next best from the best effort queue.
///
/// If there are multiple best - just pick one.
@@ -206,3 +247,115 @@ impl Queues {
}
}
}
/// Entry for the best effort queue.
struct BestEffortEntry {
req: ParticipationRequest,
/// How often was the above request added to the queue.
added_count: BestEffortCount,
}
/// `Comparator` for ordering of disputes for candidates.
///
/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
/// between chains in case of equally old disputes.
///
/// Objective ordering between nodes is important in case of lots disputes, so nodes will pull in
/// the same direction and work on resolving the same disputes first. This ensures that we will
/// conclude some disputes, even if there are lots of them. While any objective ordering would
/// suffice for this goal, ordering by age ensures we are not only resolving disputes, but also
/// resolve the oldest one first, which are also the most urgent and important ones to resolve.
///
/// Note: That by `oldest` we mean oldest in terms of relay chain block number, for any block
/// number that has not yet been finalized. If a block has been finalized already it should be
/// treated as low priority when it comes to disputes, as even in the case of a negative outcome,
/// we are already too late. The ordering mechanism here serves to prevent this from happening in
/// the first place.
#[derive(Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
struct CandidateComparator {
/// Block number of the relay parent.
///
/// Important, so we will be participating in oldest disputes first.
///
/// Note: In theory it would make more sense to use the `BlockNumber` of the including
/// block, as inclusion time is the actual relevant event when it comes to ordering. The
/// problem is, that a candidate can get included multiple times on forks, so the `BlockNumber`
/// of the including block is not unique. We could theoretically work around that problem, by
/// just using the lowest `BlockNumber` of all available including blocks - the problem is,
/// that is not stable. If a new fork appears after the fact, we would start ordering the same
/// candidate differently, which would result in the same candidate getting queued twice.
relay_parent_block_number: BlockNumber,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates.
candidate_hash: CandidateHash,
}
impl CandidateComparator {
/// Create a candidate comparator based on given (fake) values.
///
/// Useful for testing.
#[cfg(test)]
pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self {
Self { relay_parent_block_number: block_number, candidate_hash }
}
/// Create a candidate comparator for a given candidate.
///
/// Returns:
/// `Ok(None)` in case we could not lookup the candidate's relay parent, returns a
/// `FatalError` in case the chain API call fails with an unexpected error.
pub async fn new(
sender: &mut impl SubsystemSender,
candidate: &CandidateReceipt,
) -> FatalResult<Option<Self>> {
let candidate_hash = candidate.hash();
let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? {
None => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator could not be provided!"
);
return Ok(None)
},
Some(n) => n,
};
Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash }))
}
}
impl PartialEq for CandidateComparator {
fn eq(&self, other: &CandidateComparator) -> bool {
Ordering::Equal == self.cmp(other)
}
}
impl Eq for CandidateComparator {}
impl PartialOrd for CandidateComparator {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for CandidateComparator {
fn cmp(&self, other: &Self) -> Ordering {
match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) {
Ordering::Equal => (),
o => return o,
}
self.candidate_hash.cmp(&other.candidate_hash)
}
}
async fn get_block_number(
sender: &mut impl SubsystemSender,
relay_parent: Hash,
) -> FatalResult<Option<BlockNumber>> {
let (tx, rx) = oneshot::channel();
sender.send_message(ChainApiMessage::BlockNumber(relay_parent, tx).into()).await;
rx.await
.map_err(|_| FatalError::ChainApiSenderDropped)?
.map_err(FatalError::ChainApiAncestors)
}
@@ -18,9 +18,7 @@ use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use assert_matches::assert_matches;
use polkadot_primitives::v2::{BlockNumber, Hash};
use crate::real::ordering::CandidateComparator;
use super::{ParticipationRequest, QueueError, Queues};
use super::{CandidateComparator, ParticipationRequest, QueueError, Queues};
/// Make a `ParticipationRequest` based on the given commitments hash.
fn make_participation_request(hash: Hash) -> ParticipationRequest {
@@ -52,21 +50,21 @@ fn ordering_works_as_expected() {
let req5 = make_participation_request(Hash::repeat_byte(0x05));
let req_full = make_participation_request(Hash::repeat_byte(0x06));
let req_prio_full = make_participation_request(Hash::repeat_byte(0x07));
queue.queue(None, req1.clone()).unwrap();
queue.queue_with_comparator(None, req1.clone()).unwrap();
queue
.queue(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone())
.queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone())
.unwrap();
queue.queue(None, req3.clone()).unwrap();
queue.queue_with_comparator(None, req3.clone()).unwrap();
queue
.queue(Some(make_dummy_comparator(&req_prio_2, 2)), req_prio_2.clone())
.queue_with_comparator(Some(make_dummy_comparator(&req_prio_2, 2)), req_prio_2.clone())
.unwrap();
queue.queue(None, req3.clone()).unwrap();
queue.queue(None, req5.clone()).unwrap();
queue.queue_with_comparator(None, req3.clone()).unwrap();
queue.queue_with_comparator(None, req5.clone()).unwrap();
assert_matches!(
queue.queue(Some(make_dummy_comparator(&req_prio_full, 3)), req_prio_full),
queue.queue_with_comparator(Some(make_dummy_comparator(&req_prio_full, 3)), req_prio_full),
Err(QueueError::PriorityFull)
);
assert_matches!(queue.queue(None, req_full), Err(QueueError::BestEffortFull));
assert_matches!(queue.queue_with_comparator(None, req_full), Err(QueueError::BestEffortFull));
assert_eq!(queue.dequeue(), Some(req_prio));
assert_eq!(queue.dequeue(), Some(req_prio_2));
@@ -91,22 +89,22 @@ fn candidate_is_only_dequeued_once() {
let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03));
let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04));
queue.queue(None, req1.clone()).unwrap();
queue.queue_with_comparator(None, req1.clone()).unwrap();
queue
.queue(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone())
.queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone())
.unwrap();
// Insert same best effort again:
queue.queue(None, req1.clone()).unwrap();
queue.queue_with_comparator(None, req1.clone()).unwrap();
// insert same prio again:
queue
.queue(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone())
.queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone())
.unwrap();
// Insert first as best effort:
queue.queue(None, req_best_effort_then_prio.clone()).unwrap();
queue.queue_with_comparator(None, req_best_effort_then_prio.clone()).unwrap();
// Then as prio:
queue
.queue(
.queue_with_comparator(
Some(make_dummy_comparator(&req_best_effort_then_prio, 2)),
req_best_effort_then_prio.clone(),
)
@@ -117,13 +115,13 @@ fn candidate_is_only_dequeued_once() {
// Insert first as prio:
queue
.queue(
.queue_with_comparator(
Some(make_dummy_comparator(&req_prio_then_best_effort, 3)),
req_prio_then_best_effort.clone(),
)
.unwrap();
// Then as best effort:
queue.queue(None, req_prio_then_best_effort.clone()).unwrap();
queue.queue_with_comparator(None, req_prio_then_best_effort.clone()).unwrap();
assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio));
assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort));
@@ -76,7 +76,9 @@ async fn participate_with_commitments_hash(
let req = ParticipationRequest::new(candidate_receipt, session, n_validators);
participation.queue_participation(ctx, None, req).await
participation
.queue_participation(ctx, ParticipationPriority::BestEffort, req)
.await
}
async fn activate_leaf(
@@ -0,0 +1,300 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{BTreeMap, HashSet};
use futures::channel::oneshot;
use lru::LruCache;
use polkadot_node_primitives::MAX_FINALITY_LAG;
use polkadot_node_subsystem::{
messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, ChainApiError, SubsystemSender,
};
use polkadot_node_subsystem_util::runtime::{get_candidate_events, get_on_chain_votes};
use polkadot_primitives::v2::{
BlockNumber, CandidateEvent, CandidateHash, Hash, ScrapedOnChainVotes,
};
use crate::{
error::{FatalError, FatalResult, Result},
LOG_TARGET,
};
#[cfg(test)]
mod tests;
/// Number of hashes to keep in the LRU.
///
///
/// When traversing the ancestry of a block we will stop once we hit a hash that we find in the
/// `last_observed_blocks` LRU. This means, this value should the very least be as large as the
/// number of expected forks for keeping chain scraping efficient. Making the LRU much larger than
/// that has very limited use.
const LRU_OBSERVED_BLOCKS_CAPACITY: usize = 20;
/// Chain scraper
///
/// Scrapes unfinalized chain in order to collect information from blocks.
///
/// Concretely:
///
/// - Monitors for inclusion events to keep track of candidates that have been included on chains.
/// - Calls `FetchOnChainVotes` for each block to gather potentially missed votes from chain.
///
/// With this information it provies a `CandidateComparator` and as a return value of
/// `process_active_leaves_update` any scraped votes.
pub struct ChainScraper {
/// All candidates we have seen included, which not yet have been finalized.
included_candidates: HashSet<CandidateHash>,
/// including block -> `CandidateHash`
///
/// We need this to clean up `included_candidates` on finalization.
candidates_by_block_number: BTreeMap<BlockNumber, HashSet<CandidateHash>>,
/// Latest relay blocks observed by the provider.
///
/// We assume that ancestors of cached blocks are already processed, i.e. we have saved
/// corresponding included candidates.
last_observed_blocks: LruCache<Hash, ()>,
}
impl ChainScraper {
/// Limits the number of ancestors received for a single request.
pub(crate) const ANCESTRY_CHUNK_SIZE: u32 = 10;
/// Limits the overall number of ancestors walked through for a given head.
///
/// As long as we have `MAX_FINALITY_LAG` this makes sense as a value.
pub(crate) const ANCESTRY_SIZE_LIMIT: u32 = MAX_FINALITY_LAG;
/// Create a properly initialized `OrderingProvider`.
///
/// Returns: `Self` and any scraped votes.
pub async fn new<Sender: SubsystemSender>(
sender: &mut Sender,
initial_head: ActivatedLeaf,
) -> Result<(Self, Vec<ScrapedOnChainVotes>)> {
let mut s = Self {
included_candidates: HashSet::new(),
candidates_by_block_number: BTreeMap::new(),
last_observed_blocks: LruCache::new(LRU_OBSERVED_BLOCKS_CAPACITY),
};
let update =
ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() };
let votes = s.process_active_leaves_update(sender, &update).await?;
Ok((s, votes))
}
/// Check whether we have seen a candidate included on any chain.
pub fn is_candidate_included(&mut self, candidate_hash: &CandidateHash) -> bool {
self.included_candidates.contains(candidate_hash)
}
/// Query active leaves for any candidate `CandidateEvent::CandidateIncluded` events.
///
/// and updates current heads, so we can query candidates for all non finalized blocks.
///
/// Returns: On chain vote for the leaf and any ancestors we might not yet have seen.
pub async fn process_active_leaves_update<Sender: SubsystemSender>(
&mut self,
sender: &mut Sender,
update: &ActiveLeavesUpdate,
) -> crate::error::Result<Vec<ScrapedOnChainVotes>> {
let activated = match update.activated.as_ref() {
Some(activated) => activated,
None => return Ok(Vec::new()),
};
// Fetch ancestry up to last finalized block.
let ancestors = self
.get_unfinalized_block_ancestors(sender, activated.hash, activated.number)
.await?;
// Ancestors block numbers are consecutive in the descending order.
let earliest_block_number = activated.number - ancestors.len() as u32;
let block_numbers = (earliest_block_number..=activated.number).rev();
let block_hashes = std::iter::once(activated.hash).chain(ancestors);
let mut on_chain_votes = Vec::new();
for (block_number, block_hash) in block_numbers.zip(block_hashes) {
gum::trace!(?block_number, ?block_hash, "In ancestor processesing.");
self.process_candidate_events(sender, block_number, block_hash).await?;
if let Some(votes) = get_on_chain_votes(sender, block_hash).await? {
on_chain_votes.push(votes);
}
}
self.last_observed_blocks.put(activated.hash, ());
Ok(on_chain_votes)
}
/// Prune finalized candidates.
///
/// Once a candidate lives in a relay chain block that's behind the finalized chain/got
/// finalized, we can treat it as low priority.
pub fn process_finalized_block(&mut self, finalized: &BlockNumber) {
let not_finalized = self.candidates_by_block_number.split_off(finalized);
let finalized = std::mem::take(&mut self.candidates_by_block_number);
self.candidates_by_block_number = not_finalized;
// Clean up finalized:
for finalized_candidate in finalized.into_values().flatten() {
self.included_candidates.remove(&finalized_candidate);
}
}
/// Process candidate events of a block.
///
/// Keep track of all included candidates.
async fn process_candidate_events(
&mut self,
sender: &mut impl SubsystemSender,
block_number: BlockNumber,
block_hash: Hash,
) -> Result<()> {
// Get included events:
let included =
get_candidate_events(sender, block_hash)
.await?
.into_iter()
.filter_map(|ev| match ev {
CandidateEvent::CandidateIncluded(receipt, _, _, _) => Some(receipt),
_ => None,
});
for receipt in included {
let candidate_hash = receipt.hash();
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
?block_number,
"Processing included event"
);
self.included_candidates.insert(candidate_hash);
self.candidates_by_block_number
.entry(block_number)
.or_default()
.insert(candidate_hash);
}
Ok(())
}
/// Returns ancestors of `head` in the descending order, stopping
/// either at the block present in cache or at the last finalized block.
///
/// Both `head` and the latest finalized block are **not** included in the result.
async fn get_unfinalized_block_ancestors<Sender: SubsystemSender>(
&mut self,
sender: &mut Sender,
mut head: Hash,
mut head_number: BlockNumber,
) -> Result<Vec<Hash>> {
let target_ancestor = get_finalized_block_number(sender).await?;
let mut ancestors = Vec::new();
// If head_number <= target_ancestor + 1 the ancestry will be empty.
if self.last_observed_blocks.get(&head).is_some() || head_number <= target_ancestor + 1 {
return Ok(ancestors)
}
loop {
let hashes = get_block_ancestors(sender, head, Self::ANCESTRY_CHUNK_SIZE).await?;
let earliest_block_number = match head_number.checked_sub(hashes.len() as u32) {
Some(number) => number,
None => {
// It's assumed that it's impossible to retrieve
// more than N ancestors for block number N.
gum::error!(
target: LOG_TARGET,
"Received {} ancestors for block number {} from Chain API",
hashes.len(),
head_number,
);
return Ok(ancestors)
},
};
// The reversed order is parent, grandparent, etc. excluding the head.
let block_numbers = (earliest_block_number..head_number).rev();
for (block_number, hash) in block_numbers.zip(&hashes) {
// Return if we either met target/cached block or
// hit the size limit for the returned ancestry of head.
if self.last_observed_blocks.get(hash).is_some() ||
block_number <= target_ancestor ||
ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT as usize
{
return Ok(ancestors)
}
ancestors.push(*hash);
}
match hashes.last() {
Some(last_hash) => {
head = *last_hash;
head_number = earliest_block_number;
},
None => break,
}
}
return Ok(ancestors)
}
}
async fn get_finalized_block_number(sender: &mut impl SubsystemSender) -> FatalResult<BlockNumber> {
let (number_tx, number_rx) = oneshot::channel();
send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await
}
async fn get_block_ancestors(
sender: &mut impl SubsystemSender,
head: Hash,
num_ancestors: BlockNumber,
) -> FatalResult<Vec<Hash>> {
let (tx, rx) = oneshot::channel();
sender
.send_message(
ChainApiMessage::Ancestors {
hash: head,
k: num_ancestors as usize,
response_channel: tx,
}
.into(),
)
.await;
rx.await
.or(Err(FatalError::ChainApiSenderDropped))?
.map_err(FatalError::ChainApiAncestors)
}
async fn send_message_fatal<Sender, Response>(
sender: &mut Sender,
message: ChainApiMessage,
receiver: oneshot::Receiver<std::result::Result<Response, ChainApiError>>,
) -> FatalResult<Response>
where
Sender: SubsystemSender,
{
sender.send_message(message.into()).await;
receiver
.await
.map_err(|_| FatalError::ChainApiSenderDropped)?
.map_err(FatalError::ChainApiAncestors)
}
@@ -40,7 +40,9 @@ use polkadot_primitives::v2::{
GroupIndex, Hash, HashT, HeadData,
};
use super::OrderingProvider;
use crate::LOG_TARGET;
use super::ChainScraper;
type VirtualOverseer = TestSubsystemContextHandle<DisputeCoordinatorMessage>;
@@ -56,30 +58,32 @@ async fn overseer_recv(virtual_overseer: &mut VirtualOverseer) -> AllMessages {
struct TestState {
chain: Vec<Hash>,
ordering: OrderingProvider,
scraper: ChainScraper,
ctx: TestSubsystemContext<DisputeCoordinatorMessage, TaskExecutor>,
}
impl TestState {
async fn new() -> (Self, VirtualOverseer) {
let (mut ctx, mut ctx_handle) = make_subsystem_context(TaskExecutor::new());
let leaf = get_activated_leaf(0);
let chain = vec![get_block_number_hash(0)];
let chain = vec![get_block_number_hash(0), get_block_number_hash(1)];
let leaf = get_activated_leaf(1);
let finalized_block_number = 0;
let overseer_fut = async {
assert_finalized_block_number_request(&mut ctx_handle, finalized_block_number).await;
// No requests for ancestors since the block is already finalized.
gum::trace!(target: LOG_TARGET, "After assert_finalized_block_number");
// No ancestors requests, as list would be empty.
assert_candidate_events_request(&mut ctx_handle, &chain).await;
assert_chain_vote_request(&mut ctx_handle, &chain).await;
};
let ordering_provider =
join(OrderingProvider::new(ctx.sender(), leaf.clone()), overseer_fut)
.await
.0
.unwrap();
let (scraper, _) = join(ChainScraper::new(ctx.sender(), leaf.clone()), overseer_fut)
.await
.0
.unwrap();
gum::trace!(target: LOG_TARGET, "After launching chain scraper");
let test_state = Self { chain, ordering: ordering_provider, ctx };
let test_state = Self { chain, scraper, ctx };
(test_state, ctx_handle)
}
@@ -99,10 +103,10 @@ fn next_leaf(chain: &mut Vec<Hash>) -> ActivatedLeaf {
async fn process_active_leaves_update(
sender: &mut TestSubsystemSender,
ordering: &mut OrderingProvider,
scraper: &mut ChainScraper,
update: ActivatedLeaf,
) {
ordering
scraper
.process_active_leaves_update(sender, &ActiveLeavesUpdate::start_work(update))
.await
.unwrap();
@@ -166,13 +170,14 @@ async fn assert_candidate_events_request(virtual_overseer: &mut VirtualOverseer,
);
}
async fn assert_block_number_request(virtual_overseer: &mut VirtualOverseer, chain: &[Hash]) {
async fn assert_chain_vote_request(virtual_overseer: &mut VirtualOverseer, _chain: &[Hash]) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(relay_parent, tx)) => {
let maybe_block_number =
chain.iter().position(|hash| *hash == relay_parent).map(|number| number as u32);
tx.send(Ok(maybe_block_number)).unwrap();
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_hash,
RuntimeApiRequest::FetchOnChainVotes(tx),
)) => {
tx.send(Ok(None)).unwrap();
}
);
}
@@ -211,27 +216,29 @@ async fn overseer_process_active_leaves_update(
// Before walking through ancestors provider requests latest finalized block number.
assert_finalized_block_number_request(virtual_overseer, finalized_block).await;
// Expect block ancestors requests with respect to the ancestry step.
for _ in (0..expected_ancestry_len).step_by(OrderingProvider::ANCESTRY_CHUNK_SIZE) {
for _ in (0..expected_ancestry_len).step_by(ChainScraper::ANCESTRY_CHUNK_SIZE as usize) {
assert_block_ancestors_request(virtual_overseer, chain).await;
}
// For each ancestry and the head return corresponding candidates inclusions.
for _ in 0..expected_ancestry_len {
assert_candidate_events_request(virtual_overseer, chain).await;
assert_chain_vote_request(virtual_overseer, chain).await;
}
}
#[test]
fn ordering_provider_provides_ordering_when_initialized() {
let candidate = make_candidate_receipt(get_block_number_hash(1));
fn scraper_provides_included_state_when_initialized() {
let candidate_1 = make_candidate_receipt(get_block_number_hash(1));
let candidate_2 = make_candidate_receipt(get_block_number_hash(2));
futures::executor::block_on(async {
let (state, mut virtual_overseer) = TestState::new().await;
let TestState { mut chain, mut ordering, mut ctx } = state;
let TestState { mut chain, mut scraper, mut ctx } = state;
let r = ordering.candidate_comparator(ctx.sender(), &candidate).await.unwrap();
assert_matches!(r, None);
assert!(!scraper.is_candidate_included(&candidate_2.hash()));
assert!(scraper.is_candidate_included(&candidate_1.hash()));
// After next active leaves update we should have a comparator:
// After next active leaves update we should see the candidate included.
let next_update = next_leaf(&mut chain);
let finalized_block_number = 0;
@@ -242,30 +249,22 @@ fn ordering_provider_provides_ordering_when_initialized() {
finalized_block_number,
expected_ancestry_len,
);
join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
join(process_active_leaves_update(ctx.sender(), &mut scraper, next_update), overseer_fut)
.await;
let r = join(
ordering.candidate_comparator(ctx.sender(), &candidate),
assert_block_number_request(&mut virtual_overseer, &chain),
)
.await
.0;
assert_matches!(r, Ok(Some(r2)) => {
assert_eq!(r2.relay_parent_block_number, 1);
});
assert!(scraper.is_candidate_included(&candidate_2.hash()));
});
}
#[test]
fn ordering_provider_requests_candidates_of_leaf_ancestors() {
fn scraper_requests_candidates_of_leaf_ancestors() {
futures::executor::block_on(async {
// How many blocks should we skip before sending a leaf update.
const BLOCKS_TO_SKIP: usize = 30;
let (state, mut virtual_overseer) = TestState::new().await;
let TestState { mut chain, mut ordering, mut ctx } = state;
let TestState { mut chain, mut scraper, mut ctx } = state;
let next_update = (0..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap();
@@ -276,34 +275,26 @@ fn ordering_provider_requests_candidates_of_leaf_ancestors() {
finalized_block_number,
BLOCKS_TO_SKIP,
);
join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
join(process_active_leaves_update(ctx.sender(), &mut scraper, next_update), overseer_fut)
.await;
let next_block_number = next_block_number(&chain);
for block_number in 1..next_block_number {
let candidate = make_candidate_receipt(get_block_number_hash(block_number));
let r = join(
ordering.candidate_comparator(ctx.sender(), &candidate),
assert_block_number_request(&mut virtual_overseer, &chain),
)
.await
.0;
assert_matches!(r, Ok(Some(r2)) => {
assert_eq!(r2.relay_parent_block_number, block_number);
});
assert!(scraper.is_candidate_included(&candidate.hash()));
}
});
}
#[test]
fn ordering_provider_requests_candidates_of_non_cached_ancestors() {
fn scraper_requests_candidates_of_non_cached_ancestors() {
futures::executor::block_on(async {
// How many blocks should we skip before sending a leaf update.
const BLOCKS_TO_SKIP: &[usize] = &[30, 15];
let (state, mut virtual_overseer) = TestState::new().await;
let TestState { mut chain, mut ordering, mut ctx } = state;
let TestState { mut chain, scraper: mut ordering, mut ctx } = state;
let next_update = (0..BLOCKS_TO_SKIP[0]).map(|_| next_leaf(&mut chain)).last().unwrap();
@@ -331,16 +322,17 @@ fn ordering_provider_requests_candidates_of_non_cached_ancestors() {
}
#[test]
fn ordering_provider_requests_candidates_of_non_finalized_ancestors() {
fn scraper_requests_candidates_of_non_finalized_ancestors() {
futures::executor::block_on(async {
// How many blocks should we skip before sending a leaf update.
const BLOCKS_TO_SKIP: usize = 30;
let (state, mut virtual_overseer) = TestState::new().await;
let TestState { mut chain, mut ordering, mut ctx } = state;
let TestState { mut chain, scraper: mut ordering, mut ctx } = state;
let next_update = (0..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap();
// 1 because `TestState` starts at leaf 1.
let next_update = (1..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap();
let finalized_block_number = 17;
let overseer_fut = overseer_process_active_leaves_update(
@@ -29,7 +29,6 @@ use futures::{
future::{self, BoxFuture},
};
use parity_scale_codec::Encode;
use polkadot_node_subsystem_util::database::Database;
use polkadot_node_primitives::{SignedDisputeStatement, SignedFullStatement, Statement};
@@ -39,7 +38,7 @@ use polkadot_node_subsystem::{
ImportStatementsResult,
},
overseer::FromOverseer,
ChainApiError, OverseerSignal,
OverseerSignal,
};
use polkadot_node_subsystem_util::TimeoutExt;
use sc_keystore::LocalKeystore;
@@ -55,9 +54,9 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle};
use polkadot_primitives::v2::{
BlakeTwo256, BlockNumber, CandidateCommitments, CandidateHash, CandidateReceipt, Hash, HashT,
Header, MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo,
SigningContext, ValidatorId, ValidatorIndex,
BlockNumber, CandidateCommitments, CandidateHash, CandidateReceipt, Hash, Header,
MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo, SigningContext,
ValidatorId, ValidatorIndex,
};
use crate::{
@@ -87,12 +86,18 @@ fn make_keystore(seeds: impl Iterator<Item = String>) -> LocalKeystore {
store
}
fn session_to_hash(session: SessionIndex, extra: impl Encode) -> Hash {
BlakeTwo256::hash_of(&(session, extra))
}
type VirtualOverseer = TestSubsystemContextHandle<DisputeCoordinatorMessage>;
const OVERSEER_RECEIVE_TIMEOUT: Duration = Duration::from_secs(2);
async fn overseer_recv(virtual_overseer: &mut VirtualOverseer) -> AllMessages {
virtual_overseer
.recv()
.timeout(OVERSEER_RECEIVE_TIMEOUT)
.await
.expect("overseer `recv` timed out")
}
#[derive(Clone)]
struct MockClock {
time: Arc<AtomicU64>,
@@ -126,6 +131,9 @@ struct TestState {
config: Config,
clock: MockClock,
headers: HashMap<Hash, Header>,
last_block: Hash,
// last session the subsystem knows about.
known_session: Option<SessionIndex>,
}
impl Default for TestState {
@@ -168,6 +176,18 @@ impl Default for TestState {
let db = Arc::new(db);
let config = Config { col_data: 0 };
let genesis_header = Header {
parent_hash: Hash::zero(),
number: 0,
digest: dummy_digest(),
state_root: dummy_hash(),
extrinsics_root: dummy_hash(),
};
let last_block = genesis_header.hash();
let mut headers = HashMap::new();
let _ = headers.insert(last_block, genesis_header.clone());
TestState {
validators: validators.into_iter().map(|(pair, _)| pair).collect(),
validator_public,
@@ -177,7 +197,9 @@ impl Default for TestState {
db,
config,
clock: MockClock::default(),
headers: HashMap::new(),
headers,
last_block,
known_session: None,
}
}
}
@@ -191,9 +213,8 @@ impl TestState {
) {
assert!(block_number > 0);
let parent_hash = session_to_hash(session, b"parent");
let block_header = Header {
parent_hash,
parent_hash: self.last_block,
number: block_number,
digest: dummy_digest(),
state_root: dummy_hash(),
@@ -202,7 +223,9 @@ impl TestState {
let block_hash = block_header.hash();
let _ = self.headers.insert(block_hash, block_header.clone());
self.last_block = block_hash;
gum::debug!(?block_number, "Activating block in activate_leaf_at_session.");
virtual_overseer
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
@@ -218,84 +241,113 @@ impl TestState {
}
async fn handle_sync_queries(
&self,
&mut self,
virtual_overseer: &mut VirtualOverseer,
block_hash: Hash,
session: SessionIndex,
) {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(h, block_hash);
let _ = tx.send(Ok(session));
}
);
loop {
// answer session info queries until the current session is reached.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(session_index, tx),
)) => {
assert_eq!(h, block_hash);
let _ = tx.send(Ok(Some(self.session_info())));
if session_index == session { break }
}
)
// Order of messages is not fixed (different on initializing):
struct FinishedSteps {
got_session_information: bool,
got_scraping_information: bool,
}
// Since the test harness sends active leaves update for each block
// consecutively, walking back for ancestors is not necessary. Sending
// an error to the subsystem will force-skip this procedure, the ordering
// provider will only request for candidates included in the leaf.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
tx
)) => {
tx.send(Err(ChainApiError::from(""))).unwrap();
impl FinishedSteps {
fn new() -> Self {
Self { got_session_information: false, got_scraping_information: false }
}
);
fn is_done(&self) -> bool {
self.got_session_information && self.got_scraping_information
}
}
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
tx.send(Ok(Vec::new())).unwrap();
}
);
let mut finished_steps = FinishedSteps::new();
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::FetchOnChainVotes(tx),
)) => {
//add some `BackedCandidates` or resolved disputes here as needed
tx.send(Ok(Some(ScrapedOnChainVotes {
session,
backing_validators_per_candidate: Vec::default(),
disputes: MultiDisputeStatementSet::default(),
}))).unwrap();
while !finished_steps.is_done() {
match overseer_recv(virtual_overseer).await {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert!(
!finished_steps.got_session_information,
"session infos already retrieved"
);
finished_steps.got_session_information = true;
assert_eq!(h, block_hash);
let _ = tx.send(Ok(session));
// No queries, if subsystem knows about this session already.
if self.known_session == Some(session) {
continue
}
self.known_session = Some(session);
loop {
// answer session info queries until the current session is reached.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(session_index, tx),
)) => {
assert_eq!(h, block_hash);
let _ = tx.send(Ok(Some(self.session_info())));
if session_index == session { break }
}
);
}
},
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(tx)) => {
assert!(
!finished_steps.got_scraping_information,
"Scraping info was already retrieved!"
);
finished_steps.got_scraping_information = true;
tx.send(Ok(0)).unwrap();
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
tx.send(Ok(Vec::new())).unwrap();
}
);
gum::info!("After answering runtime api request");
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::FetchOnChainVotes(tx),
)) => {
//add some `BackedCandidates` or resolved disputes here as needed
tx.send(Ok(Some(ScrapedOnChainVotes {
session,
backing_validators_per_candidate: Vec::default(),
disputes: MultiDisputeStatementSet::default(),
}))).unwrap();
}
);
gum::info!("After answering runtime api request (votes)");
},
msg => {
panic!("Received unexpected message in `handle_sync_queries`: {:?}", msg);
},
}
)
}
}
async fn handle_resume_sync(
&self,
&mut self,
virtual_overseer: &mut VirtualOverseer,
session: SessionIndex,
) {
let leaves: Vec<Hash> = self.headers.keys().cloned().collect();
for (n, leaf) in leaves.iter().enumerate() {
gum::debug!(
block_number= ?n,
"Activating block in handle resume sync."
);
virtual_overseer
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
@@ -376,10 +428,11 @@ impl TestState {
SignedDisputeStatement::from_backing_statement(&statement, context, validator_id).unwrap()
}
fn resume<F>(self, test: F) -> Self
fn resume<F>(mut self, test: F) -> Self
where
F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>,
{
self.known_session = None;
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
let subsystem = DisputeCoordinatorSubsystem::new(
self.db.clone(),
@@ -411,7 +464,7 @@ async fn participation_with_distribution(
) {
participation_full_happy_path(virtual_overseer, expected_commitments_hash).await;
assert_matches!(
virtual_overseer.recv().await,
overseer_recv(virtual_overseer).await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::SendDispute(msg)
) => {
@@ -459,7 +512,6 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
.issue_explicit_statement_with_index(1, candidate_hash1, session, false)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -470,7 +522,7 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
(valid_vote1, ValidatorIndex(3)),
(invalid_vote1, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -514,7 +566,7 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
(valid_vote2, ValidatorIndex(3)),
(invalid_vote2, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -577,7 +629,6 @@ fn dispute_gets_confirmed_via_participation() {
.issue_explicit_statement_with_index(1, candidate_hash1, session, false)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -588,7 +639,7 @@ fn dispute_gets_confirmed_via_participation() {
(valid_vote1, ValidatorIndex(3)),
(invalid_vote1, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -636,7 +687,7 @@ fn dispute_gets_confirmed_via_participation() {
(valid_vote2, ValidatorIndex(3)),
(invalid_vote2, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -711,7 +762,6 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {
.issue_explicit_statement_with_index(1, candidate_hash1, session, false)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -724,7 +774,7 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {
(valid_vote1a, ValidatorIndex(4)),
(invalid_vote1a, ValidatorIndex(5)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -767,7 +817,7 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {
(valid_vote2, ValidatorIndex(3)),
(invalid_vote2, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -834,7 +884,7 @@ fn backing_statements_import_works_and_no_spam() {
(valid_vote1, ValidatorIndex(3)),
(valid_vote2, ValidatorIndex(4)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -888,7 +938,7 @@ fn backing_statements_import_works_and_no_spam() {
(valid_vote1, ValidatorIndex(3)),
(valid_vote2, ValidatorIndex(4)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -931,7 +981,6 @@ fn conflicting_votes_lead_to_dispute_participation() {
.issue_explicit_statement_with_index(2, candidate_hash, session, false)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -942,7 +991,7 @@ fn conflicting_votes_lead_to_dispute_participation() {
(valid_vote, ValidatorIndex(3)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -979,7 +1028,6 @@ fn conflicting_votes_lead_to_dispute_participation() {
assert_eq!(votes.invalid.len(), 1);
}
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -987,7 +1035,7 @@ fn conflicting_votes_lead_to_dispute_participation() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![(invalid_vote_2, ValidatorIndex(2))],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1039,7 +1087,6 @@ fn positive_votes_dont_trigger_participation() {
.issue_explicit_statement_with_index(1, candidate_hash, session, true)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1047,7 +1094,7 @@ fn positive_votes_dont_trigger_participation() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![(valid_vote, ValidatorIndex(2))],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1077,7 +1124,6 @@ fn positive_votes_dont_trigger_participation() {
assert!(votes.invalid.is_empty());
}
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1085,7 +1131,7 @@ fn positive_votes_dont_trigger_participation() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![(valid_vote_2, ValidatorIndex(1))],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1146,7 +1192,6 @@ fn wrong_validator_index_is_ignored() {
.issue_explicit_statement_with_index(1, candidate_hash, session, false)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1157,7 +1202,7 @@ fn wrong_validator_index_is_ignored() {
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1218,7 +1263,6 @@ fn finality_votes_ignore_disputed_candidates() {
.issue_explicit_statement_with_index(1, candidate_hash, session, false)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1229,7 +1273,7 @@ fn finality_votes_ignore_disputed_candidates() {
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1321,7 +1365,6 @@ fn supermajority_valid_dispute_may_be_finalized() {
.issue_explicit_statement_with_index(1, candidate_hash, session, false)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1332,7 +1375,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1353,7 +1396,6 @@ fn supermajority_valid_dispute_may_be_finalized() {
statements.push((vote, ValidatorIndex(i as _)));
}
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1361,7 +1403,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1446,7 +1488,6 @@ fn concluded_supermajority_for_non_active_after_time() {
.issue_explicit_statement_with_index(1, candidate_hash, session, false)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1457,7 +1498,7 @@ fn concluded_supermajority_for_non_active_after_time() {
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1479,7 +1520,6 @@ fn concluded_supermajority_for_non_active_after_time() {
statements.push((vote, ValidatorIndex(i as _)));
}
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1487,7 +1527,7 @@ fn concluded_supermajority_for_non_active_after_time() {
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1560,7 +1600,7 @@ fn concluded_supermajority_against_non_active_after_time() {
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -1586,7 +1626,6 @@ fn concluded_supermajority_against_non_active_after_time() {
statements.push((vote, ValidatorIndex(i as _)));
}
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1594,7 +1633,7 @@ fn concluded_supermajority_against_non_active_after_time() {
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1665,7 +1704,7 @@ fn resume_dispute_without_local_statement() {
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -1695,7 +1734,7 @@ fn resume_dispute_without_local_statement() {
})
// Alice should send a DisputeParticiationMessage::Participate on restart since she has no
// local statement for the active dispute.
.resume(|test_state, mut virtual_overseer| {
.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
@@ -1728,7 +1767,6 @@ fn resume_dispute_without_local_statement() {
.issue_explicit_statement_with_index(7, candidate_hash, session, true)
.await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
@@ -1743,7 +1781,7 @@ fn resume_dispute_without_local_statement() {
(valid_vote6, ValidatorIndex(6)),
(valid_vote7, ValidatorIndex(7)),
],
pending_confirmation,
pending_confirmation: None,
},
})
.await;
@@ -1809,7 +1847,7 @@ fn resume_dispute_with_local_statement() {
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -1836,7 +1874,7 @@ fn resume_dispute_with_local_statement() {
})
// Alice should not send a DisputeParticiationMessage::Participate on restart since she has a
// local statement for the active dispute.
.resume(|test_state, mut virtual_overseer| {
.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
@@ -1886,7 +1924,7 @@ fn resume_dispute_without_local_statement_or_local_key() {
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -1916,7 +1954,7 @@ fn resume_dispute_without_local_statement_or_local_key() {
})
// Two should not send a DisputeParticiationMessage::Participate on restart since she is no
// validator in that dispute.
.resume(|test_state, mut virtual_overseer| {
.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
@@ -1969,7 +2007,7 @@ fn resume_dispute_with_local_statement_without_local_key() {
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -1999,7 +2037,7 @@ fn resume_dispute_with_local_statement_without_local_key() {
make_keystore(vec![Sr25519Keyring::Two.to_seed()].into_iter()).into();
// Two should not send a DisputeParticiationMessage::Participate on restart since we gave
// her a non existing key.
test_state.resume(|test_state, mut virtual_overseer| {
test_state.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
@@ -2049,7 +2087,7 @@ fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![(other_vote, ValidatorIndex(1))],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
})
.await;
@@ -2070,7 +2108,7 @@ fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation
// Dispute distribution should get notified now:
assert_matches!(
virtual_overseer.recv().await,
overseer_recv(&mut virtual_overseer).await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::SendDispute(msg)
) => {
@@ -2155,7 +2193,7 @@ fn empty_import_still_writes_candidate_receipt() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: Vec::new(),
pending_confirmation: tx,
pending_confirmation: Some(tx),
},
})
.await;
@@ -2206,7 +2244,7 @@ fn redundant_votes_ignored() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![(valid_vote.clone(), ValidatorIndex(1))],
pending_confirmation: tx,
pending_confirmation: Some(tx),
},
})
.await;
@@ -2221,7 +2259,7 @@ fn redundant_votes_ignored() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![(valid_vote_2, ValidatorIndex(1))],
pending_confirmation: tx,
pending_confirmation: Some(tx),
},
})
.await;
@@ -271,7 +271,7 @@ where
candidate_receipt,
session: valid_vote.0.session_index(),
statements: vec![valid_vote, invalid_vote],
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
},
))
.await;
@@ -526,7 +526,7 @@ async fn nested_network_dispute_request<'a, F, O>(
candidate_receipt,
session,
statements,
pending_confirmation,
pending_confirmation: Some(pending_confirmation),
}
) => {
assert_eq!(session, MOCK_SESSION_INDEX);
@@ -254,7 +254,7 @@ pub enum DisputeCoordinatorMessage {
/// The validator index passed alongside each statement should correspond to the index
/// of the validator in the set.
statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
/// Inform the requester once we finished importing.
/// Inform the requester once we finished importing (if a sender was provided).
///
/// This is:
/// - we discarded the votes because
@@ -268,7 +268,7 @@ pub enum DisputeCoordinatorMessage {
/// - or other explicit votes on that candidate already recorded
/// - or recovered availability for the candidate
/// - or the imported statements are backing/approval votes, which are always accepted.
pending_confirmation: oneshot::Sender<ImportStatementsResult>,
pending_confirmation: Option<oneshot::Sender<ImportStatementsResult>>,
},
/// Fetch a list of all recent disputes the co-ordinator is aware of.
/// These are disputes which have occurred any time in recent sessions,
+4 -2
View File
@@ -52,8 +52,9 @@ use pin_project::pin_project;
use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs,
GroupIndex, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, SessionIndex, SessionInfo, Signed, SigningContext, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
PersistedValidationData, ScrapedOnChainVotes, SessionIndex, SessionInfo, Signed,
SigningContext, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
ValidatorSignature,
};
pub use rand;
use sp_application_crypto::AppKey;
@@ -217,6 +218,7 @@ specialize_requests! {
fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
-> Option<ValidationCodeHash>; ValidationCodeHash;
fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
}
/// From the given set of validators, find the first key we can sign with, if any.
@@ -28,13 +28,14 @@ use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_node_subsystem::{SubsystemContext, SubsystemSender};
use polkadot_primitives::v2::{
CandidateEvent, CoreState, EncodeAs, GroupIndex, GroupRotationInfo, Hash, OccupiedCore,
SessionIndex, SessionInfo, Signed, SigningContext, UncheckedSigned, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex,
ScrapedOnChainVotes, SessionIndex, SessionInfo, Signed, SigningContext, UncheckedSigned,
ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
};
use crate::{
request_availability_cores, request_candidate_events, request_session_index_for_child,
request_session_info, request_validation_code_by_hash, request_validator_groups,
request_availability_cores, request_candidate_events, request_on_chain_votes,
request_session_index_for_child, request_session_info, request_validation_code_by_hash,
request_validator_groups,
};
/// Errors that can happen on runtime fetches.
@@ -314,6 +315,17 @@ where
recv_runtime(request_candidate_events(relay_parent, sender).await).await
}
/// Get on chain votes.
pub async fn get_on_chain_votes<Sender>(
sender: &mut Sender,
relay_parent: Hash,
) -> Result<Option<ScrapedOnChainVotes>>
where
Sender: SubsystemSender,
{
recv_runtime(request_on_chain_votes(relay_parent, sender).await).await
}
/// Fetch `ValidationCode` by hash from the runtime.
pub async fn get_validation_code_by_hash<Sender>(
sender: &mut Sender,