Dispute Coordinator: Batch queries (#3459)

* disputes: Allow batch queries in dispute-coordinator

This commit moves to batch queries when responding to QueryCandidateVotes
messages. This simplifies the code in the provisioner and dispute-coordinator
by no longer requiring to make use of a FuturesOrdered when awaiting multiple
quries. Instead, the provisioner need only request the batch itself.

* node/approval-voting: Address Feedback to fail on query element missing.

* Address feedback

* Fix implementer's guide
This commit is contained in:
Lldenaurois
2021-07-12 21:06:14 -04:00
committed by GitHub
parent 2d102308de
commit 2d66b8f256
8 changed files with 66 additions and 74 deletions
@@ -444,18 +444,27 @@ async fn handle_incoming(
let _ = rx.send(collect_active(recent_disputes, now));
}
DisputeCoordinatorMessage::QueryCandidateVotes(
session,
candidate_hash,
query,
rx
) => {
let candidate_votes = db::v1::load_candidate_votes(
store,
&config.column_config(),
session,
&candidate_hash,
)?;
let _ = rx.send(candidate_votes.map(Into::into));
let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
if let Some(v) = db::v1::load_candidate_votes(
store,
&config.column_config(),
session_index,
&candidate_hash,
)? {
query_output.push((session_index, candidate_hash, v.into()));
} else {
tracing::debug!(
target: LOG_TARGET,
session_index,
"No votes found for candidate",
);
}
}
let _ = rx.send(query_output);
}
DisputeCoordinatorMessage::IssueLocalStatement(
session,
@@ -332,13 +332,12 @@ fn conflicting_votes_lead_to_dispute_participation() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
session,
candidate_hash,
vec![(session, candidate_hash)],
tx,
),
}).await;
let votes = rx.await.unwrap().unwrap();
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.len(), 1);
assert_eq!(votes.invalid.len(), 1);
}
@@ -360,13 +359,12 @@ fn conflicting_votes_lead_to_dispute_participation() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
session,
candidate_hash,
vec![(session, candidate_hash)],
tx,
),
}).await;
let votes = rx.await.unwrap().unwrap();
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.len(), 1);
assert_eq!(votes.invalid.len(), 2);
}
@@ -430,13 +428,12 @@ fn positive_votes_dont_trigger_participation() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
session,
candidate_hash,
vec![(session, candidate_hash)],
tx,
),
}).await;
let votes = rx.await.unwrap().unwrap();
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.len(), 1);
assert!(votes.invalid.is_empty());
}
@@ -465,13 +462,12 @@ fn positive_votes_dont_trigger_participation() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
session,
candidate_hash,
vec![(session, candidate_hash)],
tx,
),
}).await;
let votes = rx.await.unwrap().unwrap();
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.len(), 2);
assert!(votes.invalid.is_empty());
}
@@ -536,13 +532,12 @@ fn wrong_validator_index_is_ignored() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
session,
candidate_hash,
vec![(session, candidate_hash)],
tx,
),
}).await;
let votes = rx.await.unwrap().unwrap();
let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert!(votes.valid.is_empty());
assert!(votes.invalid.is_empty());
}
+13 -29
View File
@@ -23,7 +23,6 @@ use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
prelude::*,
stream::FuturesOrdered,
};
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, SubsystemSender, jaeger,
@@ -568,37 +567,22 @@ async fn select_disputes(
// Load all votes for all disputes from the coordinator.
let dispute_candidate_votes = {
let mut awaited_votes = FuturesOrdered::new();
let (tx, rx) = oneshot::channel();
sender.send_message(DisputeCoordinatorMessage::QueryCandidateVotes(
recent_disputes,
tx,
).into()).await;
let n_disputes = recent_disputes.len();
for (session_index, candidate_hash) in recent_disputes {
let (tx, rx) = oneshot::channel();
sender.send_message(DisputeCoordinatorMessage::QueryCandidateVotes(
session_index,
candidate_hash,
tx,
).into()).await;
awaited_votes.push(async move {
rx.await
.map_err(Error::CanceledCandidateVotes)
.map(|maybe_votes| maybe_votes.map(|v| (session_index, candidate_hash, v)))
});
}
// Sadly `StreamExt::collect` requires `Default`, so we have to do this more
// manually.
let mut vote_sets = Vec::with_capacity(n_disputes);
while let Some(res) = awaited_votes.next().await {
// sanity check - anything present in recent disputes should have
// candidate votes. but we might race with block import on
// session boundaries.
if let Some(vote_set) = res? {
vote_sets.push(vote_set);
match rx.await {
Ok(v) => v,
Err(oneshot::Canceled) => {
tracing::debug!(
target: LOG_TARGET,
"Unable to query candidate votes - subsystem disconnected?",
);
Vec::new()
}
}
vote_sets
};
// Transform all `CandidateVotes` into `MultiDisputeStatementSet`.