Disputes which are unknown for the Runtime are sent with priority by the Provisioner when preparing inherent data (#5336)

* Implement MallocSizeOf for DisputeState

* Implementation of `Disputes` Runtime API message

* Modify on-chain dispute import

* Add feature flag for the new functionality

* Update node/core/provisioner/src/onchain_disputes.rs

Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>

* Add target to log messages

* Update node/core/provisioner/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Use `staging-client` feature to enable the client code using the staging runtime api

* Remove TODO comment

* Don't filter out DisputeState

* Fix disputes selection logic

* spelling

* Tests

* Rename `Disputes` message to `StagingDisputes`

* Update node/core/provisioner/src/lib.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Code review feedback

- Logging
- Separate error module
- Add additional fields for GetOnchainDisputesErr
- logging and impl MallocSizeOf
- fix impl MallocSizeOf for DisputeState
- fix tests

* Update node/core/provisioner/src/error.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/core/provisioner/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/core/provisioner/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* Apply suggestions from code review

dummy metrics instance

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Revert "Apply suggestions from code review"

This reverts commit 6dc518cbf77e037ff4760d315938a68c806e662e.

* Code review feedback: #[cfg(test)] for new_dummy() in metrics

* Code review feedback: break the disputes generation logic in separate functions

* Code review feedback - align_eight

Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Co-authored-by: Andronik <write@reusable.software>
Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
This commit is contained in:
Tsvetomir Dimitrov
2022-05-06 15:58:04 +03:00
committed by GitHub
parent 673a32d968
commit 20e56a453c
14 changed files with 753 additions and 94 deletions
+1
View File
@@ -6958,6 +6958,7 @@ name = "polkadot-node-core-provisioner"
version = "0.9.19"
dependencies = [
"bitvec",
"fatality",
"futures 0.3.21",
"futures-timer",
"polkadot-node-primitives",
+1
View File
@@ -203,6 +203,7 @@ try-runtime = [ "polkadot-cli/try-runtime" ]
fast-runtime = [ "polkadot-cli/fast-runtime" ]
runtime-metrics = [ "polkadot-cli/runtime-metrics" ]
pyroscope = ["polkadot-cli/pyroscope"]
staging-client = ["polkadot-cli/staging-client"]
# Configuration for building a .deb package - for use with `cargo-deb`
[package.metadata.deb]
+1
View File
@@ -73,3 +73,4 @@ rococo-native = ["service/rococo-native"]
malus = ["full-node", "service/malus"]
runtime-metrics = ["service/runtime-metrics", "polkadot-node-metrics/runtime-metrics"]
staging-client = ["service/staging-client"]
@@ -15,9 +15,13 @@ polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
futures-timer = "3.0.2"
rand = "0.8.5"
fatality = "0.0.6"
[dev-dependencies]
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" }
[features]
staging-client = []
@@ -0,0 +1,83 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
///! Error types for provisioner module
use fatality;
use futures::channel::{mpsc, oneshot};
use polkadot_node_subsystem::errors::{ChainApiError, RuntimeApiError};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::v2::Hash;
use thiserror::Error;
/// Errors in the provisioner.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum Error {
#[error(transparent)]
Util(#[from] util::Error),
#[error("failed to get availability cores")]
CanceledAvailabilityCores(#[source] oneshot::Canceled),
#[error("failed to get persisted validation data")]
CanceledPersistedValidationData(#[source] oneshot::Canceled),
#[error("failed to get block number")]
CanceledBlockNumber(#[source] oneshot::Canceled),
#[error("failed to get backed candidates")]
CanceledBackedCandidates(#[source] oneshot::Canceled),
#[error("failed to get votes on dispute")]
CanceledCandidateVotes(#[source] oneshot::Canceled),
#[error(transparent)]
ChainApi(#[from] ChainApiError),
#[error(transparent)]
Runtime(#[from] RuntimeApiError),
#[error("failed to send message to ChainAPI")]
ChainApiMessageSend(#[source] mpsc::SendError),
#[error("failed to send message to CandidateBacking to get backed candidates")]
GetBackedCandidatesSend(#[source] mpsc::SendError),
#[error("failed to send return message with Inherents")]
InherentDataReturnChannel,
#[error(
"backed candidate does not correspond to selected candidate; check logic in provisioner"
)]
BackedCandidateOrderingProblem,
}
/// Used by `get_onchain_disputes` to represent errors related to fetching on-chain disputes from the Runtime
#[allow(dead_code)] // Remove when promoting to stable
#[fatality::fatality]
pub enum GetOnchainDisputesError {
#[fatal]
#[error("runtime subsystem is down")]
Channel,
#[error("runtime execution error occurred while fetching onchain disputes for parent {1}")]
Execution(#[source] RuntimeApiError, Hash),
#[error(
"runtime doesn't support RuntimeApiRequest::Disputes/RuntimeApiRequest::StagingDisputes for parent {1}"
)]
NotSupported(#[source] RuntimeApiError, Hash),
}
+101 -71
View File
@@ -27,7 +27,6 @@ use futures::{
use futures_timer::Delay;
use polkadot_node_primitives::CandidateVotes;
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
jaeger,
messages::{
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData,
@@ -36,23 +35,25 @@ use polkadot_node_subsystem::{
ActivatedLeaf, LeafStatus, PerLeafSpan, SubsystemSender,
};
use polkadot_node_subsystem_util::{
self as util, request_availability_cores, request_persisted_validation_data, JobSender,
JobSubsystem, JobTrait,
request_availability_cores, request_persisted_validation_data, JobSender, JobSubsystem,
JobTrait,
};
use polkadot_primitives::v2::{
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeStatement,
DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption, SessionIndex,
SignedAvailabilityBitfield, ValidatorIndex,
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState,
DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
SessionIndex, SignedAvailabilityBitfield, ValidatorIndex,
};
use std::{
collections::{BTreeMap, HashSet},
collections::{BTreeMap, HashMap, HashSet},
pin::Pin,
};
use thiserror::Error;
mod error;
mod metrics;
mod onchain_disputes;
pub use self::metrics::*;
use error::Error;
#[cfg(test)]
mod tests;
@@ -105,49 +106,6 @@ pub struct ProvisionerJob {
awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>,
}
/// Errors in the provisioner.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum Error {
#[error(transparent)]
Util(#[from] util::Error),
#[error("failed to get availability cores")]
CanceledAvailabilityCores(#[source] oneshot::Canceled),
#[error("failed to get persisted validation data")]
CanceledPersistedValidationData(#[source] oneshot::Canceled),
#[error("failed to get block number")]
CanceledBlockNumber(#[source] oneshot::Canceled),
#[error("failed to get backed candidates")]
CanceledBackedCandidates(#[source] oneshot::Canceled),
#[error("failed to get votes on dispute")]
CanceledCandidateVotes(#[source] oneshot::Canceled),
#[error(transparent)]
ChainApi(#[from] ChainApiError),
#[error(transparent)]
Runtime(#[from] RuntimeApiError),
#[error("failed to send message to ChainAPI")]
ChainApiMessageSend(#[source] mpsc::SendError),
#[error("failed to send message to CandidateBacking to get backed candidates")]
GetBackedCandidatesSend(#[source] mpsc::SendError),
#[error("failed to send return message with Inherents")]
InherentDataReturnChannel,
#[error(
"backed candidate does not correspond to selected candidate; check logic in provisioner"
)]
BackedCandidateOrderingProblem,
}
/// Provisioner run arguments.
#[derive(Debug, Clone, Copy)]
pub struct ProvisionerConfig;
@@ -325,7 +283,7 @@ async fn send_inherent_data(
.await
.map_err(|err| Error::CanceledAvailabilityCores(err))??;
let disputes = select_disputes(from_job, metrics).await?;
let disputes = select_disputes(from_job, metrics, leaf).await?;
// Only include bitfields on fresh leaves. On chain reversions, we want to make sure that
// there will be at least one block, which cannot get disputed, so the chain can make progress.
@@ -700,11 +658,80 @@ fn extend_by_random_subset_without_repetition(
acc.sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
/// The maximum number of disputes Provisioner will include in the inherent data.
/// Serves as a protection not to flood the Runtime with excessive data.
const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 1_000;
async fn select_disputes(
sender: &mut impl SubsystemSender,
metrics: &metrics::Metrics,
_leaf: &ActivatedLeaf,
) -> Result<MultiDisputeStatementSet, Error> {
const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 1_000;
// Helper lambda
// Gets the active disputes as input and partitions it in seen and unseen disputes by the Runtime
// Returns as much unseen disputes as possible and optionally some seen disputes up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit.
let generate_unseen_active_subset =
|active: Vec<(SessionIndex, CandidateHash)>,
onchain: HashMap<(SessionIndex, CandidateHash), DisputeState>|
-> Vec<(SessionIndex, CandidateHash)> {
let (seen_onchain, mut unseen_onchain): (
Vec<(SessionIndex, CandidateHash)>,
Vec<(SessionIndex, CandidateHash)>,
) = active.into_iter().partition(|d| onchain.contains_key(d));
if unseen_onchain.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
// Even unseen on-chain don't fit within the limit. Add as many as possible.
let mut unseen_subset = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
extend_by_random_subset_without_repetition(
&mut unseen_subset,
unseen_onchain,
MAX_DISPUTES_FORWARDED_TO_RUNTIME,
);
unseen_subset
} else {
// Add all unseen onchain disputes and as much of the seen ones as there is space.
let n_unseen_onchain = unseen_onchain.len();
extend_by_random_subset_without_repetition(
&mut unseen_onchain,
seen_onchain,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_unseen_onchain),
);
unseen_onchain
}
};
// Helper lambda
// Extends the active disputes with recent ones up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit. Unseen recent disputes are prioritised.
let generate_active_and_unseen_recent_subset =
|recent: Vec<(SessionIndex, CandidateHash)>,
mut active: Vec<(SessionIndex, CandidateHash)>,
onchain: HashMap<(SessionIndex, CandidateHash), DisputeState>|
-> Vec<(SessionIndex, CandidateHash)> {
let mut n_active = active.len();
// All active disputes can be sent. Fill the rest of the space with recent ones.
// We assume there is not enough space for all recent disputes. So we prioritise the unseen ones.
let (seen_onchain, unseen_onchain): (
Vec<(SessionIndex, CandidateHash)>,
Vec<(SessionIndex, CandidateHash)>,
) = recent.into_iter().partition(|d| onchain.contains_key(d));
extend_by_random_subset_without_repetition(
&mut active,
unseen_onchain,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active),
);
n_active = active.len();
if n_active < MAX_DISPUTES_FORWARDED_TO_RUNTIME {
// Looks like we can add some of the seen disputes too
extend_by_random_subset_without_repetition(
&mut active,
seen_onchain,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active),
);
}
active
};
// We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine.
// It's heavier than `ActiveDisputes` but ensures that everything from the dispute
@@ -713,6 +740,22 @@ async fn select_disputes(
// upper bound of disputes to pass to wasm `fn create_inherent_data`.
// If the active ones are already exceeding the bounds, randomly select a subset.
let recent = request_disputes(sender, RequestType::Recent).await;
// On chain disputes are fetched from the runtime. We want to prioritise the inclusion of unknown
// disputes in the inherent data. The call relies on staging Runtime API. If the staging API is not
// enabled in the binary an empty set is generated which doesn't affect the rest of the logic.
let onchain = match onchain_disputes::get_onchain_disputes(sender, _leaf.hash.clone()).await {
Ok(r) => r,
Err(e) => {
gum::debug!(
target: LOG_TARGET,
?e,
"Can't fetch onchain disputes. Will continue with empty onchain disputes set.",
);
HashMap::new()
},
};
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
gum::warn!(
target: LOG_TARGET,
@@ -720,25 +763,12 @@ async fn select_disputes(
recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME
);
let mut active = request_disputes(sender, RequestType::Active).await;
let n_active = active.len();
let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
extend_by_random_subset_without_repetition(
&mut picked,
active,
MAX_DISPUTES_FORWARDED_TO_RUNTIME,
);
picked
let active = request_disputes(sender, RequestType::Active).await;
if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
generate_unseen_active_subset(active, onchain)
} else {
extend_by_random_subset_without_repetition(
&mut active,
recent,
MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active),
);
active
};
active
generate_active_and_unseen_recent_subset(recent, active, onchain)
}
} else {
recent
};
@@ -34,6 +34,12 @@ struct MetricsInner {
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
/// Creates new dummy `Metrics` instance. Used for testing only.
#[cfg(test)]
pub fn new_dummy() -> Metrics {
Metrics(None)
}
pub(crate) fn on_inherent_data_request(&self, response: Result<(), ()>) {
if let Some(metrics) = &self.0 {
match response {
@@ -0,0 +1,74 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::error::GetOnchainDisputesError;
use polkadot_node_subsystem::SubsystemSender;
use polkadot_primitives::v2::{CandidateHash, DisputeState, Hash, SessionIndex};
use std::collections::HashMap;
pub async fn get_onchain_disputes(
_sender: &mut impl SubsystemSender,
_relay_parent: Hash,
) -> Result<HashMap<(SessionIndex, CandidateHash), DisputeState>, GetOnchainDisputesError> {
let _onchain = Result::<
HashMap<(SessionIndex, CandidateHash), DisputeState>,
GetOnchainDisputesError,
>::Ok(HashMap::new());
#[cfg(feature = "staging-client")]
let _onchain = self::staging_impl::get_onchain_disputes(_sender, _relay_parent).await;
_onchain
}
// Merge this module with the outer (current one) when promoting to stable
#[cfg(feature = "staging-client")]
mod staging_impl {
use super::*; // remove this when promoting to stable
use crate::LOG_TARGET;
use futures::channel::oneshot;
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{RuntimeApiMessage, RuntimeApiRequest},
SubsystemSender,
};
/// Gets the on-chain disputes at a given block number and returns them as a `HashSet` so that searching in them is cheap.
pub async fn get_onchain_disputes(
sender: &mut impl SubsystemSender,
relay_parent: Hash,
) -> Result<HashMap<(SessionIndex, CandidateHash), DisputeState>, GetOnchainDisputesError> {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching on-chain disputes");
let (tx, rx) = oneshot::channel();
sender
.send_message(
RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::StagingDisputes(tx))
.into(),
)
.await;
rx.await
.map_err(|_| GetOnchainDisputesError::Channel)
.and_then(|res| {
res.map_err(|e| match e {
RuntimeApiError::Execution { .. } =>
GetOnchainDisputesError::Execution(e, relay_parent),
RuntimeApiError::NotSupported { .. } =>
GetOnchainDisputesError::NotSupported(e, relay_parent),
})
})
.map(|v| v.into_iter().map(|e| ((e.0, e.1), e.2)).collect())
}
}
+424 -15
View File
@@ -195,23 +195,12 @@ mod select_availability_bitfields {
}
}
mod select_candidates {
use super::{super::*, build_occupied_core, default_bitvec, occupied_core, scheduled_core};
use ::test_helpers::{dummy_candidate_descriptor, dummy_hash};
use polkadot_node_subsystem::messages::{
AllMessages, RuntimeApiMessage,
RuntimeApiRequest::{
AvailabilityCores, PersistedValidationData as PersistedValidationDataReq,
},
};
mod common {
use super::super::*;
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem_test_helpers::TestSubsystemSender;
use polkadot_primitives::v2::{
BlockNumber, CandidateCommitments, CommittedCandidateReceipt, PersistedValidationData,
};
const BLOCK_UNDER_PRODUCTION: BlockNumber = 128;
fn test_harness<OverseerFactory, Overseer, TestFactory, Test>(
pub fn test_harness<OverseerFactory, Overseer, TestFactory, Test>(
overseer_factory: OverseerFactory,
test_factory: TestFactory,
) where
@@ -228,6 +217,26 @@ mod select_candidates {
let _ = futures::executor::block_on(future::join(overseer, test));
}
}
mod select_candidates {
use super::{
super::*, build_occupied_core, common::test_harness, default_bitvec, occupied_core,
scheduled_core,
};
use ::test_helpers::{dummy_candidate_descriptor, dummy_hash};
use polkadot_node_subsystem::messages::{
AllMessages, RuntimeApiMessage,
RuntimeApiRequest::{
AvailabilityCores, PersistedValidationData as PersistedValidationDataReq,
},
};
use polkadot_node_subsystem_test_helpers::TestSubsystemSender;
use polkadot_primitives::v2::{
BlockNumber, CandidateCommitments, CommittedCandidateReceipt, PersistedValidationData,
};
const BLOCK_UNDER_PRODUCTION: BlockNumber = 128;
// For test purposes, we always return this set of availability cores:
//
@@ -486,3 +495,403 @@ mod select_candidates {
)
}
}
mod select_disputes {
use super::{super::*, common::test_harness};
use polkadot_node_subsystem::{
messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest},
RuntimeApiError,
};
use polkadot_node_subsystem_test_helpers::TestSubsystemSender;
use polkadot_primitives::v2::DisputeState;
use std::sync::Arc;
use test_helpers;
// Global Test Data
fn recent_disputes(len: usize) -> Vec<(SessionIndex, CandidateHash)> {
let mut res = Vec::with_capacity(len);
for _ in 0..len {
res.push((0, CandidateHash(Hash::random())));
}
res
}
// same as recent_disputes() but with SessionIndex set to 1
fn active_disputes(len: usize) -> Vec<(SessionIndex, CandidateHash)> {
let mut res = Vec::with_capacity(len);
for _ in 0..len {
res.push((1, CandidateHash(Hash::random())));
}
res
}
fn leaf() -> ActivatedLeaf {
ActivatedLeaf {
hash: Hash::repeat_byte(0xAA),
number: 0xAA,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}
}
async fn mock_overseer(
leaf: ActivatedLeaf,
mut receiver: mpsc::UnboundedReceiver<AllMessages>,
onchain_disputes: Result<Vec<(SessionIndex, CandidateHash, DisputeState)>, RuntimeApiError>,
recent_disputes: Vec<(SessionIndex, CandidateHash)>,
active_disputes: Vec<(SessionIndex, CandidateHash)>,
) {
while let Some(from_job) = receiver.next().await {
match from_job {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_,
RuntimeApiRequest::StagingDisputes(sender),
)) => {
let _ = sender.send(onchain_disputes.clone());
},
AllMessages::RuntimeApi(_) => panic!("Unexpected RuntimeApi request"),
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::RecentDisputes(
sender,
)) => {
let _ = sender.send(recent_disputes.clone());
},
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes(
sender,
)) => {
let _ = sender.send(active_disputes.clone());
},
AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(disputes, sender),
) => {
let mut res = Vec::new();
let v = CandidateVotes {
candidate_receipt: test_helpers::dummy_candidate_receipt(leaf.hash.clone()),
valid: vec![],
invalid: vec![],
};
for r in disputes.iter() {
res.push((r.0, r.1, v.clone()));
}
let _ = sender.send(res);
},
_ => panic!("Unexpected message: {:?}", from_job),
}
}
}
#[test]
fn recent_disputes_are_withing_onchain_limit() {
const RECENT_DISPUTES_SIZE: usize = 10;
let metrics = metrics::Metrics::new_dummy();
let onchain_disputes = Ok(Vec::new());
let active_disputes = Vec::new();
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let recent_disputes_overseer = recent_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes_overseer,
active_disputes,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
let result = disputes.iter().zip(recent_disputes.iter());
// We should get all recent disputes.
for (d, r) in result {
assert_eq!(d.session, r.0);
assert_eq!(d.candidate_hash, r.1);
}
},
)
}
#[test]
fn recent_disputes_are_too_much_but_active_are_within_limit() {
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME;
let metrics = metrics::Metrics::new_dummy();
let onchain_disputes = Ok(Vec::new());
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let active_disputes_overseer = active_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
let result = disputes.iter().zip(active_disputes.iter());
// We should get all active disputes.
for (d, r) in result {
assert_eq!(d.session, r.0);
assert_eq!(d.candidate_hash, r.1);
}
},
)
}
#[test]
fn recent_disputes_are_too_much_but_active_are_less_than_the_limit() {
// In this case all active disputes + a random set of recent disputes should be returned
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME - 10;
let metrics = metrics::Metrics::new_dummy();
let onchain_disputes = Ok(Vec::new());
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let active_disputes_overseer = active_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
// Recent disputes are generated with `SessionIndex` = 0
let (res_recent, res_active): (Vec<DisputeStatementSet>, Vec<DisputeStatementSet>) =
disputes.into_iter().partition(|d| d.session == 0);
// It should be good enough the count the number of active disputes and not compare them one by one. Checking the exact values is already covered by the previous tests.
assert_eq!(res_active.len(), active_disputes.len()); // We have got all active disputes
assert_eq!(res_active.len() + res_recent.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME);
// And some recent ones.
},
)
}
//These tests rely on staging Runtime functions so they are separated and compiled conditionally.
#[cfg(feature = "staging-client")]
mod staging_tests {
use super::*;
fn dummy_dispute_state() -> DisputeState {
DisputeState {
validators_for: BitVec::new(),
validators_against: BitVec::new(),
start: 0,
concluded_at: None,
}
}
#[test]
fn recent_disputes_are_too_much_active_fits_test_recent_prioritisation() {
// In this case recent disputes are above `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit and the active ones are below it.
// The expected behaviour is to send all active disputes and extend the set with recent ones. During the extension the disputes unknown for the Runtime are added with priority.
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME - 10;
const ONCHAIN_DISPUTE_SIZE: usize = RECENT_DISPUTES_SIZE - 9;
let metrics = metrics::Metrics::new_dummy();
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let onchain_disputes: Result<
Vec<(SessionIndex, CandidateHash, DisputeState)>,
RuntimeApiError,
> = Ok(Vec::from(&recent_disputes[0..ONCHAIN_DISPUTE_SIZE])
.iter()
.map(|(session_index, candidate_hash)| {
(*session_index, candidate_hash.clone(), dummy_dispute_state())
})
.collect());
let active_disputes_overseer = active_disputes.clone();
let recent_disputes_overseer = recent_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes_overseer,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
// Recent disputes are generated with `SessionIndex` = 0
let (res_recent, res_active): (
Vec<DisputeStatementSet>,
Vec<DisputeStatementSet>,
) = disputes.into_iter().partition(|d| d.session == 0);
// It should be good enough the count the number of the disputes and not compare them one by one as this was already covered in other tests.
assert_eq!(res_active.len(), active_disputes.len()); // We've got all active disputes.
assert_eq!(
res_recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME - active_disputes.len()
); // And some recent ones.
// Check if the recent disputes were unknown for the Runtime.
let expected_recent_disputes =
Vec::from(&recent_disputes[ONCHAIN_DISPUTE_SIZE..]);
let res_recent_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter(
res_recent.iter().map(|d| (d.session, d.candidate_hash)),
);
// Explicitly check that all unseen disputes are sent to the Runtime.
for d in &expected_recent_disputes {
assert_eq!(res_recent_set.contains(d), true);
}
},
)
}
#[test]
fn active_disputes_are_too_much_test_active_prioritisation() {
// In this case the active disputes are above the `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit so the unseen ones should be prioritised.
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ONCHAIN_DISPUTE_SIZE: usize = ACTIVE_DISPUTES_SIZE - 9;
let metrics = metrics::Metrics::new_dummy();
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let onchain_disputes: Result<
Vec<(SessionIndex, CandidateHash, DisputeState)>,
RuntimeApiError,
> = Ok(Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE])
.iter()
.map(|(session_index, candidate_hash)| {
(*session_index, candidate_hash.clone(), dummy_dispute_state())
})
.collect());
let active_disputes_overseer = active_disputes.clone();
let recent_disputes_overseer = recent_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes_overseer,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
// Recent disputes are generated with `SessionIndex` = 0
let (res_recent, res_active): (
Vec<DisputeStatementSet>,
Vec<DisputeStatementSet>,
) = disputes.into_iter().partition(|d| d.session == 0);
// It should be good enough the count the number of the disputes and not compare them one by one
assert_eq!(res_recent.len(), 0); // We expect no recent disputes
assert_eq!(res_active.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME);
let expected_active_disputes =
Vec::from(&active_disputes[ONCHAIN_DISPUTE_SIZE..]);
let res_active_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter(
res_active.iter().map(|d| (d.session, d.candidate_hash)),
);
// Explicitly check that the unseen disputes are delivered to the Runtime.
for d in &expected_active_disputes {
assert_eq!(res_active_set.contains(d), true);
}
},
)
}
#[test]
fn active_disputes_are_too_much_and_are_all_unseen() {
// In this case there are a lot of active disputes unseen by the Runtime. The focus of the test is to verify that in such cases known disputes are NOT sent to the Runtime.
const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10;
const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 5;
const ONCHAIN_DISPUTE_SIZE: usize = 5;
let metrics = metrics::Metrics::new_dummy();
let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE);
let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE);
let onchain_disputes: Result<
Vec<(SessionIndex, CandidateHash, DisputeState)>,
RuntimeApiError,
> = Ok(Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE])
.iter()
.map(|(session_index, candidate_hash)| {
(*session_index, candidate_hash.clone(), dummy_dispute_state())
})
.collect());
let active_disputes_overseer = active_disputes.clone();
let recent_disputes_overseer = recent_disputes.clone();
test_harness(
|r| {
mock_overseer(
leaf(),
r,
onchain_disputes,
recent_disputes_overseer,
active_disputes_overseer,
)
},
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap();
assert!(!disputes.is_empty());
// Recent disputes are generated with `SessionIndex` = 0
let (res_recent, res_active): (
Vec<DisputeStatementSet>,
Vec<DisputeStatementSet>,
) = disputes.into_iter().partition(|d| d.session == 0);
// It should be good enough the count the number of the disputes and not compare them one by one
assert_eq!(res_recent.len(), 0);
assert_eq!(res_active.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME);
// For sure we don't want to see any of this disputes in the result
let unexpected_active_disputes =
Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE]);
let res_active_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter(
res_active.iter().map(|d| (d.session, d.candidate_hash)),
);
// Verify that the result DOESN'T contain known disputes (because there is an excessive number of unknown onces).
for d in &unexpected_active_disputes {
assert_eq!(res_active_set.contains(d), false);
}
},
)
}
}
}
+24 -2
View File
@@ -21,8 +21,8 @@ use parity_util_mem::{MallocSizeOf, MallocSizeOfExt};
use sp_consensus_babe::Epoch;
use polkadot_primitives::v2::{
AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent,
CommittedCandidateReceipt, CoreState, GroupRotationInfo, Hash, Id as ParaId,
AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id as ParaId,
InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData,
PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
@@ -47,6 +47,7 @@ const ON_CHAIN_VOTES_CACHE_SIZE: usize = 3 * 1024;
const PVFS_REQUIRE_PRECHECK_SIZE: usize = 1024;
const VALIDATION_CODE_HASH_CACHE_SIZE: usize = 64 * 1024;
const VERSION_CACHE_SIZE: usize = 4 * 1024;
const DISPUTES_CACHE_SIZE: usize = 64 * 1024;
struct ResidentSizeOf<T>(T);
@@ -115,6 +116,10 @@ pub(crate) struct RequestResultCache {
ResidentSizeOf<Option<ValidationCodeHash>>,
>,
version: MemoryLruCache<Hash, ResidentSizeOf<u32>>,
disputes: MemoryLruCache<
Hash,
ResidentSizeOf<Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>>,
>,
}
impl Default for RequestResultCache {
@@ -142,6 +147,7 @@ impl Default for RequestResultCache {
pvfs_require_precheck: MemoryLruCache::new(PVFS_REQUIRE_PRECHECK_SIZE),
validation_code_hash: MemoryLruCache::new(VALIDATION_CODE_HASH_CACHE_SIZE),
version: MemoryLruCache::new(VERSION_CACHE_SIZE),
disputes: MemoryLruCache::new(DISPUTES_CACHE_SIZE),
}
}
}
@@ -407,6 +413,21 @@ impl RequestResultCache {
pub(crate) fn cache_version(&mut self, key: Hash, value: u32) {
self.version.insert(key, ResidentSizeOf(value));
}
pub(crate) fn disputes(
&mut self,
relay_parent: &Hash,
) -> Option<&Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>> {
self.disputes.get(relay_parent).map(|v| &v.0)
}
pub(crate) fn cache_disputes(
&mut self,
relay_parent: Hash,
value: Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>,
) {
self.disputes.insert(relay_parent, ResidentSizeOf(value));
}
}
pub(crate) enum RequestResult {
@@ -442,4 +463,5 @@ pub(crate) enum RequestResult {
SubmitPvfCheckStatement(Hash, PvfCheckStatement, ValidatorSignature, ()),
ValidationCodeHash(Hash, ParaId, OccupiedCoreAssumption, Option<ValidationCodeHash>),
Version(Hash, u32),
StagingDisputes(Hash, Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>),
}
@@ -169,6 +169,8 @@ where
.cache_validation_code_hash((relay_parent, para_id, assumption), hash),
Version(relay_parent, version) =>
self.requests_cache.cache_version(relay_parent, version),
StagingDisputes(relay_parent, disputes) =>
self.requests_cache.cache_disputes(relay_parent, disputes),
}
}
@@ -270,6 +272,8 @@ where
Request::ValidationCodeHash(para, assumption, sender) =>
query!(validation_code_hash(para, assumption), sender)
.map(|sender| Request::ValidationCodeHash(para, assumption, sender)),
Request::StagingDisputes(sender) =>
query!(disputes(), sender).map(|sender| Request::StagingDisputes(sender)),
}
}
@@ -526,5 +530,7 @@ where
},
Request::ValidationCodeHash(para, assumption, sender) =>
query!(ValidationCodeHash, validation_code_hash(para, assumption), ver = 2, sender),
Request::StagingDisputes(sender) =>
query!(StagingDisputes, staging_get_disputes(), ver = 2, sender),
}
}
+2
View File
@@ -198,3 +198,5 @@ runtime-metrics = [
"polkadot-runtime/runtime-metrics",
"polkadot-runtime-parachains/runtime-metrics"
]
staging-client = ["polkadot-node-core-provisioner/staging-client"]
+10 -6
View File
@@ -40,12 +40,12 @@ use polkadot_node_primitives::{
};
use polkadot_primitives::v2::{
AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent, CandidateHash,
CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState, GroupIndex,
GroupRotationInfo, Hash, Header as BlockHeader, Id as ParaId, InboundDownwardMessage,
InboundHrmpMessage, MultiDisputeStatementSet, OccupiedCoreAssumption, PersistedValidationData,
PvfCheckStatement, SessionIndex, SessionInfo, SignedAvailabilityBitfield,
SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
ValidatorSignature,
CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState,
DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader, Id as ParaId,
InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet, OccupiedCoreAssumption,
PersistedValidationData, PvfCheckStatement, SessionIndex, SessionInfo,
SignedAvailabilityBitfield, SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash,
ValidatorId, ValidatorIndex, ValidatorSignature,
};
use polkadot_statement_table::v2::Misbehavior;
use std::{
@@ -693,6 +693,10 @@ pub enum RuntimeApiRequest {
OccupiedCoreAssumption,
RuntimeApiSender<Option<ValidationCodeHash>>,
),
/// Returns all on-chain disputes at given block number.
StagingDisputes(
RuntimeApiSender<Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>>,
),
}
/// A message to the Runtime API subsystem.
+16
View File
@@ -1403,6 +1403,22 @@ pub struct DisputeState<N = BlockNumber> {
pub concluded_at: Option<N>,
}
#[cfg(feature = "std")]
impl MallocSizeOf for DisputeState {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
// destructuring to make sure no new fields are added to the struct without modifying this function
let Self { validators_for, validators_against, start, concluded_at } = self;
// According to the documentation `.capacity()` might not return a byte aligned value, so just in case:
let align_eight = |d: usize| (d + 7) / 8;
align_eight(validators_for.capacity()) +
align_eight(validators_against.capacity()) +
start.size_of(ops) +
concluded_at.size_of(ops)
}
}
/// Parachains inherent-data passed into the runtime by a block author
#[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug, TypeInfo)]
pub struct InherentData<HDR: HeaderT = Header> {