diff --git a/polkadot/node/core/dispute-coordinator/src/lib.rs b/polkadot/node/core/dispute-coordinator/src/lib.rs
index c311529acd..1b7fc24a22 100644
--- a/polkadot/node/core/dispute-coordinator/src/lib.rs
+++ b/polkadot/node/core/dispute-coordinator/src/lib.rs
@@ -444,18 +444,27 @@ async fn handle_incoming(
let _ = rx.send(collect_active(recent_disputes, now));
}
DisputeCoordinatorMessage::QueryCandidateVotes(
- session,
- candidate_hash,
+ query,
rx
) => {
- let candidate_votes = db::v1::load_candidate_votes(
- store,
- &config.column_config(),
- session,
- &candidate_hash,
- )?;
-
- let _ = rx.send(candidate_votes.map(Into::into));
+ let mut query_output = Vec::new();
+ for (session_index, candidate_hash) in query.into_iter() {
+ if let Some(v) = db::v1::load_candidate_votes(
+ store,
+ &config.column_config(),
+ session_index,
+ &candidate_hash,
+ )? {
+ query_output.push((session_index, candidate_hash, v.into()));
+ } else {
+ tracing::debug!(
+ target: LOG_TARGET,
+ session_index,
+ "No votes found for candidate",
+ );
+ }
+ }
+ let _ = rx.send(query_output);
}
DisputeCoordinatorMessage::IssueLocalStatement(
session,
diff --git a/polkadot/node/core/dispute-coordinator/src/tests.rs b/polkadot/node/core/dispute-coordinator/src/tests.rs
index ed5a7da062..860a3e31e2 100644
--- a/polkadot/node/core/dispute-coordinator/src/tests.rs
+++ b/polkadot/node/core/dispute-coordinator/src/tests.rs
@@ -332,13 +332,12 @@ fn conflicting_votes_lead_to_dispute_participation() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
- session,
- candidate_hash,
+ vec![(session, candidate_hash)],
tx,
),
}).await;
- let votes = rx.await.unwrap().unwrap();
+ let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.len(), 1);
assert_eq!(votes.invalid.len(), 1);
}
@@ -360,13 +359,12 @@ fn conflicting_votes_lead_to_dispute_participation() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
- session,
- candidate_hash,
+ vec![(session, candidate_hash)],
tx,
),
}).await;
- let votes = rx.await.unwrap().unwrap();
+ let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.len(), 1);
assert_eq!(votes.invalid.len(), 2);
}
@@ -430,13 +428,12 @@ fn positive_votes_dont_trigger_participation() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
- session,
- candidate_hash,
+ vec![(session, candidate_hash)],
tx,
),
}).await;
- let votes = rx.await.unwrap().unwrap();
+ let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.len(), 1);
assert!(votes.invalid.is_empty());
}
@@ -465,13 +462,12 @@ fn positive_votes_dont_trigger_participation() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
- session,
- candidate_hash,
+ vec![(session, candidate_hash)],
tx,
),
}).await;
- let votes = rx.await.unwrap().unwrap();
+ let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert_eq!(votes.valid.len(), 2);
assert!(votes.invalid.is_empty());
}
@@ -536,13 +532,12 @@ fn wrong_validator_index_is_ignored() {
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::QueryCandidateVotes(
- session,
- candidate_hash,
+ vec![(session, candidate_hash)],
tx,
),
}).await;
- let votes = rx.await.unwrap().unwrap();
+ let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone();
assert!(votes.valid.is_empty());
assert!(votes.invalid.is_empty());
}
diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs
index e1ca828423..f9c87468f0 100644
--- a/polkadot/node/core/provisioner/src/lib.rs
+++ b/polkadot/node/core/provisioner/src/lib.rs
@@ -23,7 +23,6 @@ use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
prelude::*,
- stream::FuturesOrdered,
};
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, SubsystemSender, jaeger,
@@ -568,37 +567,22 @@ async fn select_disputes(
// Load all votes for all disputes from the coordinator.
let dispute_candidate_votes = {
- let mut awaited_votes = FuturesOrdered::new();
+ let (tx, rx) = oneshot::channel();
+ sender.send_message(DisputeCoordinatorMessage::QueryCandidateVotes(
+ recent_disputes,
+ tx,
+ ).into()).await;
- let n_disputes = recent_disputes.len();
- for (session_index, candidate_hash) in recent_disputes {
- let (tx, rx) = oneshot::channel();
- sender.send_message(DisputeCoordinatorMessage::QueryCandidateVotes(
- session_index,
- candidate_hash,
- tx,
- ).into()).await;
-
- awaited_votes.push(async move {
- rx.await
- .map_err(Error::CanceledCandidateVotes)
- .map(|maybe_votes| maybe_votes.map(|v| (session_index, candidate_hash, v)))
- });
- }
-
- // Sadly `StreamExt::collect` requires `Default`, so we have to do this more
- // manually.
- let mut vote_sets = Vec::with_capacity(n_disputes);
- while let Some(res) = awaited_votes.next().await {
- // sanity check - anything present in recent disputes should have
- // candidate votes. but we might race with block import on
- // session boundaries.
- if let Some(vote_set) = res? {
- vote_sets.push(vote_set);
+ match rx.await {
+ Ok(v) => v,
+ Err(oneshot::Canceled) => {
+ tracing::debug!(
+ target: LOG_TARGET,
+ "Unable to query candidate votes - subsystem disconnected?",
+ );
+ Vec::new()
}
}
-
- vote_sets
};
// Transform all `CandidateVotes` into `MultiDisputeStatementSet`.
diff --git a/polkadot/node/network/dispute-distribution/src/lib.rs b/polkadot/node/network/dispute-distribution/src/lib.rs
index c9e53ad448..2f5d6de0a5 100644
--- a/polkadot/node/network/dispute-distribution/src/lib.rs
+++ b/polkadot/node/network/dispute-distribution/src/lib.rs
@@ -14,7 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-
//! # Sending and receiving of `DisputeRequest`s.
//!
//! This subsystem essentially consists of two parts:
@@ -136,7 +135,7 @@ where
}
}
-impl DisputeDistributionSubsystem
+impl DisputeDistributionSubsystem
where
AD: AuthorityDiscovery + Clone,
{
diff --git a/polkadot/node/network/dispute-distribution/src/sender/mod.rs b/polkadot/node/network/dispute-distribution/src/sender/mod.rs
index d8f6e19621..93e4879272 100644
--- a/polkadot/node/network/dispute-distribution/src/sender/mod.rs
+++ b/polkadot/node/network/dispute-distribution/src/sender/mod.rs
@@ -352,11 +352,12 @@ async fn get_candidate_votes(
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(
- session_index,
- candidate_hash,
+ vec![(session_index, candidate_hash)],
tx
)
))
.await;
- rx.await.map_err(|_| NonFatal::AskCandidateVotesCanceled)
+ rx.await
+ .map(|v| v.get(0).map(|inner| inner.to_owned().2))
+ .map_err(|_| NonFatal::AskCandidateVotesCanceled)
}
diff --git a/polkadot/node/network/dispute-distribution/src/tests/mod.rs b/polkadot/node/network/dispute-distribution/src/tests/mod.rs
index ca53545976..f973536a46 100644
--- a/polkadot/node/network/dispute-distribution/src/tests/mod.rs
+++ b/polkadot/node/network/dispute-distribution/src/tests/mod.rs
@@ -151,9 +151,9 @@ fn received_request_triggers_import() {
message.clone().into(),
ImportStatementsResult::InvalidImport,
true,
- move |handle, req_tx, message|
+ move |handle, req_tx, message|
nested_network_dispute_request(
- handle,
+ handle,
req_tx,
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
message.clone().into(),
@@ -226,8 +226,8 @@ fn received_request_triggers_import() {
// But should work fine for Bob:
nested_network_dispute_request(
- &mut handle,
- &mut req_tx,
+ &mut handle,
+ &mut req_tx,
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
message.clone().into(),
ImportStatementsResult::ValidImport,
@@ -258,15 +258,15 @@ fn disputes_are_recovered_at_startup() {
handle.recv().await,
AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(
- session_index,
- candidate_hash,
+ query,
tx,
)
) => {
+ let (session_index, candidate_hash) = query.get(0).unwrap().clone();
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(candidate_hash, candidate.hash());
let unchecked: UncheckedDisputeMessage = message.into();
- tx.send(Some(CandidateVotes {
+ tx.send(vec![(session_index, candidate_hash, CandidateVotes {
candidate_receipt: candidate,
valid: vec![(
unchecked.valid_vote.kind,
@@ -278,7 +278,7 @@ fn disputes_are_recovered_at_startup() {
unchecked.invalid_vote.validator_index,
unchecked.invalid_vote.signature
)],
- }))
+ })])
.expect("Receiver should stay alive.");
}
);
@@ -473,7 +473,7 @@ async fn send_network_dispute_request(
) -> oneshot::Receiver {
let (pending_response, rx_response) = oneshot::channel();
let req = sc_network::config::IncomingRequest {
- peer,
+ peer,
payload: message.encode(),
pending_response,
};
@@ -492,7 +492,7 @@ async fn nested_network_dispute_request<'a, F, O>(
import_result: ImportStatementsResult,
need_session_info: bool,
inner: F,
-)
+)
where
F: FnOnce(
&'a mut TestSubsystemContextHandle,
@@ -539,7 +539,7 @@ async fn nested_network_dispute_request<'a, F, O>(
pending_confirmation
}
);
-
+
// Do the inner thing:
inner(handle, req_tx, message).await;
@@ -558,7 +558,7 @@ async fn nested_network_dispute_request<'a, F, O>(
match import_result {
ImportStatementsResult::ValidImport => {
let result = result.unwrap();
- let decoded =
+ let decoded =
::decode(&mut result.as_slice()).unwrap();
assert!(decoded == DisputeResponse::Confirmed);
diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs
index 90c3d77257..b2f44e986d 100644
--- a/polkadot/node/subsystem-types/src/messages.rs
+++ b/polkadot/node/subsystem-types/src/messages.rs
@@ -232,7 +232,10 @@ pub enum DisputeCoordinatorMessage {
/// These disputes are either unconcluded or recently concluded.
ActiveDisputes(oneshot::Sender>),
/// Get candidate votes for a candidate.
- QueryCandidateVotes(SessionIndex, CandidateHash, oneshot::Sender