Provisioner should ignore unconfirmed disputes (#6294)

* Fix typos

* Filter unconfirmed disputes in provisioner -  random_selection

* Rework dispute coordinator to return `DisputeStatus` with
`ActiveDisputes` message.
* Rework the random_selection implementation of `select_disptues` in
  `provisioner` to return only confirmed disputes.

* Filter unconfirmed disputes in provisioner - prioritized_selection

* Add test for unconfirmed disputes handling

* Fix `dispute-distribution` tests
This commit is contained in:
Tsvetomir Dimitrov
2022-11-16 16:39:05 +02:00
committed by GitHub
parent 40221806d4
commit a0f4287dd9
8 changed files with 148 additions and 66 deletions
@@ -617,7 +617,9 @@ impl Initialized {
let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.map(|((session_idx, candidate_hash), dispute_status)| {
(session_idx, candidate_hash, dispute_status)
})
.collect(),
);
},
@@ -31,7 +31,9 @@ use futures::{
use polkadot_node_subsystem_util::database::Database;
use polkadot_node_primitives::{SignedDisputeStatement, SignedFullStatement, Statement};
use polkadot_node_primitives::{
DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement,
};
use polkadot_node_subsystem::{
messages::{
ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage,
@@ -682,7 +684,10 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
})
.await;
assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);
let (tx, rx) = oneshot::channel();
virtual_overseer
@@ -812,7 +817,10 @@ fn approval_vote_import_works() {
})
.await;
assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);
let (tx, rx) = oneshot::channel();
virtual_overseer
@@ -934,7 +942,10 @@ fn dispute_gets_confirmed_via_participation() {
})
.await;
assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);
let (tx, rx) = oneshot::channel();
virtual_overseer
@@ -1099,7 +1110,10 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {
})
.await;
assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Confirmed)]
);
let (tx, rx) = oneshot::channel();
virtual_overseer
@@ -1358,7 +1372,10 @@ fn conflicting_votes_lead_to_dispute_participation() {
})
.await;
assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash, DisputeStatus::Active)]
);
let (tx, rx) = oneshot::channel();
virtual_overseer
@@ -139,6 +139,13 @@ where
onchain.len(),
);
// Filter out unconfirmed disputes. However if the dispute is already onchain - don't skip it.
// In this case we'd better push as much fresh votes as possible to bring it to conclusion faster.
let recent_disputes = recent_disputes
.into_iter()
.filter(|d| d.2.is_confirmed_concluded() || onchain.contains_key(&(d.0, d.1)))
.collect::<Vec<_>>();
let partitioned = partition_recent_disputes(recent_disputes, &onchain);
metrics.on_partition_recent_disputes(&partitioned);
@@ -426,7 +426,7 @@ impl TestDisputes {
pub fn add_unconfirmed_disputes_concluded_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 80 / 100;
let session_idx = 0;
@@ -444,7 +444,7 @@ impl TestDisputes {
pub fn add_unconfirmed_disputes_unconcluded_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 40 / 100;
let session_idx = 1;
@@ -459,22 +459,25 @@ impl TestDisputes {
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}
pub fn add_unconfirmed_disputes_unknown_onchain(
pub fn add_confirmed_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let session_idx = 2;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Confirmed);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
}
(session_idx, local_votes_count * dispute_count)
}
pub fn add_concluded_disputes_known_onchain(&mut self, dispute_count: usize) -> (u32, usize) {
pub fn add_concluded_disputes_known_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 75 / 100;
let session_idx = 3;
@@ -488,7 +491,10 @@ impl TestDisputes {
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}
pub fn add_concluded_disputes_unknown_onchain(&mut self, dispute_count: usize) -> (u32, usize) {
pub fn add_concluded_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let session_idx = 4;
let lf = leaf();
@@ -500,6 +506,40 @@ impl TestDisputes {
(session_idx, local_votes_count * dispute_count)
}
pub fn add_unconfirmed_disputes_known_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 10 / 100;
let onchain_votes_count = self.validators_count * 10 / 100;
let session_idx = 5;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
self.add_onchain_dispute(d, onchain_votes_count);
}
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}
pub fn add_unconfirmed_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 10 / 100;
let session_idx = 6;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
}
(session_idx, local_votes_count * dispute_count)
}
fn generate_local_votes<T: Clone>(
statement_kind: T,
start_idx: usize,
@@ -554,9 +594,9 @@ fn normal_flow() {
// concluded disputes known onchain - these should be ignored
let (_, _) = input.add_concluded_disputes_known_onchain(DISPUTES_PER_BATCH);
// active disputes unknown onchain
// confirmed disputes unknown onchain
let (second_idx, second_votes) =
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH);
input.add_confirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH);
let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
@@ -635,8 +675,8 @@ fn many_batches() {
// concluded disputes known onchain
input.add_concluded_disputes_known_onchain(DISPUTES_PER_PARTITION);
// active disputes unknown onchain
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);
// confirmed disputes unknown onchain
input.add_confirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);
let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
@@ -720,3 +760,30 @@ fn votes_above_limit() {
ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT
);
}
#[test]
fn unconfirmed_are_handled_correctly() {
const VALIDATOR_COUNT: usize = 10;
const DISPUTES_PER_PARTITION: usize = 50;
let mut input = TestDisputes::new(VALIDATOR_COUNT);
// Add unconfirmed known onchain -> this should be pushed
let (pushed_idx, _) = input.add_unconfirmed_disputes_known_onchain(DISPUTES_PER_PARTITION);
// Add unconfirmed unknown onchain -> this should be ignored
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);
let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
test_harness(
|r| mock_overseer(r, &mut input, &mut vote_queries),
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let result = select_disputes(&mut tx, &metrics, &lf).await;
assert!(result.len() == DISPUTES_PER_PARTITION);
result.iter().for_each(|d| assert!(d.session == pushed_idx));
},
);
}
@@ -42,51 +42,35 @@ enum RequestType {
}
/// Request open disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
/// Returns only confirmed/concluded disputes. The rest are filtered out.
async fn request_confirmed_disputes(
sender: &mut impl overseer::ProvisionerSenderTrait,
active_or_recent: RequestType,
) -> Vec<(SessionIndex, CandidateHash)> {
let disputes = match active_or_recent {
RequestType::Recent => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::RecentDisputes(tx);
sender.send_unbounded_message(msg);
let recent_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
recent_disputes
.into_iter()
.map(|(sesion_idx, candodate_hash, _)| (sesion_idx, candodate_hash))
.collect::<Vec<_>>()
},
RequestType::Active => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::ActiveDisputes(tx);
sender.send_unbounded_message(msg);
let active_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
active_disputes
let (tx, rx) = oneshot::channel();
let msg = match active_or_recent {
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
};
sender.send_unbounded_message(msg);
let disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
disputes
.into_iter()
.filter(|d| d.2.is_confirmed_concluded())
.map(|d| (d.0, d.1))
.collect()
}
/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent.
@@ -132,7 +116,7 @@ where
// In case of an overload condition, we limit ourselves to active disputes, and fill up to the
// upper bound of disputes to pass to wasm `fn create_inherent_data`.
// If the active ones are already exceeding the bounds, randomly select a subset.
let recent = request_disputes(sender, RequestType::Recent).await;
let recent = request_confirmed_disputes(sender, RequestType::Recent).await;
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
gum::warn!(
target: LOG_TARGET,
@@ -140,7 +124,7 @@ where
recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME
);
let mut active = request_disputes(sender, RequestType::Active).await;
let mut active = request_confirmed_disputes(sender, RequestType::Active).await;
let n_active = active.len();
let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
@@ -430,7 +430,9 @@ async fn get_active_disputes<Context>(
// Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(DisputeCoordinatorMessage::ActiveDisputes(tx));
rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
rx.await
.map_err(|_| JfyiError::AskActiveDisputesCanceled)
.map(|disputes| disputes.into_iter().map(|d| (d.0, d.1)).collect())
}
/// Get all locally available dispute votes for a given dispute.
@@ -45,7 +45,7 @@ use polkadot_node_network_protocol::{
request_response::{v1::DisputeResponse, Recipient, Requests},
IfDisconnected,
};
use polkadot_node_primitives::{CandidateVotes, UncheckedDisputeMessage};
use polkadot_node_primitives::{CandidateVotes, DisputeStatus, UncheckedDisputeMessage};
use polkadot_node_subsystem::{
messages::{
AllMessages, DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult,
@@ -658,7 +658,7 @@ fn dispute_retries_and_works_across_session_boundaries() {
Some(old_head),
MOCK_SESSION_INDEX,
None,
vec![(MOCK_SESSION_INDEX, candidate.hash())],
vec![(MOCK_SESSION_INDEX, candidate.hash(), DisputeStatus::Active)],
)
.await;
@@ -673,7 +673,7 @@ fn dispute_retries_and_works_across_session_boundaries() {
Some(old_head2),
MOCK_NEXT_SESSION_INDEX,
Some(MOCK_NEXT_SESSION_INFO.clone()),
vec![(MOCK_SESSION_INDEX, candidate.hash())],
vec![(MOCK_SESSION_INDEX, candidate.hash(), DisputeStatus::Active)],
)
.await;
@@ -832,7 +832,7 @@ async fn activate_leaf(
// New session if we expect the subsystem to request it.
new_session: Option<SessionInfo>,
// Currently active disputes to send to the subsystem.
active_disputes: Vec<(SessionIndex, CandidateHash)>,
active_disputes: Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
) {
let has_active_disputes = !active_disputes.is_empty();
handle
@@ -934,7 +934,10 @@ async fn handle_subsystem_startup(
None,
MOCK_SESSION_INDEX,
Some(MOCK_SESSION_INFO.clone()),
ongoing_dispute.into_iter().map(|c| (MOCK_SESSION_INDEX, c)).collect(),
ongoing_dispute
.into_iter()
.map(|c| (MOCK_SESSION_INDEX, c, DisputeStatus::Active))
.collect(),
)
.await;
relay_parent
@@ -268,13 +268,13 @@ pub enum DisputeCoordinatorMessage {
/// - or the imported statements are backing/approval votes, which are always accepted.
pending_confirmation: Option<oneshot::Sender<ImportStatementsResult>>,
},
/// Fetch a list of all recent disputes the co-ordinator is aware of.
/// Fetch a list of all recent disputes the coordinator is aware of.
/// These are disputes which have occurred any time in recent sessions,
/// and which may have already concluded.
RecentDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>),
/// Fetch a list of all active disputes that the coordinator is aware of.
/// These disputes are either not yet concluded or recently concluded.
ActiveDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash)>>),
ActiveDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>),
/// Get candidate votes for a candidate.
QueryCandidateVotes(
Vec<(SessionIndex, CandidateHash)>,