Distribute a PoV after seconding it (#1924)

We need to distribute the PoV after we have seconded it. Other nodes
that will receive our `Secondded` statement and want to validate the
candidate another time will request this PoV from us.
This commit is contained in:
Bastian Köcher
2020-11-06 09:38:44 +01:00
committed by GitHub
parent 2cdc1c0409
commit 8a2911b85d
9 changed files with 63 additions and 43 deletions
@@ -318,7 +318,7 @@ fn erasure_root(
) -> crate::error::Result<Hash> {
let available_data = AvailableData {
validation_data: persisted_validation,
pov,
pov: Arc::new(pov),
};
let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
+6 -6
View File
@@ -282,7 +282,7 @@ fn store_block_works() {
};
let available_data = AvailableData {
pov,
pov: Arc::new(pov),
validation_data: test_state.persisted_validation_data,
};
@@ -335,7 +335,7 @@ fn store_pov_and_query_chunk_works() {
};
let available_data = AvailableData {
pov,
pov: Arc::new(pov),
validation_data: test_state.persisted_validation_data,
};
@@ -433,7 +433,7 @@ fn stored_but_not_included_data_is_pruned() {
};
let available_data = AvailableData {
pov,
pov: Arc::new(pov),
validation_data: test_state.persisted_validation_data,
};
@@ -487,7 +487,7 @@ fn stored_data_kept_until_finalized() {
let candidate_hash = candidate.hash();
let available_data = AvailableData {
pov,
pov: Arc::new(pov),
validation_data: test_state.persisted_validation_data,
};
@@ -726,12 +726,12 @@ fn forkfullness_works() {
let candidate_2_hash = candidate_2.hash();
let available_data_1 = AvailableData {
pov: pov_1,
pov: Arc::new(pov_1),
validation_data: test_state.persisted_validation_data.clone(),
};
let available_data_2 = AvailableData {
pov: pov_2,
pov: Arc::new(pov_2),
validation_data: test_state.persisted_validation_data,
};
+28 -10
View File
@@ -24,10 +24,7 @@ use std::pin::Pin;
use std::sync::Arc;
use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
Future, FutureExt, SinkExt, StreamExt,
};
use futures::{channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_primitives::v1::{
@@ -314,7 +311,7 @@ impl CandidateBackingJob {
async fn validate_and_second(
&mut self,
candidate: &CandidateReceipt,
pov: PoV,
pov: Arc<PoV>,
) -> Result<bool, Error> {
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
@@ -326,7 +323,7 @@ impl CandidateBackingJob {
let valid = self.request_candidate_validation(
candidate.descriptor().clone(),
Arc::new(pov.clone()),
pov.clone(),
).await?;
let candidate_hash = candidate.hash();
@@ -491,14 +488,16 @@ impl CandidateBackingJob {
if self.seconded.is_none() {
// This job has not seconded a candidate yet.
let candidate_hash = candidate.hash();
let pov = Arc::new(pov);
if !self.issued_statements.contains(&candidate_hash) {
if let Ok(true) = self.validate_and_second(
&candidate,
pov,
pov.clone(),
).await {
self.metrics.on_candidate_seconded();
self.seconded = Some(candidate_hash);
self.distribute_pov(candidate.descriptor, pov).await?;
}
}
}
@@ -559,7 +558,7 @@ impl CandidateBackingJob {
ValidationResult::Valid(outputs, validation_data) => {
// If validation produces a new set of commitments, we vote the candidate as invalid.
let commitments_check = self.make_pov_available(
(&*pov).clone(),
pov,
candidate_hash,
validation_data,
outputs,
@@ -633,6 +632,16 @@ impl CandidateBackingJob {
Ok(())
}
async fn distribute_pov(
&mut self,
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<(), Error> {
self.tx_from.send(FromJob::PoVDistribution(
PoVDistributionMessage::DistributePoV(self.parent, descriptor, pov),
)).await.map_err(Into::into)
}
async fn request_pov_from_distribution(
&mut self,
descriptor: CandidateDescriptor,
@@ -696,7 +705,7 @@ impl CandidateBackingJob {
// early without making the PoV available.
async fn make_pov_available<T, E>(
&mut self,
pov: PoV,
pov: Arc<PoV>,
candidate_hash: CandidateHash,
validation_data: polkadot_primitives::v1::PersistedValidationData,
outputs: ValidationOutputs,
@@ -1068,7 +1077,7 @@ mod tests {
fn make_erasure_root(test: &TestState, pov: PoV) -> Hash {
let available_data = AvailableData {
validation_data: test.validation_data.persisted.clone(),
pov,
pov: Arc::new(pov),
};
let chunks = erasure_coding::obtain_chunks_v1(test.validators.len(), &available_data).unwrap();
@@ -1232,6 +1241,15 @@ mod tests {
}
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::PoVDistribution(PoVDistributionMessage::DistributePoV(hash, descriptor, pov_received)) => {
assert_eq!(test_state.relay_parent, hash);
assert_eq!(candidate.descriptor, descriptor);
assert_eq!(pov, *pov_received);
}
);
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
@@ -23,7 +23,7 @@ use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex,
GroupRotationInfo, HeadData, OccupiedCore, PersistedValidationData, PoV, ScheduledCore,
};
use polkadot_subsystem_testhelpers::{self as test_helpers};
use polkadot_subsystem_testhelpers as test_helpers;
use futures::{executor, future, Future};
use futures_timer::Delay;
@@ -241,7 +241,7 @@ impl Default for TestState {
fn make_available_data(test: &TestState, pov: PoV) -> AvailableData {
AvailableData {
validation_data: test.persisted_validation_data.clone(),
pov,
pov: Arc::new(pov),
}
}
@@ -30,12 +30,8 @@ use polkadot_subsystem::{
PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage,
},
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_network_protocol::{v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View};
use futures::prelude::*;
use futures::channel::oneshot;
@@ -43,6 +39,9 @@ use futures::channel::oneshot;
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::sync::Arc;
#[cfg(test)]
mod tests;
const COST_APPARENT_FLOOD: Rep = Rep::new(-500, "Peer appears to be flooding us with PoV requests");
const COST_UNEXPECTED_POV: Rep = Rep::new(-500, "Peer sent us an unexpected PoV");
const COST_AWAITED_NOT_IN_VIEW: Rep
@@ -616,6 +615,3 @@ impl metrics::Metrics for Metrics {
Ok(Metrics(Some(metrics)))
}
}
#[cfg(test)]
mod tests;