mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 18:11:10 +00:00
Don't store available data on disputes (#5950)
* Don't store available data on disputes If there are lots of disputes, this leads to blowing up disk space on validators. Rob luckily remembered that we do store the full availability in participation. The argument in the code does not make too much sense with the current implementation, as no validator will ever request anything else from us, than the one piece we are meant to posess. * Fix warnings. * Fix compile warnings * Remove redundant field. Co-authored-by: Vsevolod Stakhov <vsevolod.stakhov@parity.io>
This commit is contained in:
@@ -890,11 +890,7 @@ impl Initialized {
|
||||
.queue_participation(
|
||||
ctx,
|
||||
priority,
|
||||
ParticipationRequest::new(
|
||||
new_state.candidate_receipt().clone(),
|
||||
session,
|
||||
env.validators().len(),
|
||||
),
|
||||
ParticipationRequest::new(new_state.candidate_receipt().clone(), session),
|
||||
)
|
||||
.await;
|
||||
log_error(r)?;
|
||||
|
||||
@@ -304,7 +304,6 @@ impl DisputeCoordinatorSubsystem {
|
||||
Some(info) => info.validators.clone(),
|
||||
};
|
||||
|
||||
let n_validators = validators.len();
|
||||
let voted_indices = votes.voted_indices();
|
||||
|
||||
// Determine if there are any missing local statements for this dispute. Validators are
|
||||
@@ -335,11 +334,7 @@ impl DisputeCoordinatorSubsystem {
|
||||
if missing_local_statement {
|
||||
participation_requests.push((
|
||||
ParticipationPriority::with_priority_if(is_included),
|
||||
ParticipationRequest::new(
|
||||
votes.candidate_receipt.clone(),
|
||||
session,
|
||||
n_validators,
|
||||
),
|
||||
ParticipationRequest::new(votes.candidate_receipt.clone(), session),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ use futures_timer::Delay;
|
||||
|
||||
use polkadot_node_primitives::{ValidationResult, APPROVAL_EXECUTION_TIMEOUT};
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage, CandidateValidationMessage},
|
||||
messages::{AvailabilityRecoveryMessage, CandidateValidationMessage},
|
||||
overseer, ActiveLeavesUpdate, RecoveryError,
|
||||
};
|
||||
use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash;
|
||||
@@ -319,38 +319,6 @@ async fn participate(
|
||||
},
|
||||
};
|
||||
|
||||
// we dispatch a request to store the available data for the candidate. We
|
||||
// want to maximize data availability for other potential checkers involved
|
||||
// in the dispute
|
||||
let (store_available_data_tx, store_available_data_rx) = oneshot::channel();
|
||||
sender
|
||||
.send_message(AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash: *req.candidate_hash(),
|
||||
n_validators: req.n_validators() as u32,
|
||||
available_data: available_data.clone(),
|
||||
tx: store_available_data_tx,
|
||||
})
|
||||
.await;
|
||||
|
||||
match store_available_data_rx.await {
|
||||
Err(oneshot::Canceled) => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
"`Oneshot` got cancelled when storing available data {:?}",
|
||||
req.candidate_hash(),
|
||||
);
|
||||
},
|
||||
Ok(Err(err)) => {
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
?err,
|
||||
"Failed to store available data for candidate {:?}",
|
||||
req.candidate_hash(),
|
||||
);
|
||||
},
|
||||
Ok(Ok(())) => {},
|
||||
}
|
||||
|
||||
// Issue a request to validate the candidate with the provided exhaustive
|
||||
// parameters
|
||||
//
|
||||
|
||||
@@ -78,7 +78,6 @@ pub struct ParticipationRequest {
|
||||
candidate_hash: CandidateHash,
|
||||
candidate_receipt: CandidateReceipt,
|
||||
session: SessionIndex,
|
||||
n_validators: usize,
|
||||
}
|
||||
|
||||
/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
|
||||
@@ -122,12 +121,8 @@ pub enum QueueError {
|
||||
|
||||
impl ParticipationRequest {
|
||||
/// Create a new `ParticipationRequest` to be queued.
|
||||
pub fn new(
|
||||
candidate_receipt: CandidateReceipt,
|
||||
session: SessionIndex,
|
||||
n_validators: usize,
|
||||
) -> Self {
|
||||
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, n_validators }
|
||||
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self {
|
||||
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session }
|
||||
}
|
||||
|
||||
pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
|
||||
@@ -139,9 +134,6 @@ impl ParticipationRequest {
|
||||
pub fn session(&self) -> SessionIndex {
|
||||
self.session
|
||||
}
|
||||
pub fn n_validators(&self) -> usize {
|
||||
self.n_validators
|
||||
}
|
||||
pub fn into_candidate_info(self) -> (CandidateHash, CandidateReceipt) {
|
||||
let Self { candidate_hash, candidate_receipt, .. } = self;
|
||||
(candidate_hash, candidate_receipt)
|
||||
|
||||
@@ -25,7 +25,7 @@ fn make_participation_request(hash: Hash) -> ParticipationRequest {
|
||||
let mut receipt = dummy_candidate_receipt(dummy_hash());
|
||||
// make it differ:
|
||||
receipt.commitments_hash = hash;
|
||||
ParticipationRequest::new(receipt, 1, 100)
|
||||
ParticipationRequest::new(receipt, 1)
|
||||
}
|
||||
|
||||
/// Make dummy comparator for request, based on the given block number.
|
||||
|
||||
@@ -29,10 +29,7 @@ use parity_scale_codec::Encode;
|
||||
use polkadot_node_primitives::{AvailableData, BlockData, InvalidCandidate, PoV};
|
||||
use polkadot_node_subsystem::{
|
||||
jaeger,
|
||||
messages::{
|
||||
AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest,
|
||||
ValidationFailed,
|
||||
},
|
||||
messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest},
|
||||
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SpawnGlue,
|
||||
};
|
||||
use polkadot_node_subsystem_test_helpers::{
|
||||
@@ -71,9 +68,8 @@ async fn participate_with_commitments_hash<Context>(
|
||||
receipt
|
||||
};
|
||||
let session = 1;
|
||||
let n_validators = 10;
|
||||
|
||||
let req = ParticipationRequest::new(candidate_receipt, session, n_validators);
|
||||
let req = ParticipationRequest::new(candidate_receipt, session);
|
||||
|
||||
participation
|
||||
.queue_participation(ctx, ParticipationPriority::BestEffort, req)
|
||||
@@ -116,7 +112,6 @@ pub async fn participation_full_happy_path(
|
||||
) {
|
||||
recover_available_data(ctx_handle).await;
|
||||
fetch_validation_code(ctx_handle).await;
|
||||
store_available_data(ctx_handle, true).await;
|
||||
|
||||
assert_matches!(
|
||||
ctx_handle.recv().await,
|
||||
@@ -185,20 +180,6 @@ async fn fetch_validation_code(virtual_overseer: &mut VirtualOverseer) -> Hash {
|
||||
)
|
||||
}
|
||||
|
||||
async fn store_available_data(virtual_overseer: &mut VirtualOverseer, success: bool) {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData { tx, .. }) => {
|
||||
if success {
|
||||
tx.send(Ok(())).unwrap();
|
||||
} else {
|
||||
tx.send(Err(())).unwrap();
|
||||
}
|
||||
},
|
||||
"overseer did not receive store available data request",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn same_req_wont_get_queued_if_participation_is_already_running() {
|
||||
futures::executor::block_on(async {
|
||||
@@ -423,7 +404,6 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() {
|
||||
fetch_validation_code(&mut ctx_handle).await,
|
||||
participation.recent_block.unwrap().1
|
||||
);
|
||||
store_available_data(&mut ctx_handle, true).await;
|
||||
|
||||
assert_matches!(
|
||||
ctx_handle.recv().await,
|
||||
@@ -461,7 +441,6 @@ fn cast_invalid_vote_if_commitments_dont_match() {
|
||||
fetch_validation_code(&mut ctx_handle).await,
|
||||
participation.recent_block.unwrap().1
|
||||
);
|
||||
store_available_data(&mut ctx_handle, true).await;
|
||||
|
||||
assert_matches!(
|
||||
ctx_handle.recv().await,
|
||||
@@ -499,7 +478,6 @@ fn cast_valid_vote_if_validation_passes() {
|
||||
fetch_validation_code(&mut ctx_handle).await,
|
||||
participation.recent_block.unwrap().1
|
||||
);
|
||||
store_available_data(&mut ctx_handle, true).await;
|
||||
|
||||
assert_matches!(
|
||||
ctx_handle.recv().await,
|
||||
@@ -521,42 +499,3 @@ fn cast_valid_vote_if_validation_passes() {
|
||||
);
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn failure_to_store_available_data_does_not_preclude_participation() {
|
||||
futures::executor::block_on(async {
|
||||
let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
|
||||
|
||||
let (sender, mut worker_receiver) = mpsc::channel(1);
|
||||
let mut participation = Participation::new(sender);
|
||||
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
|
||||
participate(&mut ctx, &mut participation).await.unwrap();
|
||||
|
||||
recover_available_data(&mut ctx_handle).await;
|
||||
assert_eq!(
|
||||
fetch_validation_code(&mut ctx_handle).await,
|
||||
participation.recent_block.unwrap().1
|
||||
);
|
||||
// the store available data request should fail:
|
||||
store_available_data(&mut ctx_handle, false).await;
|
||||
|
||||
assert_matches!(
|
||||
ctx_handle.recv().await,
|
||||
AllMessages::CandidateValidation(
|
||||
CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx)
|
||||
) if timeout == APPROVAL_EXECUTION_TIMEOUT => {
|
||||
tx.send(Err(ValidationFailed("fail".to_string()))).unwrap();
|
||||
},
|
||||
"overseer did not receive candidate validation message",
|
||||
);
|
||||
|
||||
let result = participation
|
||||
.get_participation_result(&mut ctx, worker_receiver.next().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_matches!(
|
||||
result.outcome,
|
||||
ParticipationOutcome::Invalid => {}
|
||||
);
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user