From 2d66b8f2564f2b570ee0a72c7d228e3f7f8a6103 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Mon, 12 Jul 2021 21:06:14 -0400 Subject: [PATCH] Dispute Coordinator: Batch queries (#3459) * disputes: Allow batch queries in dispute-coordinator This commit moves to batch queries when responding to QueryCandidateVotes messages. This simplifies the code in the provisioner and dispute-coordinator by no longer requiring to make use of a FuturesOrdered when awaiting multiple quries. Instead, the provisioner need only request the batch itself. * node/approval-voting: Address Feedback to fail on query element missing. * Address feedback * Fix implementer's guide --- .../node/core/dispute-coordinator/src/lib.rs | 29 ++++++++----- .../core/dispute-coordinator/src/tests.rs | 25 +++++------ polkadot/node/core/provisioner/src/lib.rs | 42 ++++++------------- .../network/dispute-distribution/src/lib.rs | 3 +- .../dispute-distribution/src/sender/mod.rs | 7 ++-- .../dispute-distribution/src/tests/mod.rs | 24 +++++------ polkadot/node/subsystem-types/src/messages.rs | 5 ++- .../src/node/disputes/dispute-coordinator.md | 5 ++- 8 files changed, 66 insertions(+), 74 deletions(-) diff --git a/polkadot/node/core/dispute-coordinator/src/lib.rs b/polkadot/node/core/dispute-coordinator/src/lib.rs index c311529acd..1b7fc24a22 100644 --- a/polkadot/node/core/dispute-coordinator/src/lib.rs +++ b/polkadot/node/core/dispute-coordinator/src/lib.rs @@ -444,18 +444,27 @@ async fn handle_incoming( let _ = rx.send(collect_active(recent_disputes, now)); } DisputeCoordinatorMessage::QueryCandidateVotes( - session, - candidate_hash, + query, rx ) => { - let candidate_votes = db::v1::load_candidate_votes( - store, - &config.column_config(), - session, - &candidate_hash, - )?; - - let _ = rx.send(candidate_votes.map(Into::into)); + let mut query_output = Vec::new(); + for (session_index, candidate_hash) in query.into_iter() { + if let Some(v) = db::v1::load_candidate_votes( + store, + &config.column_config(), + session_index, + &candidate_hash, + )? { + query_output.push((session_index, candidate_hash, v.into())); + } else { + tracing::debug!( + target: LOG_TARGET, + session_index, + "No votes found for candidate", + ); + } + } + let _ = rx.send(query_output); } DisputeCoordinatorMessage::IssueLocalStatement( session, diff --git a/polkadot/node/core/dispute-coordinator/src/tests.rs b/polkadot/node/core/dispute-coordinator/src/tests.rs index ed5a7da062..860a3e31e2 100644 --- a/polkadot/node/core/dispute-coordinator/src/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/tests.rs @@ -332,13 +332,12 @@ fn conflicting_votes_lead_to_dispute_participation() { let (tx, rx) = oneshot::channel(); virtual_overseer.send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::QueryCandidateVotes( - session, - candidate_hash, + vec![(session, candidate_hash)], tx, ), }).await; - let votes = rx.await.unwrap().unwrap(); + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); assert_eq!(votes.valid.len(), 1); assert_eq!(votes.invalid.len(), 1); } @@ -360,13 +359,12 @@ fn conflicting_votes_lead_to_dispute_participation() { let (tx, rx) = oneshot::channel(); virtual_overseer.send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::QueryCandidateVotes( - session, - candidate_hash, + vec![(session, candidate_hash)], tx, ), }).await; - let votes = rx.await.unwrap().unwrap(); + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); assert_eq!(votes.valid.len(), 1); assert_eq!(votes.invalid.len(), 2); } @@ -430,13 +428,12 @@ fn positive_votes_dont_trigger_participation() { let (tx, rx) = oneshot::channel(); virtual_overseer.send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::QueryCandidateVotes( - session, - candidate_hash, + vec![(session, candidate_hash)], tx, ), }).await; - let votes = rx.await.unwrap().unwrap(); + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); assert_eq!(votes.valid.len(), 1); assert!(votes.invalid.is_empty()); } @@ -465,13 +462,12 @@ fn positive_votes_dont_trigger_participation() { let (tx, rx) = oneshot::channel(); virtual_overseer.send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::QueryCandidateVotes( - session, - candidate_hash, + vec![(session, candidate_hash)], tx, ), }).await; - let votes = rx.await.unwrap().unwrap(); + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); assert_eq!(votes.valid.len(), 2); assert!(votes.invalid.is_empty()); } @@ -536,13 +532,12 @@ fn wrong_validator_index_is_ignored() { let (tx, rx) = oneshot::channel(); virtual_overseer.send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::QueryCandidateVotes( - session, - candidate_hash, + vec![(session, candidate_hash)], tx, ), }).await; - let votes = rx.await.unwrap().unwrap(); + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); assert!(votes.valid.is_empty()); assert!(votes.invalid.is_empty()); } diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index e1ca828423..f9c87468f0 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -23,7 +23,6 @@ use bitvec::vec::BitVec; use futures::{ channel::{mpsc, oneshot}, prelude::*, - stream::FuturesOrdered, }; use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, SubsystemSender, jaeger, @@ -568,37 +567,22 @@ async fn select_disputes( // Load all votes for all disputes from the coordinator. let dispute_candidate_votes = { - let mut awaited_votes = FuturesOrdered::new(); + let (tx, rx) = oneshot::channel(); + sender.send_message(DisputeCoordinatorMessage::QueryCandidateVotes( + recent_disputes, + tx, + ).into()).await; - let n_disputes = recent_disputes.len(); - for (session_index, candidate_hash) in recent_disputes { - let (tx, rx) = oneshot::channel(); - sender.send_message(DisputeCoordinatorMessage::QueryCandidateVotes( - session_index, - candidate_hash, - tx, - ).into()).await; - - awaited_votes.push(async move { - rx.await - .map_err(Error::CanceledCandidateVotes) - .map(|maybe_votes| maybe_votes.map(|v| (session_index, candidate_hash, v))) - }); - } - - // Sadly `StreamExt::collect` requires `Default`, so we have to do this more - // manually. - let mut vote_sets = Vec::with_capacity(n_disputes); - while let Some(res) = awaited_votes.next().await { - // sanity check - anything present in recent disputes should have - // candidate votes. but we might race with block import on - // session boundaries. - if let Some(vote_set) = res? { - vote_sets.push(vote_set); + match rx.await { + Ok(v) => v, + Err(oneshot::Canceled) => { + tracing::debug!( + target: LOG_TARGET, + "Unable to query candidate votes - subsystem disconnected?", + ); + Vec::new() } } - - vote_sets }; // Transform all `CandidateVotes` into `MultiDisputeStatementSet`. diff --git a/polkadot/node/network/dispute-distribution/src/lib.rs b/polkadot/node/network/dispute-distribution/src/lib.rs index c9e53ad448..2f5d6de0a5 100644 --- a/polkadot/node/network/dispute-distribution/src/lib.rs +++ b/polkadot/node/network/dispute-distribution/src/lib.rs @@ -14,7 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . - //! # Sending and receiving of `DisputeRequest`s. //! //! This subsystem essentially consists of two parts: @@ -136,7 +135,7 @@ where } } -impl DisputeDistributionSubsystem +impl DisputeDistributionSubsystem where AD: AuthorityDiscovery + Clone, { diff --git a/polkadot/node/network/dispute-distribution/src/sender/mod.rs b/polkadot/node/network/dispute-distribution/src/sender/mod.rs index d8f6e19621..93e4879272 100644 --- a/polkadot/node/network/dispute-distribution/src/sender/mod.rs +++ b/polkadot/node/network/dispute-distribution/src/sender/mod.rs @@ -352,11 +352,12 @@ async fn get_candidate_votes( let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::DisputeCoordinator( DisputeCoordinatorMessage::QueryCandidateVotes( - session_index, - candidate_hash, + vec![(session_index, candidate_hash)], tx ) )) .await; - rx.await.map_err(|_| NonFatal::AskCandidateVotesCanceled) + rx.await + .map(|v| v.get(0).map(|inner| inner.to_owned().2)) + .map_err(|_| NonFatal::AskCandidateVotesCanceled) } diff --git a/polkadot/node/network/dispute-distribution/src/tests/mod.rs b/polkadot/node/network/dispute-distribution/src/tests/mod.rs index ca53545976..f973536a46 100644 --- a/polkadot/node/network/dispute-distribution/src/tests/mod.rs +++ b/polkadot/node/network/dispute-distribution/src/tests/mod.rs @@ -151,9 +151,9 @@ fn received_request_triggers_import() { message.clone().into(), ImportStatementsResult::InvalidImport, true, - move |handle, req_tx, message| + move |handle, req_tx, message| nested_network_dispute_request( - handle, + handle, req_tx, MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob), message.clone().into(), @@ -226,8 +226,8 @@ fn received_request_triggers_import() { // But should work fine for Bob: nested_network_dispute_request( - &mut handle, - &mut req_tx, + &mut handle, + &mut req_tx, MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob), message.clone().into(), ImportStatementsResult::ValidImport, @@ -258,15 +258,15 @@ fn disputes_are_recovered_at_startup() { handle.recv().await, AllMessages::DisputeCoordinator( DisputeCoordinatorMessage::QueryCandidateVotes( - session_index, - candidate_hash, + query, tx, ) ) => { + let (session_index, candidate_hash) = query.get(0).unwrap().clone(); assert_eq!(session_index, MOCK_SESSION_INDEX); assert_eq!(candidate_hash, candidate.hash()); let unchecked: UncheckedDisputeMessage = message.into(); - tx.send(Some(CandidateVotes { + tx.send(vec![(session_index, candidate_hash, CandidateVotes { candidate_receipt: candidate, valid: vec![( unchecked.valid_vote.kind, @@ -278,7 +278,7 @@ fn disputes_are_recovered_at_startup() { unchecked.invalid_vote.validator_index, unchecked.invalid_vote.signature )], - })) + })]) .expect("Receiver should stay alive."); } ); @@ -473,7 +473,7 @@ async fn send_network_dispute_request( ) -> oneshot::Receiver { let (pending_response, rx_response) = oneshot::channel(); let req = sc_network::config::IncomingRequest { - peer, + peer, payload: message.encode(), pending_response, }; @@ -492,7 +492,7 @@ async fn nested_network_dispute_request<'a, F, O>( import_result: ImportStatementsResult, need_session_info: bool, inner: F, -) +) where F: FnOnce( &'a mut TestSubsystemContextHandle, @@ -539,7 +539,7 @@ async fn nested_network_dispute_request<'a, F, O>( pending_confirmation } ); - + // Do the inner thing: inner(handle, req_tx, message).await; @@ -558,7 +558,7 @@ async fn nested_network_dispute_request<'a, F, O>( match import_result { ImportStatementsResult::ValidImport => { let result = result.unwrap(); - let decoded = + let decoded = ::decode(&mut result.as_slice()).unwrap(); assert!(decoded == DisputeResponse::Confirmed); diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 90c3d77257..b2f44e986d 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -232,7 +232,10 @@ pub enum DisputeCoordinatorMessage { /// These disputes are either unconcluded or recently concluded. ActiveDisputes(oneshot::Sender>), /// Get candidate votes for a candidate. - QueryCandidateVotes(SessionIndex, CandidateHash, oneshot::Sender>), + QueryCandidateVotes( + Vec<(SessionIndex, CandidateHash)>, + oneshot::Sender>, + ), /// Sign and issue local dispute votes. A value of `true` indicates validity, and `false` invalidity. IssueLocalStatement(SessionIndex, CandidateHash, CandidateReceipt, bool), /// Determine the highest undisputed block within the given chain, based on where candidates diff --git a/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md b/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md index c8b1750357..b2a5fe1021 100644 --- a/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md +++ b/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md @@ -129,7 +129,7 @@ Do nothing. according to the `SessionInfo` of the dispute candidate's session, the `DisputeStatus` should be set to `ConcludedPositive(now)` unless it was already `ConcludedNegative`. -13. If the dispute now has supermajority votes in the "invalid" direction, +13. If the dispute now has supermajority votes in the "invalid" direction, the `DisputeStatus` should be set to `ConcludedNegative(now)`. If it was `ConcludedPositive` before, the timestamp `now` should be copied from the previous status. It will be pruned after some time and all chains @@ -143,7 +143,8 @@ Do nothing. ### On `DisputeCoordinatorMessage::QueryCandidateVotes` -* Load `"candidate-votes"` and return the data within or `None` if missing. +* Load `"candidate-votes"` for every `(SessionIndex, CandidateHash)` in the query and return data within each `CandidateVote`. + If a particular `candidate-vote` is missing, that particular request is ommitted from the response. ### On `DisputeCoordinatorMessage::IssueLocalStatement`