Improved dispute votes import in provisioner (#5567)

* Add `DisputeState` to `DisputeCoordinatorMessage::RecentDisputes`

The new signature of the message is:
```
RecentDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>),
```

As part of the change also add `DispiteStatus` to
`polkadot_node_primitives`.

* Move dummy_signature() in primitives/test-helpers

* Enable staging runtime api on Rococo

* Implementation

* Move disputes to separate module
* Vote prioritisation
* Duplicates handling
* Double vote handling
* Unit tests
* Logs and metrics
* Code review feedback
* Fix ACTIVE/INACTIVE separation and update partition names
* Add `fn dispute_is_inactive` to node primitives and refactor `fn get_active_with_status()` logic
* Keep the 'old' logic if the staging api is not enabled
* Fix some comments in tests
* Add warning message if there are any inactive_unknown_onchain disputes
* Add file headers and remove `use super::*;` usage outside tests
* Adding doc comments
* Fix test methods names

* Fix staging api usage

* Fix `get_disputes` runtime function implementation

* Fix compilation error

* Fix arithmetic operations in tests

* Use smaller test data

* Rename `RuntimeApiRequest::StagingDisputes` to `RuntimeApiRequest::Disputes`

* Remove `staging-client` feature flag

* fmt

* Remove `vstaging` feature flag

* Some comments regarding the staging api

* Rename dispute selection modules in provisioner
with_staging_api -> prioritized_selection
without_staging_api -> random_selection

* Comments for staging api

* Comments

* Additional logging

* Code review feedback

process_selected_disputes -> into_multi_dispute_statement_set
typo
In trait VoteType: vote_value -> is_valid

* Code review feedback

* Fix metrics

* get_disputes -> disputes

* Get time only once during partitioning

* Fix partitioning

* Comments

* Reduce the number of hardcoded api versions

* Code review feedback

* Unused import

* Comments

* More precise log messages

* Code review feedback

* Code review feedback

* Code review feedback - remove `trait VoteType`

* Code review feedback

* Trace log for DisputeCoordinatorMessage::QueryCandidateVotes counter in vote_selection
This commit is contained in:
Tsvetomir Dimitrov
2022-09-19 23:06:09 +03:00
committed by GitHub
parent bbb713521e
commit 6ae9720c36
43 changed files with 1860 additions and 975 deletions
@@ -0,0 +1,53 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The disputes module is responsible for selecting dispute votes to be sent with the inherent data. It contains two
//! different implementations, extracted in two separate modules - `random_selection` and `prioritized_selection`. Which
//! implementation will be executed depends on the version of the runtime. Runtime v2 supports `random_selection`. Runtime
//! v3 and above - `prioritized_selection`. The entrypoint to these implementations is the `select_disputes` function.
//! prioritized_selection` is considered superior and will be the default one in the future. Refer to the documentation of
//! the modules for more details about each implementation.
use crate::LOG_TARGET;
use futures::channel::oneshot;
use polkadot_node_primitives::CandidateVotes;
use polkadot_node_subsystem::{messages::DisputeCoordinatorMessage, overseer};
use polkadot_primitives::v2::{CandidateHash, SessionIndex};
/// Request the relevant dispute statements for a set of disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_votes(
sender: &mut impl overseer::ProvisionerSenderTrait,
disputes_to_query: Vec<(SessionIndex, CandidateHash)>,
) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> {
let (tx, rx) = oneshot::channel();
// Bounded by block production - `ProvisionerMessage::RequestInherentData`.
sender.send_unbounded_message(DisputeCoordinatorMessage::QueryCandidateVotes(
disputes_to_query,
tx,
));
match rx.await {
Ok(v) => v,
Err(oneshot::Canceled) => {
gum::warn!(target: LOG_TARGET, "Unable to query candidate votes");
Vec::new()
},
}
}
pub(crate) mod prioritized_selection;
pub(crate) mod random_selection;
@@ -0,0 +1,470 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! This module uses different approach for selecting dispute votes. It queries the Runtime
//! about the votes already known onchain and tries to select only relevant votes. Refer to
//! the documentation of `select_disputes` for more details about the actual implementation.
use crate::{error::GetOnchainDisputesError, metrics, LOG_TARGET};
use futures::channel::oneshot;
use polkadot_node_primitives::{dispute_is_inactive, CandidateVotes, DisputeStatus, Timestamp};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest},
overseer, ActivatedLeaf,
};
use polkadot_primitives::v2::{
supermajority_threshold, CandidateHash, DisputeState, DisputeStatement, DisputeStatementSet,
Hash, MultiDisputeStatementSet, SessionIndex, ValidatorIndex,
};
use std::{
collections::{BTreeMap, HashMap},
time::{SystemTime, UNIX_EPOCH},
};
#[cfg(test)]
mod tests;
/// The maximum number of disputes Provisioner will include in the inherent data.
/// Serves as a protection not to flood the Runtime with excessive data.
#[cfg(not(test))]
pub const MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME: usize = 200_000;
#[cfg(test)]
pub const MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME: usize = 200;
/// Controls how much dispute votes to be fetched from the runtime per iteration in `fn vote_selection`.
/// The purpose is to fetch the votes in batches until `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME` is
/// reached. This value should definitely be less than `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME`.
///
/// The ratio `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME` / `VOTES_SELECTION_BATCH_SIZE` gives an
/// approximation about how many runtime requests will be issued to fetch votes from the runtime in
/// a single `select_disputes` call. Ideally we don't want to make more than 2-3 calls. In practice
/// it's hard to predict this number because we can't guess how many new votes (for the runtime) a
/// batch will contain.
///
/// The value below is reached by: `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME` / 2 + 10%
/// The 10% makes approximately means '10% new votes'. Tweak this if provisioner makes excessive
/// number of runtime calls.
#[cfg(not(test))]
const VOTES_SELECTION_BATCH_SIZE: usize = 1_100;
#[cfg(test)]
const VOTES_SELECTION_BATCH_SIZE: usize = 11; // Just a small value for tests. Doesn't follow the rules above
/// Implements the `select_disputes` function which selects dispute votes which should
/// be sent to the Runtime.
///
/// # How the prioritization works
///
/// Generally speaking disputes can be described as:
/// * Active vs Inactive
/// * Known vs Unknown onchain
/// * Offchain vs Onchain
/// * Concluded onchain vs Unconcluded onchain
///
/// Provisioner fetches all disputes from `dispute-coordinator` and separates them in multiple partitions.
/// Please refer to `struct PartitionedDisputes` for details about the actual partitions.
/// Each partition has got a priority implicitly assigned to it and the disputes are selected based on this
/// priority (e.g. disputes in partition 1, then if there is space - disputes from partition 2 and so on).
///
/// # Votes selection
///
/// Besides the prioritization described above the votes in each partition are filtered too. Provisioner
/// fetches all onchain votes and filters them out from all partitions. As a result the Runtime receives
/// only fresh votes (votes it didn't know about).
///
/// # How the onchain votes are fetched
///
/// The logic outlined above relies on `RuntimeApiRequest::Disputes` message from the Runtime. The user
/// check the Runtime version before calling `select_disputes`. If the function is used with old runtime
/// an error is logged and the logic will continue with empty onchain votes HashMap.
pub async fn select_disputes<Sender>(
sender: &mut Sender,
metrics: &metrics::Metrics,
leaf: &ActivatedLeaf,
) -> MultiDisputeStatementSet
where
Sender: overseer::ProvisionerSenderTrait,
{
gum::trace!(
target: LOG_TARGET,
?leaf,
"Selecting disputes for inherent data using prioritized selection"
);
// Fetch the onchain disputes. We'll do a prioritization based on them.
let onchain = match get_onchain_disputes(sender, leaf.hash.clone()).await {
Ok(r) => r,
Err(GetOnchainDisputesError::NotSupported(runtime_api_err, relay_parent)) => {
// Runtime version is checked before calling this method, so the error below should never happen!
gum::error!(
target: LOG_TARGET,
?runtime_api_err,
?relay_parent,
"Can't fetch onchain disputes, because ParachainHost runtime api version is old. Will continue with empty onchain disputes set.",
);
HashMap::new()
},
Err(GetOnchainDisputesError::Channel) => {
// This error usually means the node is shutting down. Log just in case.
gum::debug!(
target: LOG_TARGET,
"Channel error occurred while fetching onchain disputes. Will continue with empty onchain disputes set.",
);
HashMap::new()
},
Err(GetOnchainDisputesError::Execution(runtime_api_err, parent_hash)) => {
gum::warn!(
target: LOG_TARGET,
?runtime_api_err,
?parent_hash,
"Unexpected execution error occurred while fetching onchain votes. Will continue with empty onchain disputes set.",
);
HashMap::new()
},
};
let recent_disputes = request_disputes(sender).await;
gum::trace!(
target: LOG_TARGET,
?leaf,
"Got {} recent disputes and {} onchain disputes.",
recent_disputes.len(),
onchain.len(),
);
let partitioned = partition_recent_disputes(recent_disputes, &onchain);
metrics.on_partition_recent_disputes(&partitioned);
if partitioned.inactive_unknown_onchain.len() > 0 {
gum::warn!(
target: LOG_TARGET,
?leaf,
"Got {} inactive unknown onchain disputes. This should not happen!",
partitioned.inactive_unknown_onchain.len()
);
}
let result = vote_selection(sender, partitioned, &onchain).await;
make_multi_dispute_statement_set(metrics, result)
}
/// Selects dispute votes from `PartitionedDisputes` which should be sent to the runtime. Votes which
/// are already onchain are filtered out. Result should be sorted by `(SessionIndex, CandidateHash)`
/// which is enforced by the `BTreeMap`. This is a requirement from the runtime.
async fn vote_selection<Sender>(
sender: &mut Sender,
partitioned: PartitionedDisputes,
onchain: &HashMap<(SessionIndex, CandidateHash), DisputeState>,
) -> BTreeMap<(SessionIndex, CandidateHash), CandidateVotes>
where
Sender: overseer::ProvisionerSenderTrait,
{
// fetch in batches until there are enough votes
let mut disputes = partitioned.into_iter().collect::<Vec<_>>();
let mut total_votes_len = 0;
let mut result = BTreeMap::new();
let mut request_votes_counter = 0;
while !disputes.is_empty() {
let batch_size = std::cmp::min(VOTES_SELECTION_BATCH_SIZE, disputes.len());
let batch = Vec::from_iter(disputes.drain(0..batch_size));
// Filter votes which are already onchain
request_votes_counter += 1;
let votes = super::request_votes(sender, batch)
.await
.into_iter()
.map(|(session_index, candidate_hash, mut votes)| {
let onchain_state =
if let Some(onchain_state) = onchain.get(&(session_index, candidate_hash)) {
onchain_state
} else {
// onchain knows nothing about this dispute - add all votes
return (session_index, candidate_hash, votes)
};
votes.valid.retain(|validator_idx, (statement_kind, _)| {
is_vote_worth_to_keep(
validator_idx,
DisputeStatement::Valid(*statement_kind),
&onchain_state,
)
});
votes.invalid.retain(|validator_idx, (statement_kind, _)| {
is_vote_worth_to_keep(
validator_idx,
DisputeStatement::Invalid(*statement_kind),
&onchain_state,
)
});
(session_index, candidate_hash, votes)
})
.collect::<Vec<_>>();
// Check if votes are within the limit
for (session_index, candidate_hash, selected_votes) in votes {
let votes_len = selected_votes.valid.len() + selected_votes.invalid.len();
if votes_len + total_votes_len > MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME {
// we are done - no more votes can be added
return result
}
result.insert((session_index, candidate_hash), selected_votes);
total_votes_len += votes_len
}
}
gum::trace!(
target: LOG_TARGET,
?request_votes_counter,
"vote_selection DisputeCoordinatorMessage::QueryCandidateVotes counter",
);
result
}
/// Contains disputes by partitions. Check the field comments for further details.
#[derive(Default)]
pub(crate) struct PartitionedDisputes {
/// Concluded and inactive disputes which are completely unknown for the Runtime.
/// Hopefully this should never happen.
/// Will be sent to the Runtime with FIRST priority.
pub inactive_unknown_onchain: Vec<(SessionIndex, CandidateHash)>,
/// Disputes which are INACTIVE locally but they are unconcluded for the Runtime.
/// A dispute can have enough local vote to conclude and at the same time the
/// Runtime knows nothing about them at treats it as unconcluded. This discrepancy
/// should be treated with high priority.
/// Will be sent to the Runtime with SECOND priority.
pub inactive_unconcluded_onchain: Vec<(SessionIndex, CandidateHash)>,
/// Active disputes completely unknown onchain.
/// Will be sent to the Runtime with THIRD priority.
pub active_unknown_onchain: Vec<(SessionIndex, CandidateHash)>,
/// Active disputes unconcluded onchain.
/// Will be sent to the Runtime with FOURTH priority.
pub active_unconcluded_onchain: Vec<(SessionIndex, CandidateHash)>,
/// Active disputes concluded onchain. New votes are not that important for
/// this partition.
/// Will be sent to the Runtime with FIFTH priority.
pub active_concluded_onchain: Vec<(SessionIndex, CandidateHash)>,
/// Inactive disputes which has concluded onchain. These are not interesting and
/// won't be sent to the Runtime.
/// Will be DROPPED
pub inactive_concluded_onchain: Vec<(SessionIndex, CandidateHash)>,
}
impl PartitionedDisputes {
fn new() -> PartitionedDisputes {
Default::default()
}
fn into_iter(self) -> impl Iterator<Item = (SessionIndex, CandidateHash)> {
self.inactive_unknown_onchain
.into_iter()
.chain(self.inactive_unconcluded_onchain.into_iter())
.chain(self.active_unknown_onchain.into_iter())
.chain(self.active_unconcluded_onchain.into_iter())
.chain(self.active_concluded_onchain.into_iter())
// inactive_concluded_onchain is dropped on purpose
}
}
fn secs_since_epoch() -> Timestamp {
match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(e) => {
gum::warn!(
target: LOG_TARGET,
err = ?e,
"Error getting system time."
);
0
},
}
}
fn concluded_onchain(onchain_state: &DisputeState) -> bool {
// Check if there are enough onchain votes for or against to conclude the dispute
let supermajority = supermajority_threshold(onchain_state.validators_for.len());
onchain_state.validators_for.count_ones() >= supermajority ||
onchain_state.validators_against.count_ones() >= supermajority
}
fn partition_recent_disputes(
recent: Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
onchain: &HashMap<(SessionIndex, CandidateHash), DisputeState>,
) -> PartitionedDisputes {
let mut partitioned = PartitionedDisputes::new();
// Drop any duplicates
let unique_recent = recent
.into_iter()
.map(|(session_index, candidate_hash, dispute_state)| {
((session_index, candidate_hash), dispute_state)
})
.collect::<HashMap<_, _>>();
// Split recent disputes in ACTIVE and INACTIVE
let time_now = &secs_since_epoch();
let (active, inactive): (
Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
) = unique_recent
.into_iter()
.map(|((session_index, candidate_hash), dispute_state)| {
(session_index, candidate_hash, dispute_state)
})
.partition(|(_, _, status)| !dispute_is_inactive(status, time_now));
// Split ACTIVE in three groups...
for (session_index, candidate_hash, _) in active {
match onchain.get(&(session_index, candidate_hash)) {
Some(d) => match concluded_onchain(d) {
true => partitioned.active_concluded_onchain.push((session_index, candidate_hash)),
false =>
partitioned.active_unconcluded_onchain.push((session_index, candidate_hash)),
},
None => partitioned.active_unknown_onchain.push((session_index, candidate_hash)),
};
}
// ... and INACTIVE in three more
for (session_index, candidate_hash, _) in inactive {
match onchain.get(&(session_index, candidate_hash)) {
Some(onchain_state) =>
if concluded_onchain(onchain_state) {
partitioned.inactive_concluded_onchain.push((session_index, candidate_hash));
} else {
partitioned.inactive_unconcluded_onchain.push((session_index, candidate_hash));
},
None => partitioned.inactive_unknown_onchain.push((session_index, candidate_hash)),
}
}
partitioned
}
/// Determines if a vote is worth to be kept, based on the onchain disputes
fn is_vote_worth_to_keep(
validator_index: &ValidatorIndex,
dispute_statement: DisputeStatement,
onchain_state: &DisputeState,
) -> bool {
let offchain_vote = match dispute_statement {
DisputeStatement::Valid(_) => true,
DisputeStatement::Invalid(_) => false,
};
let in_validators_for = onchain_state
.validators_for
.get(validator_index.0 as usize)
.as_deref()
.copied()
.unwrap_or(false);
let in_validators_against = onchain_state
.validators_against
.get(validator_index.0 as usize)
.as_deref()
.copied()
.unwrap_or(false);
if in_validators_for && in_validators_against {
// The validator has double voted and runtime knows about this. Ignore this vote.
return false
}
if offchain_vote && in_validators_against || !offchain_vote && in_validators_for {
// offchain vote differs from the onchain vote
// we need this vote to punish the offending validator
return true
}
// The vote is valid. Return true if it is not seen onchain.
!in_validators_for && !in_validators_against
}
/// Request disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Vec<(SessionIndex, CandidateHash, DisputeStatus)> {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::RecentDisputes(tx);
// Bounded by block production - `ProvisionerMessage::RequestInherentData`.
sender.send_unbounded_message(msg);
let recent_disputes = rx.await.unwrap_or_else(|err| {
gum::warn!(target: LOG_TARGET, err=?err, "Unable to gather recent disputes");
Vec::new()
});
recent_disputes
}
// This function produces the return value for `pub fn select_disputes()`
fn make_multi_dispute_statement_set(
metrics: &metrics::Metrics,
dispute_candidate_votes: BTreeMap<(SessionIndex, CandidateHash), CandidateVotes>,
) -> MultiDisputeStatementSet {
// Transform all `CandidateVotes` into `MultiDisputeStatementSet`.
dispute_candidate_votes
.into_iter()
.map(|((session_index, candidate_hash), votes)| {
let valid_statements = votes
.valid
.into_iter()
.map(|(i, (s, sig))| (DisputeStatement::Valid(s), i, sig));
let invalid_statements = votes
.invalid
.into_iter()
.map(|(i, (s, sig))| (DisputeStatement::Invalid(s), i, sig));
metrics.inc_valid_statements_by(valid_statements.len());
metrics.inc_invalid_statements_by(invalid_statements.len());
metrics.inc_dispute_statement_sets_by(1);
DisputeStatementSet {
candidate_hash,
session: session_index,
statements: valid_statements.chain(invalid_statements).collect(),
}
})
.collect()
}
/// Gets the on-chain disputes at a given block number and returns them as a `HashMap` so that searching in them is cheap.
pub async fn get_onchain_disputes<Sender>(
sender: &mut Sender,
relay_parent: Hash,
) -> Result<HashMap<(SessionIndex, CandidateHash), DisputeState>, GetOnchainDisputesError>
where
Sender: overseer::ProvisionerSenderTrait,
{
gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching on-chain disputes");
let (tx, rx) = oneshot::channel();
sender
.send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Disputes(tx)))
.await;
rx.await
.map_err(|_| GetOnchainDisputesError::Channel)
.and_then(|res| {
res.map_err(|e| match e {
RuntimeApiError::Execution { .. } =>
GetOnchainDisputesError::Execution(e, relay_parent),
RuntimeApiError::NotSupported { .. } =>
GetOnchainDisputesError::NotSupported(e, relay_parent),
})
})
.map(|v| v.into_iter().map(|e| ((e.0, e.1), e.2)).collect())
}
@@ -0,0 +1,722 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::super::{
super::{tests::common::test_harness, *},
prioritized_selection::*,
};
use bitvec::prelude::*;
use futures::channel::mpsc;
use polkadot_node_primitives::{CandidateVotes, DisputeStatus, ACTIVE_DURATION_SECS};
use polkadot_node_subsystem::messages::{
AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_node_subsystem_test_helpers::TestSubsystemSender;
use polkadot_primitives::v2::{
CandidateHash, DisputeState, InvalidDisputeStatementKind, SessionIndex,
ValidDisputeStatementKind, ValidatorSignature,
};
use std::sync::Arc;
use test_helpers;
//
// Unit tests for various functions
//
#[test]
fn should_keep_vote_behaves() {
let onchain_state = DisputeState {
validators_for: bitvec![u8, Lsb0; 1, 0, 1, 0, 1],
validators_against: bitvec![u8, Lsb0; 0, 1, 0, 0, 1],
start: 1,
concluded_at: None,
};
let local_valid_known = (ValidatorIndex(0), ValidDisputeStatementKind::Explicit);
let local_valid_unknown = (ValidatorIndex(3), ValidDisputeStatementKind::Explicit);
let local_invalid_known = (ValidatorIndex(1), InvalidDisputeStatementKind::Explicit);
let local_invalid_unknown = (ValidatorIndex(3), InvalidDisputeStatementKind::Explicit);
assert_eq!(
is_vote_worth_to_keep(
&local_valid_known.0,
DisputeStatement::Valid(local_valid_known.1),
&onchain_state
),
false
);
assert_eq!(
is_vote_worth_to_keep(
&local_valid_unknown.0,
DisputeStatement::Valid(local_valid_unknown.1),
&onchain_state
),
true
);
assert_eq!(
is_vote_worth_to_keep(
&local_invalid_known.0,
DisputeStatement::Invalid(local_invalid_known.1),
&onchain_state
),
false
);
assert_eq!(
is_vote_worth_to_keep(
&local_invalid_unknown.0,
DisputeStatement::Invalid(local_invalid_unknown.1),
&onchain_state
),
true
);
//double voting - onchain knows
let local_double_vote_onchain_knows =
(ValidatorIndex(4), InvalidDisputeStatementKind::Explicit);
assert_eq!(
is_vote_worth_to_keep(
&local_double_vote_onchain_knows.0,
DisputeStatement::Invalid(local_double_vote_onchain_knows.1),
&onchain_state
),
false
);
//double voting - onchain doesn't know
let local_double_vote_onchain_doesnt_knows =
(ValidatorIndex(0), InvalidDisputeStatementKind::Explicit);
assert_eq!(
is_vote_worth_to_keep(
&local_double_vote_onchain_doesnt_knows.0,
DisputeStatement::Invalid(local_double_vote_onchain_doesnt_knows.1),
&onchain_state
),
true
);
// empty onchain state
let empty_onchain_state = DisputeState {
validators_for: BitVec::new(),
validators_against: BitVec::new(),
start: 1,
concluded_at: None,
};
assert_eq!(
is_vote_worth_to_keep(
&local_double_vote_onchain_doesnt_knows.0,
DisputeStatement::Invalid(local_double_vote_onchain_doesnt_knows.1),
&empty_onchain_state
),
true
);
}
#[test]
fn partitioning_happy_case() {
let mut input = Vec::<(SessionIndex, CandidateHash, DisputeStatus)>::new();
let mut onchain = HashMap::<(u32, CandidateHash), DisputeState>::new();
let time_now = secs_since_epoch();
// Create one dispute for each partition
let inactive_unknown_onchain = (
0,
CandidateHash(Hash::random()),
DisputeStatus::ConcludedFor(time_now - ACTIVE_DURATION_SECS * 2),
);
input.push(inactive_unknown_onchain.clone());
let inactive_unconcluded_onchain = (
1,
CandidateHash(Hash::random()),
DisputeStatus::ConcludedFor(time_now - ACTIVE_DURATION_SECS * 2),
);
input.push(inactive_unconcluded_onchain.clone());
onchain.insert(
(inactive_unconcluded_onchain.0, inactive_unconcluded_onchain.1.clone()),
DisputeState {
validators_for: bitvec![u8, Lsb0; 1, 1, 1, 0, 0, 0, 0, 0, 0],
validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0],
start: 1,
concluded_at: None,
},
);
let active_unknown_onchain = (2, CandidateHash(Hash::random()), DisputeStatus::Active);
input.push(active_unknown_onchain.clone());
let active_unconcluded_onchain = (3, CandidateHash(Hash::random()), DisputeStatus::Active);
input.push(active_unconcluded_onchain.clone());
onchain.insert(
(active_unconcluded_onchain.0, active_unconcluded_onchain.1.clone()),
DisputeState {
validators_for: bitvec![u8, Lsb0; 1, 1, 1, 0, 0, 0, 0, 0, 0],
validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0],
start: 1,
concluded_at: None,
},
);
let active_concluded_onchain = (4, CandidateHash(Hash::random()), DisputeStatus::Active);
input.push(active_concluded_onchain.clone());
onchain.insert(
(active_concluded_onchain.0, active_concluded_onchain.1.clone()),
DisputeState {
validators_for: bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1, 0],
validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0],
start: 1,
concluded_at: Some(3),
},
);
let inactive_concluded_onchain = (
5,
CandidateHash(Hash::random()),
DisputeStatus::ConcludedFor(time_now - ACTIVE_DURATION_SECS * 2),
);
input.push(inactive_concluded_onchain.clone());
onchain.insert(
(inactive_concluded_onchain.0, inactive_concluded_onchain.1.clone()),
DisputeState {
validators_for: bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 0, 0],
validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0],
start: 1,
concluded_at: Some(3),
},
);
let result = partition_recent_disputes(input, &onchain);
// Check results
assert_eq!(result.inactive_unknown_onchain.len(), 1);
assert_eq!(
result.inactive_unknown_onchain.get(0).unwrap(),
&(inactive_unknown_onchain.0, inactive_unknown_onchain.1)
);
assert_eq!(result.inactive_unconcluded_onchain.len(), 1);
assert_eq!(
result.inactive_unconcluded_onchain.get(0).unwrap(),
&(inactive_unconcluded_onchain.0, inactive_unconcluded_onchain.1)
);
assert_eq!(result.active_unknown_onchain.len(), 1);
assert_eq!(
result.active_unknown_onchain.get(0).unwrap(),
&(active_unknown_onchain.0, active_unknown_onchain.1)
);
assert_eq!(result.active_unconcluded_onchain.len(), 1);
assert_eq!(
result.active_unconcluded_onchain.get(0).unwrap(),
&(active_unconcluded_onchain.0, active_unconcluded_onchain.1)
);
assert_eq!(result.active_concluded_onchain.len(), 1);
assert_eq!(
result.active_concluded_onchain.get(0).unwrap(),
&(active_concluded_onchain.0, active_concluded_onchain.1)
);
assert_eq!(result.inactive_concluded_onchain.len(), 1);
assert_eq!(
result.inactive_concluded_onchain.get(0).unwrap(),
&(inactive_concluded_onchain.0, inactive_concluded_onchain.1)
);
}
// This test verifies the double voting behavior. Currently we don't care if a supermajority is achieved with or
// without the 'help' of a double vote (a validator voting for and against at the same time). This makes the test
// a bit pointless but anyway I'm leaving it here to make this decision explicit and have the test code ready in
// case this behavior needs to be further tested in the future.
// Link to the PR with the discussions: https://github.com/paritytech/polkadot/pull/5567
#[test]
fn partitioning_doubled_onchain_vote() {
let mut input = Vec::<(SessionIndex, CandidateHash, DisputeStatus)>::new();
let mut onchain = HashMap::<(u32, CandidateHash), DisputeState>::new();
// Dispute A relies on a 'double onchain vote' to conclude. Validator with index 0 has voted both `for` and `against`.
// Despite that this dispute should be considered 'can conclude onchain'.
let dispute_a = (3, CandidateHash(Hash::random()), DisputeStatus::Active);
// Dispute B has supermajority + 1 votes, so the doubled onchain vote doesn't affect it. It should be considered
// as 'can conclude onchain'.
let dispute_b = (4, CandidateHash(Hash::random()), DisputeStatus::Active);
input.push(dispute_a.clone());
input.push(dispute_b.clone());
onchain.insert(
(dispute_a.0, dispute_a.1.clone()),
DisputeState {
validators_for: bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 0, 0],
validators_against: bitvec![u8, Lsb0; 1, 0, 0, 0, 0, 0, 0, 0, 0],
start: 1,
concluded_at: None,
},
);
onchain.insert(
(dispute_b.0, dispute_b.1.clone()),
DisputeState {
validators_for: bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1, 0],
validators_against: bitvec![u8, Lsb0; 1, 0, 0, 0, 0, 0, 0, 0, 0],
start: 1,
concluded_at: None,
},
);
let result = partition_recent_disputes(input, &onchain);
assert_eq!(result.active_unconcluded_onchain.len(), 0);
assert_eq!(result.active_concluded_onchain.len(), 2);
}
#[test]
fn partitioning_duplicated_dispute() {
let mut input = Vec::<(SessionIndex, CandidateHash, DisputeStatus)>::new();
let mut onchain = HashMap::<(u32, CandidateHash), DisputeState>::new();
let some_dispute = (3, CandidateHash(Hash::random()), DisputeStatus::Active);
input.push(some_dispute.clone());
input.push(some_dispute.clone());
onchain.insert(
(some_dispute.0, some_dispute.1.clone()),
DisputeState {
validators_for: bitvec![u8, Lsb0; 1, 1, 1, 0, 0, 0, 0, 0, 0],
validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0],
start: 1,
concluded_at: None,
},
);
let result = partition_recent_disputes(input, &onchain);
assert_eq!(result.active_unconcluded_onchain.len(), 1);
assert_eq!(
result.active_unconcluded_onchain.get(0).unwrap(),
&(some_dispute.0, some_dispute.1)
);
}
//
// end-to-end tests for select_disputes()
//
async fn mock_overseer(
mut receiver: mpsc::UnboundedReceiver<AllMessages>,
disputes_db: &mut TestDisputes,
vote_queries_count: &mut usize,
) {
while let Some(from_job) = receiver.next().await {
match from_job {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_,
RuntimeApiRequest::Disputes(sender),
)) => {
let _ = sender.send(Ok(disputes_db
.onchain_disputes
.clone()
.into_iter()
.map(|(k, v)| (k.0, k.1, v))
.collect::<Vec<_>>()));
},
AllMessages::RuntimeApi(_) => panic!("Unexpected RuntimeApi request"),
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::RecentDisputes(sender)) => {
let _ = sender.send(disputes_db.local_disputes.clone());
},
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::QueryCandidateVotes(
disputes,
sender,
)) => {
*vote_queries_count += 1;
let mut res = Vec::new();
for d in disputes.iter() {
let v = disputes_db.votes_db.get(d).unwrap().clone();
res.push((d.0, d.1, v));
}
let _ = sender.send(res);
},
_ => panic!("Unexpected message: {:?}", from_job),
}
}
}
fn leaf() -> ActivatedLeaf {
ActivatedLeaf {
hash: Hash::repeat_byte(0xAA),
number: 0xAA,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}
}
struct TestDisputes {
pub local_disputes: Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
pub votes_db: HashMap<(SessionIndex, CandidateHash), CandidateVotes>,
pub onchain_disputes: HashMap<(u32, CandidateHash), DisputeState>,
validators_count: usize,
}
impl TestDisputes {
pub fn new(validators_count: usize) -> TestDisputes {
TestDisputes {
local_disputes: Vec::<(SessionIndex, CandidateHash, DisputeStatus)>::new(),
votes_db: HashMap::<(SessionIndex, CandidateHash), CandidateVotes>::new(),
onchain_disputes: HashMap::<(u32, CandidateHash), DisputeState>::new(),
validators_count,
}
}
// Offchain disputes are on node side
fn add_offchain_dispute(
&mut self,
dispute: (SessionIndex, CandidateHash, DisputeStatus),
local_votes_count: usize,
dummy_receipt: CandidateReceipt,
) {
self.local_disputes.push(dispute.clone());
self.votes_db.insert(
(dispute.0, dispute.1),
CandidateVotes {
candidate_receipt: dummy_receipt,
valid: TestDisputes::generate_local_votes(
ValidDisputeStatementKind::Explicit,
0,
local_votes_count,
),
invalid: BTreeMap::new(),
},
);
}
fn add_onchain_dispute(
&mut self,
dispute: (SessionIndex, CandidateHash, DisputeStatus),
onchain_votes_count: usize,
) {
let concluded_at = match dispute.2 {
DisputeStatus::Active | DisputeStatus::Confirmed => None,
DisputeStatus::ConcludedAgainst(_) | DisputeStatus::ConcludedFor(_) => Some(1),
};
self.onchain_disputes.insert(
(dispute.0, dispute.1.clone()),
DisputeState {
validators_for: TestDisputes::generate_bitvec(
self.validators_count,
0,
onchain_votes_count,
),
validators_against: bitvec![u8, Lsb0; 0; self.validators_count],
start: 1,
concluded_at,
},
);
}
pub fn add_unconfirmed_disputes_concluded_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 80 / 100;
let session_idx = 0;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
self.add_onchain_dispute(d, onchain_votes_count);
}
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}
pub fn add_unconfirmed_disputes_unconcluded_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 40 / 100;
let session_idx = 1;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
self.add_onchain_dispute(d, onchain_votes_count);
}
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}
pub fn add_unconfirmed_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let session_idx = 2;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
}
(session_idx, local_votes_count * dispute_count)
}
pub fn add_concluded_disputes_known_onchain(&mut self, dispute_count: usize) -> (u32, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 75 / 100;
let session_idx = 3;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::ConcludedFor(0));
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
self.add_onchain_dispute(d, onchain_votes_count);
}
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}
pub fn add_concluded_disputes_unknown_onchain(&mut self, dispute_count: usize) -> (u32, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let session_idx = 4;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::ConcludedFor(0));
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
}
(session_idx, local_votes_count * dispute_count)
}
fn generate_local_votes<T: Clone>(
statement_kind: T,
start_idx: usize,
count: usize,
) -> BTreeMap<ValidatorIndex, (T, ValidatorSignature)> {
assert!(start_idx < count);
(start_idx..count)
.map(|idx| {
(
ValidatorIndex(idx as u32),
(statement_kind.clone(), test_helpers::dummy_signature()),
)
})
.collect::<BTreeMap<_, _>>()
}
fn generate_bitvec(
validator_count: usize,
start_idx: usize,
count: usize,
) -> BitVec<u8, bitvec::order::Lsb0> {
assert!(start_idx < count);
assert!(start_idx + count < validator_count);
let mut res = bitvec![u8, Lsb0; 0; validator_count];
for idx in start_idx..count {
res.set(idx, true);
}
res
}
}
#[test]
fn normal_flow() {
const VALIDATOR_COUNT: usize = 10;
const DISPUTES_PER_BATCH: usize = 2;
const ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: usize = 1;
let mut input = TestDisputes::new(VALIDATOR_COUNT);
// active, concluded onchain
let (third_idx, third_votes) =
input.add_unconfirmed_disputes_concluded_onchain(DISPUTES_PER_BATCH);
// active unconcluded onchain
let (first_idx, first_votes) =
input.add_unconfirmed_disputes_unconcluded_onchain(DISPUTES_PER_BATCH);
//concluded disputes unknown onchain
let (fifth_idx, fifth_votes) = input.add_concluded_disputes_unknown_onchain(DISPUTES_PER_BATCH);
// concluded disputes known onchain - these should be ignored
let (_, _) = input.add_concluded_disputes_known_onchain(DISPUTES_PER_BATCH);
// active disputes unknown onchain
let (second_idx, second_votes) =
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH);
let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
test_harness(
|r| mock_overseer(r, &mut input, &mut vote_queries),
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let result = select_disputes(&mut tx, &metrics, &lf).await;
assert!(!result.is_empty());
assert_eq!(result.len(), 4 * DISPUTES_PER_BATCH);
// Naive checks that the result is partitioned correctly
let (first_batch, rest): (Vec<DisputeStatementSet>, Vec<DisputeStatementSet>) =
result.into_iter().partition(|d| d.session == first_idx);
assert_eq!(first_batch.len(), DISPUTES_PER_BATCH);
let (second_batch, rest): (Vec<DisputeStatementSet>, Vec<DisputeStatementSet>) =
rest.into_iter().partition(|d| d.session == second_idx);
assert_eq!(second_batch.len(), DISPUTES_PER_BATCH);
let (third_batch, rest): (Vec<DisputeStatementSet>, Vec<DisputeStatementSet>) =
rest.into_iter().partition(|d| d.session == third_idx);
assert_eq!(third_batch.len(), DISPUTES_PER_BATCH);
let (fifth_batch, rest): (Vec<DisputeStatementSet>, Vec<DisputeStatementSet>) =
rest.into_iter().partition(|d| d.session == fifth_idx);
assert_eq!(fifth_batch.len(), DISPUTES_PER_BATCH);
// Ensure there are no more disputes - fourth_batch should be dropped
assert_eq!(rest.len(), 0);
assert_eq!(
first_batch.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v),
first_votes
);
assert_eq!(
second_batch.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v),
second_votes
);
assert_eq!(
third_batch.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v),
third_votes
);
assert_eq!(
fifth_batch.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v),
fifth_votes
);
},
);
assert!(vote_queries <= ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT);
}
#[test]
fn many_batches() {
const VALIDATOR_COUNT: usize = 10;
const DISPUTES_PER_PARTITION: usize = 10;
// 10 disputes per partition * 4 partitions = 40 disputes
// BATCH_SIZE = 11
// => There should be no more than 40 / 11 queries ( ~4 )
const ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: usize = 4;
let mut input = TestDisputes::new(VALIDATOR_COUNT);
// active which can conclude onchain
input.add_unconfirmed_disputes_concluded_onchain(DISPUTES_PER_PARTITION);
// active which can't conclude onchain
input.add_unconfirmed_disputes_unconcluded_onchain(DISPUTES_PER_PARTITION);
//concluded disputes unknown onchain
input.add_concluded_disputes_unknown_onchain(DISPUTES_PER_PARTITION);
// concluded disputes known onchain
input.add_concluded_disputes_known_onchain(DISPUTES_PER_PARTITION);
// active disputes unknown onchain
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);
let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
test_harness(
|r| mock_overseer(r, &mut input, &mut vote_queries),
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let result = select_disputes(&mut tx, &metrics, &lf).await;
assert!(!result.is_empty());
let vote_count = result.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v);
assert!(
MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME - VALIDATOR_COUNT <= vote_count &&
vote_count <= MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME,
"vote_count: {}",
vote_count
);
},
);
assert!(
vote_queries <= ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT,
"vote_queries: {} ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: {}",
vote_queries,
ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT
);
}
#[test]
fn votes_above_limit() {
const VALIDATOR_COUNT: usize = 10;
const DISPUTES_PER_PARTITION: usize = 50;
const ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: usize = 4;
let mut input = TestDisputes::new(VALIDATOR_COUNT);
// active which can conclude onchain
let (_, second_votes) =
input.add_unconfirmed_disputes_concluded_onchain(DISPUTES_PER_PARTITION);
// active which can't conclude onchain
let (_, first_votes) =
input.add_unconfirmed_disputes_unconcluded_onchain(DISPUTES_PER_PARTITION);
//concluded disputes unknown onchain
let (_, third_votes) = input.add_concluded_disputes_unknown_onchain(DISPUTES_PER_PARTITION);
assert!(
first_votes + second_votes + third_votes > 3 * MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME,
"Total relevant votes generated: {}",
first_votes + second_votes + third_votes
);
let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
test_harness(
|r| mock_overseer(r, &mut input, &mut vote_queries),
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let result = select_disputes(&mut tx, &metrics, &lf).await;
assert!(!result.is_empty());
let vote_count = result.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v);
assert!(
MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME - VALIDATOR_COUNT <= vote_count &&
vote_count <= MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME,
"vote_count: {}",
vote_count
);
},
);
assert!(
vote_queries <= ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT,
"vote_queries: {} ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: {}",
vote_queries,
ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT
);
}
@@ -0,0 +1,194 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! This module selects all RECENT disputes, fetches the votes for them from dispute-coordinator and
//! returns them as MultiDisputeStatementSet. If the RECENT disputes are more than
//! `MAX_DISPUTES_FORWARDED_TO_RUNTIME` constant - the ACTIVE disputes plus a random selection of
//! RECENT disputes (up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME`) are returned instead.
//! If the ACTIVE disputes are also above `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit - a random selection
//! of them is generated.
use crate::{metrics, LOG_TARGET};
use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::DisputeCoordinatorMessage, overseer};
use polkadot_primitives::v2::{
CandidateHash, DisputeStatement, DisputeStatementSet, MultiDisputeStatementSet, SessionIndex,
};
use std::collections::HashSet;
/// The maximum number of disputes Provisioner will include in the inherent data.
/// Serves as a protection not to flood the Runtime with excessive data.
const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 1_000;
#[derive(Debug)]
enum RequestType {
/// Query recent disputes, could be an excessive amount.
Recent,
/// Query the currently active and very recently concluded disputes.
Active,
}
/// Request open disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
sender: &mut impl overseer::ProvisionerSenderTrait,
active_or_recent: RequestType,
) -> Vec<(SessionIndex, CandidateHash)> {
let disputes = match active_or_recent {
RequestType::Recent => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::RecentDisputes(tx);
sender.send_unbounded_message(msg);
let recent_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
recent_disputes
.into_iter()
.map(|(sesion_idx, candodate_hash, _)| (sesion_idx, candodate_hash))
.collect::<Vec<_>>()
},
RequestType::Active => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::ActiveDisputes(tx);
sender.send_unbounded_message(msg);
let active_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
active_disputes
},
};
disputes
}
/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent.
fn extend_by_random_subset_without_repetition(
acc: &mut Vec<(SessionIndex, CandidateHash)>,
extension: Vec<(SessionIndex, CandidateHash)>,
n: usize,
) {
use rand::Rng;
let lut = acc.iter().cloned().collect::<HashSet<(SessionIndex, CandidateHash)>>();
let mut unique_new =
extension.into_iter().filter(|recent| !lut.contains(recent)).collect::<Vec<_>>();
// we can simply add all
if unique_new.len() <= n {
acc.extend(unique_new)
} else {
acc.reserve(n);
let mut rng = rand::thread_rng();
for _ in 0..n {
let idx = rng.gen_range(0..unique_new.len());
acc.push(unique_new.swap_remove(idx));
}
}
// assure sorting stays candid according to session index
acc.sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
pub async fn select_disputes<Sender>(
sender: &mut Sender,
metrics: &metrics::Metrics,
) -> MultiDisputeStatementSet
where
Sender: overseer::ProvisionerSenderTrait,
{
gum::trace!(target: LOG_TARGET, "Selecting disputes for inherent data using random selection");
// We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine.
// It's heavier than `ActiveDisputes` but ensures that everything from the dispute
// window gets on-chain, unlike `ActiveDisputes`.
// In case of an overload condition, we limit ourselves to active disputes, and fill up to the
// upper bound of disputes to pass to wasm `fn create_inherent_data`.
// If the active ones are already exceeding the bounds, randomly select a subset.
let recent = request_disputes(sender, RequestType::Recent).await;
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
gum::warn!(
target: LOG_TARGET,
"Recent disputes are excessive ({} > {}), reduce to active ones, and selected",
recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME
);
let mut active = request_disputes(sender, RequestType::Active).await;
let n_active = active.len();
let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
extend_by_random_subset_without_repetition(
&mut picked,
active,
MAX_DISPUTES_FORWARDED_TO_RUNTIME,
);
picked
} else {
extend_by_random_subset_without_repetition(
&mut active,
recent,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active),
);
active
};
active
} else {
recent
};
// Load all votes for all disputes from the coordinator.
let dispute_candidate_votes = super::request_votes(sender, disputes).await;
// Transform all `CandidateVotes` into `MultiDisputeStatementSet`.
dispute_candidate_votes
.into_iter()
.map(|(session_index, candidate_hash, votes)| {
let valid_statements = votes
.valid
.into_iter()
.map(|(i, (s, sig))| (DisputeStatement::Valid(s), i, sig));
let invalid_statements = votes
.invalid
.into_iter()
.map(|(i, (s, sig))| (DisputeStatement::Invalid(s), i, sig));
metrics.inc_valid_statements_by(valid_statements.len());
metrics.inc_invalid_statements_by(invalid_statements.len());
metrics.inc_dispute_statement_sets_by(1);
DisputeStatementSet {
candidate_hash,
session: session_index,
statements: valid_statements.chain(invalid_statements).collect(),
}
})
.collect()
}
+1 -3
View File
@@ -88,9 +88,7 @@ pub enum GetOnchainDisputesError {
#[error("runtime execution error occurred while fetching onchain disputes for parent {1}")]
Execution(#[source] RuntimeApiError, Hash),
#[error(
"runtime doesn't support RuntimeApiRequest::Disputes/RuntimeApiRequest::StagingDisputes for parent {1}"
)]
#[error("runtime doesn't support RuntimeApiRequest::Disputes for parent {1}")]
NotSupported(#[source] RuntimeApiError, Hash),
}
+66 -274
View File
@@ -25,29 +25,27 @@ use futures::{
};
use futures_timer::Delay;
use polkadot_node_primitives::CandidateVotes;
use polkadot_node_subsystem::{
jaeger,
messages::{
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData,
ProvisionerInherentData, ProvisionerMessage,
CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal,
PerLeafSpan, SpawnedSubsystem, SubsystemError,
PerLeafSpan, RuntimeApiError, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{
request_availability_cores, request_persisted_validation_data, TimeoutExt,
};
use polkadot_primitives::v2::{
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState,
DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
SessionIndex, SignedAvailabilityBitfield, ValidatorIndex,
BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption,
SignedAvailabilityBitfield, ValidatorIndex,
};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap};
mod disputes;
mod error;
mod metrics;
mod onchain_disputes;
pub use self::metrics::*;
use error::{Error, FatalResult};
@@ -62,6 +60,9 @@ const SEND_INHERENT_DATA_TIMEOUT: std::time::Duration = core::time::Duration::fr
const LOG_TARGET: &str = "parachain::provisioner";
const PRIORITIZED_SELECTION_RUNTIME_VERSION_REQUIREMENT: u32 =
RuntimeApiRequest::DISPUTES_RUNTIME_REQUIREMENT;
/// The provisioner subsystem.
pub struct ProvisionerSubsystem {
metrics: Metrics,
@@ -361,7 +362,18 @@ async fn send_inherent_data(
relay_parent = ?leaf.hash,
"Selecting disputes"
);
let disputes = select_disputes(from_job, metrics, leaf).await?;
let disputes = match has_required_runtime(
from_job,
leaf.hash.clone(),
PRIORITIZED_SELECTION_RUNTIME_VERSION_REQUIREMENT,
)
.await
{
true => disputes::prioritized_selection::select_disputes(from_job, metrics, leaf).await,
false => disputes::random_selection::select_disputes(from_job, metrics).await,
};
gum::trace!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
@@ -677,275 +689,55 @@ fn bitfields_indicate_availability(
3 * availability.count_ones() >= 2 * availability.len()
}
#[derive(Debug)]
enum RequestType {
/// Query recent disputes, could be an excessive amount.
Recent,
/// Query the currently active and very recently concluded disputes.
Active,
}
/// Request open disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
// If we have to be absolutely precise here, this method gets the version of the `ParachainHost` api.
// For brevity we'll just call it 'runtime version'.
async fn has_required_runtime(
sender: &mut impl overseer::ProvisionerSenderTrait,
active_or_recent: RequestType,
) -> Vec<(SessionIndex, CandidateHash)> {
relay_parent: Hash,
required_runtime_version: u32,
) -> bool {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
let (tx, rx) = oneshot::channel();
let msg = match active_or_recent {
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
};
// Bounded by block production - `ProvisionerMessage::RequestInherentData`.
sender.send_unbounded_message(msg);
let recent_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(target: LOG_TARGET, "Unable to gather {:?} disputes", active_or_recent);
Vec::new()
},
};
recent_disputes
}
/// Request the relevant dispute statements for a set of disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_votes(
sender: &mut impl overseer::ProvisionerSenderTrait,
disputes_to_query: Vec<(SessionIndex, CandidateHash)>,
) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> {
// No need to send dummy request, if nothing to request:
if disputes_to_query.is_empty() {
gum::trace!(target: LOG_TARGET, "No disputes, nothing to request - returning empty `Vec`.");
return Vec::new()
}
let (tx, rx) = oneshot::channel();
// Bounded by block production - `ProvisionerMessage::RequestInherentData`.
sender.send_unbounded_message(DisputeCoordinatorMessage::QueryCandidateVotes(
disputes_to_query,
tx,
));
sender
.send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
.await;
match rx.await {
Ok(v) => v,
Err(oneshot::Canceled) => {
gum::warn!(target: LOG_TARGET, "Unable to query candidate votes");
Vec::new()
},
}
}
/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent.
fn extend_by_random_subset_without_repetition(
acc: &mut Vec<(SessionIndex, CandidateHash)>,
extension: Vec<(SessionIndex, CandidateHash)>,
n: usize,
) {
use rand::Rng;
let lut = acc.iter().cloned().collect::<HashSet<(SessionIndex, CandidateHash)>>();
let mut unique_new =
extension.into_iter().filter(|recent| !lut.contains(recent)).collect::<Vec<_>>();
// we can simply add all
if unique_new.len() <= n {
acc.extend(unique_new)
} else {
acc.reserve(n);
let mut rng = rand::thread_rng();
for _ in 0..n {
let idx = rng.gen_range(0..unique_new.len());
acc.push(unique_new.swap_remove(idx));
}
}
// assure sorting stays candid according to session index
acc.sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
/// The maximum number of disputes Provisioner will include in the inherent data.
/// Serves as a protection not to flood the Runtime with excessive data.
const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 1_000;
async fn select_disputes(
sender: &mut impl overseer::ProvisionerSenderTrait,
metrics: &metrics::Metrics,
_leaf: &ActivatedLeaf,
) -> Result<MultiDisputeStatementSet, Error> {
// Helper lambda
// Gets the active disputes as input and partitions it in seen and unseen disputes by the Runtime
// Returns as much unseen disputes as possible and optionally some seen disputes up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit.
let generate_unseen_active_subset =
|active: Vec<(SessionIndex, CandidateHash)>,
onchain: HashMap<(SessionIndex, CandidateHash), DisputeState>|
-> Vec<(SessionIndex, CandidateHash)> {
let (seen_onchain, mut unseen_onchain): (
Vec<(SessionIndex, CandidateHash)>,
Vec<(SessionIndex, CandidateHash)>,
) = active.into_iter().partition(|d| onchain.contains_key(d));
if unseen_onchain.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
// Even unseen on-chain don't fit within the limit. Add as many as possible.
let mut unseen_subset = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
extend_by_random_subset_without_repetition(
&mut unseen_subset,
unseen_onchain,
MAX_DISPUTES_FORWARDED_TO_RUNTIME,
);
unseen_subset
} else {
// Add all unseen onchain disputes and as much of the seen ones as there is space.
let n_unseen_onchain = unseen_onchain.len();
extend_by_random_subset_without_repetition(
&mut unseen_onchain,
seen_onchain,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_unseen_onchain),
);
unseen_onchain
}
};
// Helper lambda
// Extends the active disputes with recent ones up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit. Unseen recent disputes are prioritised.
let generate_active_and_unseen_recent_subset =
|recent: Vec<(SessionIndex, CandidateHash)>,
mut active: Vec<(SessionIndex, CandidateHash)>,
onchain: HashMap<(SessionIndex, CandidateHash), DisputeState>|
-> Vec<(SessionIndex, CandidateHash)> {
let mut n_active = active.len();
// All active disputes can be sent. Fill the rest of the space with recent ones.
// We assume there is not enough space for all recent disputes. So we prioritise the unseen ones.
let (seen_onchain, unseen_onchain): (
Vec<(SessionIndex, CandidateHash)>,
Vec<(SessionIndex, CandidateHash)>,
) = recent.into_iter().partition(|d| onchain.contains_key(d));
extend_by_random_subset_without_repetition(
&mut active,
unseen_onchain,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active),
);
n_active = active.len();
if n_active < MAX_DISPUTES_FORWARDED_TO_RUNTIME {
// Looks like we can add some of the seen disputes too
extend_by_random_subset_without_repetition(
&mut active,
seen_onchain,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active),
);
}
active
};
gum::trace!(
target: LOG_TARGET,
relay_parent = ?_leaf.hash,
"Request recent disputes"
);
// We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine.
// It's heavier than `ActiveDisputes` but ensures that everything from the dispute
// window gets on-chain, unlike `ActiveDisputes`.
// In case of an overload condition, we limit ourselves to active disputes, and fill up to the
// upper bound of disputes to pass to wasm `fn create_inherent_data`.
// If the active ones are already exceeding the bounds, randomly select a subset.
let recent = request_disputes(sender, RequestType::Recent).await;
gum::trace!(
target: LOG_TARGET,
relay_paent = ?_leaf.hash,
"Received recent disputes"
);
gum::trace!(
target: LOG_TARGET,
relay_paent = ?_leaf.hash,
"Request on chain disputes"
);
// On chain disputes are fetched from the runtime. We want to prioritise the inclusion of unknown
// disputes in the inherent data. The call relies on staging Runtime API. If the staging API is not
// enabled in the binary an empty set is generated which doesn't affect the rest of the logic.
let onchain = match onchain_disputes::get_onchain_disputes(sender, _leaf.hash.clone()).await {
Ok(r) => r,
Err(e) => {
gum::debug!(
Result::Ok(Ok(runtime_version)) => {
gum::trace!(
target: LOG_TARGET,
?e,
"Can't fetch onchain disputes. Will continue with empty onchain disputes set.",
?relay_parent,
?runtime_version,
?required_runtime_version,
"Fetched ParachainHost runtime api version"
);
HashMap::new()
runtime_version >= required_runtime_version
},
};
gum::trace!(
target: LOG_TARGET,
relay_paent = ?_leaf.hash,
"Received on chain disputes"
);
gum::trace!(
target: LOG_TARGET,
relay_paent = ?_leaf.hash,
"Filtering disputes"
);
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
gum::warn!(
target: LOG_TARGET,
"Recent disputes are excessive ({} > {}), reduce to active ones, and selected",
recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME
);
let active = request_disputes(sender, RequestType::Active).await;
if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
generate_unseen_active_subset(active, onchain)
} else {
generate_active_and_unseen_recent_subset(recent, active, onchain)
}
} else {
recent
};
gum::trace!(
target: LOG_TARGET,
relay_paent = ?_leaf.hash,
"Calling `request_votes`"
);
// Load all votes for all disputes from the coordinator.
let dispute_candidate_votes = request_votes(sender, disputes).await;
gum::trace!(
target: LOG_TARGET,
relay_paent = ?_leaf.hash,
"Finished `request_votes`"
);
// Transform all `CandidateVotes` into `MultiDisputeStatementSet`.
Ok(dispute_candidate_votes
.into_iter()
.map(|(session_index, candidate_hash, votes)| {
let valid_statements = votes
.valid
.into_iter()
.map(|(i, (s, sig))| (DisputeStatement::Valid(s), i, sig));
let invalid_statements = votes
.invalid
.into_iter()
.map(|(i, (s, sig))| (DisputeStatement::Invalid(s), i, sig));
metrics.inc_valid_statements_by(valid_statements.len());
metrics.inc_invalid_statements_by(invalid_statements.len());
metrics.inc_dispute_statement_sets_by(1);
DisputeStatementSet {
candidate_hash,
session: session_index,
statements: valid_statements.chain(invalid_statements).collect(),
}
})
.collect())
Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
?error,
"Execution error while fetching ParachainHost runtime api version"
);
false
},
Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
"NotSupported error while fetching ParachainHost runtime api version"
);
false
},
Result::Err(_) => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
"Cancelled error while fetching ParachainHost runtime api version"
);
false
},
}
}
@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::disputes::prioritized_selection::PartitionedDisputes;
use polkadot_node_subsystem_util::metrics::{self, prometheus};
#[derive(Clone)]
@@ -32,6 +33,9 @@ struct MetricsInner {
/// 4 hours on Polkadot. The metrics are updated only when the node authors a block, so values vary across nodes.
inherent_data_dispute_statement_sets: prometheus::Counter<prometheus::U64>,
inherent_data_dispute_statements: prometheus::CounterVec<prometheus::U64>,
/// The disputes received from `disputes-coordinator` by partition
partitioned_disputes: prometheus::CounterVec<prometheus::U64>,
}
/// Provisioner metrics.
@@ -101,6 +105,44 @@ impl Metrics {
.inc_by(disputes.try_into().unwrap_or(0));
}
}
pub(crate) fn on_partition_recent_disputes(&self, disputes: &PartitionedDisputes) {
if let Some(metrics) = &self.0 {
let PartitionedDisputes {
inactive_unknown_onchain,
inactive_unconcluded_onchain: inactive_unconcluded_known_onchain,
active_unknown_onchain,
active_unconcluded_onchain,
active_concluded_onchain,
inactive_concluded_onchain: inactive_concluded_known_onchain,
} = disputes;
metrics
.partitioned_disputes
.with_label_values(&["inactive_unknown_onchain"])
.inc_by(inactive_unknown_onchain.len().try_into().unwrap_or(0));
metrics
.partitioned_disputes
.with_label_values(&["inactive_unconcluded_known_onchain"])
.inc_by(inactive_unconcluded_known_onchain.len().try_into().unwrap_or(0));
metrics
.partitioned_disputes
.with_label_values(&["active_unknown_onchain"])
.inc_by(active_unknown_onchain.len().try_into().unwrap_or(0));
metrics
.partitioned_disputes
.with_label_values(&["active_unconcluded_onchain"])
.inc_by(active_unconcluded_onchain.len().try_into().unwrap_or(0));
metrics
.partitioned_disputes
.with_label_values(&["active_concluded_onchain"])
.inc_by(active_concluded_onchain.len().try_into().unwrap_or(0));
metrics
.partitioned_disputes
.with_label_values(&["inactive_concluded_known_onchain"])
.inc_by(inactive_concluded_known_onchain.len().try_into().unwrap_or(0));
}
}
}
impl metrics::Metrics for Metrics {
@@ -156,6 +198,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
partitioned_disputes: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_provisioner_partitioned_disputes",
"some fancy description",
),
&["partition"],
)?,
&registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
@@ -1,77 +0,0 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::error::GetOnchainDisputesError;
use polkadot_node_subsystem::overseer;
use polkadot_primitives::v2::{CandidateHash, DisputeState, Hash, SessionIndex};
use std::collections::HashMap;
pub async fn get_onchain_disputes<Sender>(
_sender: &mut Sender,
_relay_parent: Hash,
) -> Result<HashMap<(SessionIndex, CandidateHash), DisputeState>, GetOnchainDisputesError>
where
Sender: overseer::ProvisionerSenderTrait,
{
let _onchain = Result::<
HashMap<(SessionIndex, CandidateHash), DisputeState>,
GetOnchainDisputesError,
>::Ok(HashMap::new());
#[cfg(feature = "staging-client")]
let _onchain = self::staging_impl::get_onchain_disputes(_sender, _relay_parent).await;
_onchain
}
// Merge this module with the outer (current one) when promoting to stable
#[cfg(feature = "staging-client")]
mod staging_impl {
use super::*; // remove this when promoting to stable
use crate::LOG_TARGET;
use futures::channel::oneshot;
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{RuntimeApiMessage, RuntimeApiRequest},
SubsystemSender,
};
/// Gets the on-chain disputes at a given block number and returns them as a `HashSet` so that searching in them is cheap.
pub async fn get_onchain_disputes(
sender: &mut impl SubsystemSender<RuntimeApiMessage>,
relay_parent: Hash,
) -> Result<HashMap<(SessionIndex, CandidateHash), DisputeState>, GetOnchainDisputesError> {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching on-chain disputes");
let (tx, rx) = oneshot::channel();
sender
.send_message(
RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::StagingDisputes(tx))
.into(),
)
.await;
rx.await
.map_err(|_| GetOnchainDisputesError::Channel)
.and_then(|res| {
res.map_err(|e| match e {
RuntimeApiError::Execution { .. } =>
GetOnchainDisputesError::Execution(e, relay_parent),
RuntimeApiError::NotSupported { .. } =>
GetOnchainDisputesError::NotSupported(e, relay_parent),
})
})
.map(|v| v.into_iter().map(|e| ((e.0, e.1), e.2)).collect())
}
}
+1 -401
View File
@@ -195,7 +195,7 @@ mod select_availability_bitfields {
}
}
mod common {
pub(crate) mod common {
use super::super::*;
use futures::channel::mpsc;
use polkadot_node_subsystem::messages::AllMessages;
@@ -497,403 +497,3 @@ mod select_candidates {
)
}
}
mod select_disputes {
use super::{super::*, common::test_harness};
use futures::channel::mpsc;
use polkadot_node_subsystem::{
messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest},
RuntimeApiError,
};
use polkadot_node_subsystem_test_helpers::TestSubsystemSender;
use polkadot_primitives::v2::DisputeState;
use std::sync::Arc;
use test_helpers;
// Global Test Data
fn recent_disputes(len: usize) -> Vec<(SessionIndex, CandidateHash)> {
let mut res = Vec::with_capacity(len);
for _ in 0..len {
res.push((0, CandidateHash(Hash::random())));
}
res
}
// same as recent_disputes() but with SessionIndex set to 1
fn active_disputes(len: usize) -> Vec<(SessionIndex, CandidateHash)> {
let mut res = Vec::with_capacity(len);
for _ in 0..len {
res.push((1, CandidateHash(Hash::random())));
}
res
}
fn leaf() -> ActivatedLeaf {
ActivatedLeaf {
hash: Hash::repeat_byte(0xAA),
number: 0xAA,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}
}
async fn mock_overseer(
leaf: ActivatedLeaf,
mut receiver: mpsc::UnboundedReceiver<AllMessages>,
onchain_disputes: Result<Vec<(SessionIndex, CandidateHash, DisputeState)>, RuntimeApiError>,
recent_disputes: Vec<(SessionIndex, CandidateHash)>,
active_disputes: Vec<(SessionIndex, CandidateHash)>,
) {
while let Some(from_job) = receiver.next().await {
match from_job {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_,
RuntimeApiRequest::StagingDisputes(sender),
)) => {
let _ = sender.send(onchain_disputes.clone());
},
AllMessages::RuntimeApi(_) => panic!("Unexpected RuntimeApi request"),
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::RecentDisputes(
sender,
)) => {
let _ = sender.send(recent_disputes.clone());
},
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes(
sender,
)) => {
let _ = sender.send(active_disputes.clone());
},
AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(disputes, sender),
) => {
let mut res = Vec::new();
let v = CandidateVotes {
candidate_receipt: test_helpers::dummy_candidate_receipt(leaf.hash.clone()),
valid: BTreeMap::new(),
invalid: BTreeMap::new(),
};
for r in disputes.iter() {
res.push((r.0, r.1, v.clone()));
}
let _ = sender.send(res);
},
_ => panic!("Unexpected message: {:?}", from_job),
}
}
}
#[test]
fn recent_disputes_are_withing_onchain_limit() {
const RECENT_DISPUTES_SIZE: usize = 10;
let metrics = metrics::Metrics::new_dummy();
let onchain_disputes = Ok(Vec::new());
let active_disputes = Vec::new();
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let recent_disputes_overseer = recent_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes_overseer,
active_disputes,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
let result = disputes.iter().zip(recent_disputes.iter());
// We should get all recent disputes.
for (d, r) in result {
assert_eq!(d.session, r.0);
assert_eq!(d.candidate_hash, r.1);
}
},
)
}
#[test]
fn recent_disputes_are_too_much_but_active_are_within_limit() {
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME;
let metrics = metrics::Metrics::new_dummy();
let onchain_disputes = Ok(Vec::new());
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let active_disputes_overseer = active_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
let result = disputes.iter().zip(active_disputes.iter());
// We should get all active disputes.
for (d, r) in result {
assert_eq!(d.session, r.0);
assert_eq!(d.candidate_hash, r.1);
}
},
)
}
#[test]
fn recent_disputes_are_too_much_but_active_are_less_than_the_limit() {
// In this case all active disputes + a random set of recent disputes should be returned
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME - 10;
let metrics = metrics::Metrics::new_dummy();
let onchain_disputes = Ok(Vec::new());
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let active_disputes_overseer = active_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
// Recent disputes are generated with `SessionIndex` = 0
let (res_recent, res_active): (Vec<DisputeStatementSet>, Vec<DisputeStatementSet>) =
disputes.into_iter().partition(|d| d.session == 0);
// It should be good enough the count the number of active disputes and not compare them one by one. Checking the exact values is already covered by the previous tests.
assert_eq!(res_active.len(), active_disputes.len()); // We have got all active disputes
assert_eq!(res_active.len() + res_recent.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME);
// And some recent ones.
},
)
}
//These tests rely on staging Runtime functions so they are separated and compiled conditionally.
#[cfg(feature = "staging-client")]
mod staging_tests {
use super::*;
fn dummy_dispute_state() -> DisputeState {
DisputeState {
validators_for: BitVec::new(),
validators_against: BitVec::new(),
start: 0,
concluded_at: None,
}
}
#[test]
fn recent_disputes_are_too_much_active_fits_test_recent_prioritisation() {
// In this case recent disputes are above `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit and the active ones are below it.
// The expected behaviour is to send all active disputes and extend the set with recent ones. During the extension the disputes unknown for the Runtime are added with priority.
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME - 10;
const ONCHAIN_DISPUTE_SIZE: usize = RECENT_DISPUTES_SIZE - 9;
let metrics = metrics::Metrics::new_dummy();
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let onchain_disputes: Result<
Vec<(SessionIndex, CandidateHash, DisputeState)>,
RuntimeApiError,
> = Ok(Vec::from(&recent_disputes[0..ONCHAIN_DISPUTE_SIZE])
.iter()
.map(|(session_index, candidate_hash)| {
(*session_index, candidate_hash.clone(), dummy_dispute_state())
})
.collect());
let active_disputes_overseer = active_disputes.clone();
let recent_disputes_overseer = recent_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes_overseer,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
// Recent disputes are generated with `SessionIndex` = 0
let (res_recent, res_active): (
Vec<DisputeStatementSet>,
Vec<DisputeStatementSet>,
) = disputes.into_iter().partition(|d| d.session == 0);
// It should be good enough the count the number of the disputes and not compare them one by one as this was already covered in other tests.
assert_eq!(res_active.len(), active_disputes.len()); // We've got all active disputes.
assert_eq!(
res_recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME - active_disputes.len()
); // And some recent ones.
// Check if the recent disputes were unknown for the Runtime.
let expected_recent_disputes =
Vec::from(&recent_disputes[ONCHAIN_DISPUTE_SIZE..]);
let res_recent_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter(
res_recent.iter().map(|d| (d.session, d.candidate_hash)),
);
// Explicitly check that all unseen disputes are sent to the Runtime.
for d in &expected_recent_disputes {
assert_eq!(res_recent_set.contains(d), true);
}
},
)
}
#[test]
fn active_disputes_are_too_much_test_active_prioritisation() {
// In this case the active disputes are above the `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit so the unseen ones should be prioritised.
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ONCHAIN_DISPUTE_SIZE: usize = ACTIVE_DISPUTES_SIZE - 9;
let metrics = metrics::Metrics::new_dummy();
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let onchain_disputes: Result<
Vec<(SessionIndex, CandidateHash, DisputeState)>,
RuntimeApiError,
> = Ok(Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE])
.iter()
.map(|(session_index, candidate_hash)| {
(*session_index, candidate_hash.clone(), dummy_dispute_state())
})
.collect());
let active_disputes_overseer = active_disputes.clone();
let recent_disputes_overseer = recent_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes_overseer,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
// Recent disputes are generated with `SessionIndex` = 0
let (res_recent, res_active): (
Vec<DisputeStatementSet>,
Vec<DisputeStatementSet>,
) = disputes.into_iter().partition(|d| d.session == 0);
// It should be good enough the count the number of the disputes and not compare them one by one
assert_eq!(res_recent.len(), 0); // We expect no recent disputes
assert_eq!(res_active.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME);
let expected_active_disputes =
Vec::from(&active_disputes[ONCHAIN_DISPUTE_SIZE..]);
let res_active_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter(
res_active.iter().map(|d| (d.session, d.candidate_hash)),
);
// Explicitly check that the unseen disputes are delivered to the Runtime.
for d in &expected_active_disputes {
assert_eq!(res_active_set.contains(d), true);
}
},
)
}
#[test]
fn active_disputes_are_too_much_and_are_all_unseen() {
// In this case there are a lot of active disputes unseen by the Runtime. The focus of the test is to verify that in such cases known disputes are NOT sent to the Runtime.
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 5;
const ONCHAIN_DISPUTE_SIZE: usize = 5;
let metrics = metrics::Metrics::new_dummy();
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let onchain_disputes: Result<
Vec<(SessionIndex, CandidateHash, DisputeState)>,
RuntimeApiError,
> = Ok(Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE])
.iter()
.map(|(session_index, candidate_hash)| {
(*session_index, candidate_hash.clone(), dummy_dispute_state())
})
.collect());
let active_disputes_overseer = active_disputes.clone();
let recent_disputes_overseer = recent_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes_overseer,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
// Recent disputes are generated with `SessionIndex` = 0
let (res_recent, res_active): (
Vec<DisputeStatementSet>,
Vec<DisputeStatementSet>,
) = disputes.into_iter().partition(|d| d.session == 0);
// It should be good enough the count the number of the disputes and not compare them one by one
assert_eq!(res_recent.len(), 0);
assert_eq!(res_active.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME);
// For sure we don't want to see any of this disputes in the result
let unexpected_active_disputes =
Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE]);
let res_active_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter(
res_active.iter().map(|d| (d.session, d.candidate_hash)),
);
// Verify that the result DOESN'T contain known disputes (because there is an excessive number of unknown onces).
for d in &unexpected_active_disputes {
assert_eq!(res_active_set.contains(d), false);
}
},
)
}
}
}