Validate chunks from disk in availability-recovery (#6078)

* Don't use corrupted chunks from disk.

Otherwise we would be going to dispute the candidate and get slashed.

* Add tests
This commit is contained in:
Robert Klotzner
2022-09-29 14:16:12 +02:00
committed by GitHub
parent 39e55ffd0a
commit 548b4c6c71
2 changed files with 143 additions and 41 deletions
@@ -376,49 +376,20 @@ impl RequestChunksFromValidators {
self.total_received_responses += 1;
match request_result {
Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks.
let validator_index = chunk.index;
if let Ok(anticipated_hash) =
branch_hash(&params.erasure_root, chunk.proof(), chunk.index.0 as usize)
{
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if erasure_chunk_hash != anticipated_hash {
metrics.on_chunk_request_invalid();
self.error_count += 1;
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?validator_index,
"Merkle proof mismatch",
);
} else {
metrics.on_chunk_request_succeeded();
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?validator_index,
"Received valid chunk.",
);
self.received_chunks.insert(validator_index, chunk);
}
Ok(Some(chunk)) =>
if is_chunk_valid(params, &chunk) {
metrics.on_chunk_request_succeeded();
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Received valid chunk",
);
self.received_chunks.insert(chunk.index, chunk);
} else {
metrics.on_chunk_request_invalid();
self.error_count += 1;
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?validator_index,
"Invalid Merkle proof",
);
}
},
},
Ok(None) => {
metrics.on_chunk_request_no_such_chunk();
self.error_count += 1;
@@ -507,7 +478,20 @@ impl RequestChunksFromValidators {
self.shuffling.retain(|i| !chunk_indices.contains(i));
for chunk in chunks {
self.received_chunks.insert(chunk.index, chunk);
if is_chunk_valid(params, &chunk) {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Found valid chunk on disk"
);
self.received_chunks.insert(chunk.index, chunk);
} else {
gum::error!(
target: LOG_TARGET,
"Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
);
};
}
},
Err(oneshot::Canceled) => {
@@ -609,6 +593,35 @@ const fn is_unavailable(
received_chunks + requesting_chunks + unrequested_validators < threshold
}
/// Check validity of a chunk.
fn is_chunk_valid(params: &RecoveryParams, chunk: &ErasureChunk) -> bool {
let anticipated_hash =
match branch_hash(&params.erasure_root, chunk.proof(), chunk.index.0 as usize) {
Ok(hash) => hash,
Err(e) => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
error = ?e,
"Invalid Merkle proof",
);
return false
},
};
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if anticipated_hash != erasure_chunk_hash {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Merkle proof mismatch"
);
return false
}
true
}
/// Re-encode the data into erasure chunks in order to verify
/// the root hash of the provided Merkle tree, which is built
/// on-top of the encoded chunks.
@@ -189,6 +189,7 @@ struct TestState {
available_data: AvailableData,
chunks: Vec<ErasureChunk>,
invalid_chunks: Vec<ErasureChunk>,
}
impl TestState {
@@ -273,6 +274,26 @@ impl TestState {
)
}
async fn respond_to_query_all_request_invalid(
&self,
virtual_overseer: &mut VirtualOverseer,
send_chunk: impl Fn(usize) -> bool,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryAllChunks(_, tx)
) => {
let v = self.invalid_chunks.iter()
.filter(|c| send_chunk(c.index.0 as usize))
.cloned()
.collect();
let _ = tx.send(v);
}
)
}
async fn test_chunk_requests(
&self,
candidate_hash: CandidateHash,
@@ -454,6 +475,22 @@ impl Default for TestState {
&available_data,
|_, _| {},
);
// Mess around:
let invalid_chunks = chunks
.iter()
.cloned()
.map(|mut chunk| {
if chunk.chunk.len() >= 2 && chunk.chunk[0] != chunk.chunk[1] {
chunk.chunk[0] = chunk.chunk[1];
} else if chunk.chunk.len() >= 1 {
chunk.chunk[0] = !chunk.chunk[0];
} else {
chunk.proof = Proof::dummy_proof();
}
chunk
})
.collect();
debug_assert_ne!(chunks, invalid_chunks);
candidate.descriptor.erasure_root = erasure_root;
candidate.descriptor.relay_parent = Hash::repeat_byte(10);
@@ -468,6 +505,7 @@ impl Default for TestState {
persisted_validation_data,
available_data,
chunks,
invalid_chunks,
}
}
}
@@ -1284,6 +1322,57 @@ fn does_not_query_local_validator() {
});
}
#[test]
fn invalid_local_chunk_is_ignored() {
let test_state = TestState::default();
test_harness_chunks_only(|mut virtual_overseer, req_cfg| async move {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
})),
)
.await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
None,
tx,
),
)
.await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state
.respond_to_query_all_request_invalid(&mut virtual_overseer, |i| i == 0)
.await;
let candidate_hash = test_state.candidate.hash();
test_state
.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
test_state.threshold() - 1,
|i| if i == 0 { panic!("requested from local validator") } else { Has::Yes },
)
.await;
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
(virtual_overseer, req_cfg)
});
}
#[test]
fn parallel_request_calculation_works_as_expected() {
let num_validators = 100;