mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 11:01:01 +00:00
cumulus-pov-recovery: check pov_hash instead of reencoding data (#2287)
Collators were previously reencoding the available data and checking the erasure root. Replace that with just checking the PoV hash, which consumes much less CPU and takes less time. We also don't need to check the `PersistedValidationData` hash, as collators don't use it. Reason: https://github.com/paritytech/polkadot-sdk/issues/575#issuecomment-1806572230 After systematic chunks recovery is merged, collators will no longer do any reed-solomon encoding/decoding, which has proven to be a great CPU consumer. Signed-off-by: alindima <alin@parity.io>
This commit is contained in:
@@ -16,12 +16,12 @@
|
|||||||
|
|
||||||
use sp_runtime::traits::Block as BlockT;
|
use sp_runtime::traits::Block as BlockT;
|
||||||
|
|
||||||
use polkadot_node_primitives::AvailableData;
|
use polkadot_node_primitives::PoV;
|
||||||
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
|
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
|
||||||
|
|
||||||
use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
|
use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
|
||||||
|
|
||||||
use std::{collections::HashSet, pin::Pin};
|
use std::{collections::HashSet, pin::Pin, sync::Arc};
|
||||||
|
|
||||||
use crate::RecoveryHandle;
|
use crate::RecoveryHandle;
|
||||||
|
|
||||||
@@ -30,9 +30,8 @@ use crate::RecoveryHandle;
|
|||||||
/// This handles the candidate recovery and tracks the activate recoveries.
|
/// This handles the candidate recovery and tracks the activate recoveries.
|
||||||
pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
|
pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
|
||||||
/// The recoveries that are currently being executed.
|
/// The recoveries that are currently being executed.
|
||||||
recoveries: FuturesUnordered<
|
recoveries:
|
||||||
Pin<Box<dyn Future<Output = (Block::Hash, Option<AvailableData>)> + Send>>,
|
FuturesUnordered<Pin<Box<dyn Future<Output = (Block::Hash, Option<Arc<PoV>>)> + Send>>>,
|
||||||
>,
|
|
||||||
/// The block hashes of the candidates currently being recovered.
|
/// The block hashes of the candidates currently being recovered.
|
||||||
candidates: HashSet<Block::Hash>,
|
candidates: HashSet<Block::Hash>,
|
||||||
recovery_handle: Box<dyn RecoveryHandle>,
|
recovery_handle: Box<dyn RecoveryHandle>,
|
||||||
@@ -68,7 +67,7 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
|
|||||||
self.recoveries.push(
|
self.recoveries.push(
|
||||||
async move {
|
async move {
|
||||||
match rx.await {
|
match rx.await {
|
||||||
Ok(Ok(res)) => (block_hash, Some(res)),
|
Ok(Ok(res)) => (block_hash, Some(res.pov)),
|
||||||
Ok(Err(error)) => {
|
Ok(Err(error)) => {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
target: crate::LOG_TARGET,
|
target: crate::LOG_TARGET,
|
||||||
@@ -93,8 +92,8 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
|
|||||||
|
|
||||||
/// Waits for the next recovery.
|
/// Waits for the next recovery.
|
||||||
///
|
///
|
||||||
/// If the returned [`AvailableData`] is `None`, it means that the recovery failed.
|
/// If the returned [`PoV`] is `None`, it means that the recovery failed.
|
||||||
pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<AvailableData>) {
|
pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<Arc<PoV>>) {
|
||||||
loop {
|
loop {
|
||||||
if let Some(res) = self.recoveries.next().await {
|
if let Some(res) = self.recoveries.next().await {
|
||||||
self.candidates.remove(&res.0);
|
self.candidates.remove(&res.0);
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
|
|||||||
use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
|
use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
|
||||||
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
||||||
|
|
||||||
use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
|
use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
|
||||||
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
|
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
|
||||||
use polkadot_overseer::Handle as OverseerHandle;
|
use polkadot_overseer::Handle as OverseerHandle;
|
||||||
use polkadot_primitives::{
|
use polkadot_primitives::{
|
||||||
@@ -346,15 +346,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a recovered candidate.
|
/// Handle a recovered candidate.
|
||||||
async fn handle_candidate_recovered(
|
async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
|
||||||
&mut self,
|
let pov = match pov {
|
||||||
block_hash: Block::Hash,
|
Some(pov) => {
|
||||||
available_data: Option<AvailableData>,
|
|
||||||
) {
|
|
||||||
let available_data = match available_data {
|
|
||||||
Some(data) => {
|
|
||||||
self.candidates_in_retry.remove(&block_hash);
|
self.candidates_in_retry.remove(&block_hash);
|
||||||
data
|
pov
|
||||||
},
|
},
|
||||||
None =>
|
None =>
|
||||||
if self.candidates_in_retry.insert(block_hash) {
|
if self.candidates_in_retry.insert(block_hash) {
|
||||||
@@ -373,18 +369,16 @@ where
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let raw_block_data = match sp_maybe_compressed_blob::decompress(
|
let raw_block_data =
|
||||||
&available_data.pov.block_data.0,
|
match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
|
||||||
POV_BOMB_LIMIT,
|
Ok(r) => r,
|
||||||
) {
|
Err(error) => {
|
||||||
Ok(r) => r,
|
tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
|
||||||
Err(error) => {
|
|
||||||
tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
|
|
||||||
|
|
||||||
self.reset_candidate(block_hash);
|
self.reset_candidate(block_hash);
|
||||||
return
|
return
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let block_data = match ParachainBlockData::<Block>::decode(&mut &raw_block_data[..]) {
|
let block_data = match ParachainBlockData::<Block>::decode(&mut &raw_block_data[..]) {
|
||||||
Ok(d) => d,
|
Ok(d) => d,
|
||||||
@@ -595,10 +589,10 @@ where
|
|||||||
next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
|
next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
|
||||||
self.recover_candidate(next_to_recover).await;
|
self.recover_candidate(next_to_recover).await;
|
||||||
},
|
},
|
||||||
(block_hash, available_data) =
|
(block_hash, pov) =
|
||||||
self.active_candidate_recovery.wait_for_recovery().fuse() =>
|
self.active_candidate_recovery.wait_for_recovery().fuse() =>
|
||||||
{
|
{
|
||||||
self.handle_candidate_recovered(block_hash, available_data).await;
|
self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ fn build_overseer(
|
|||||||
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
|
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
|
||||||
let builder = Overseer::builder()
|
let builder = Overseer::builder()
|
||||||
.availability_distribution(DummySubsystem)
|
.availability_distribution(DummySubsystem)
|
||||||
.availability_recovery(AvailabilityRecoverySubsystem::with_availability_store_skip(
|
.availability_recovery(AvailabilityRecoverySubsystem::for_collator(
|
||||||
available_data_req_receiver,
|
available_data_req_receiver,
|
||||||
Metrics::register(registry)?,
|
Metrics::register(registry)?,
|
||||||
))
|
))
|
||||||
|
|||||||
@@ -105,6 +105,17 @@ pub struct AvailabilityRecoverySubsystem {
|
|||||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||||
/// Metrics for this subsystem.
|
/// Metrics for this subsystem.
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
|
/// The type of check to perform after available data was recovered.
|
||||||
|
post_recovery_check: PostRecoveryCheck,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, PartialEq, Debug)]
|
||||||
|
/// The type of check to perform after available data was recovered.
|
||||||
|
pub enum PostRecoveryCheck {
|
||||||
|
/// Reencode the data and check erasure root. For validators.
|
||||||
|
Reencode,
|
||||||
|
/// Only check the pov hash. For collators only.
|
||||||
|
PovHash,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Expensive erasure coding computations that we want to run on a blocking thread.
|
/// Expensive erasure coding computations that we want to run on a blocking thread.
|
||||||
@@ -344,6 +355,7 @@ async fn launch_recovery_task<Context>(
|
|||||||
metrics: &Metrics,
|
metrics: &Metrics,
|
||||||
recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
|
recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
|
||||||
bypass_availability_store: bool,
|
bypass_availability_store: bool,
|
||||||
|
post_recovery_check: PostRecoveryCheck,
|
||||||
) -> error::Result<()> {
|
) -> error::Result<()> {
|
||||||
let candidate_hash = receipt.hash();
|
let candidate_hash = receipt.hash();
|
||||||
let params = RecoveryParams {
|
let params = RecoveryParams {
|
||||||
@@ -354,6 +366,8 @@ async fn launch_recovery_task<Context>(
|
|||||||
erasure_root: receipt.descriptor.erasure_root,
|
erasure_root: receipt.descriptor.erasure_root,
|
||||||
metrics: metrics.clone(),
|
metrics: metrics.clone(),
|
||||||
bypass_availability_store,
|
bypass_availability_store,
|
||||||
|
post_recovery_check,
|
||||||
|
pov_hash: receipt.descriptor.pov_hash,
|
||||||
};
|
};
|
||||||
|
|
||||||
let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
|
let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
|
||||||
@@ -390,6 +404,7 @@ async fn handle_recover<Context>(
|
|||||||
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
|
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
|
||||||
recovery_strategy_kind: RecoveryStrategyKind,
|
recovery_strategy_kind: RecoveryStrategyKind,
|
||||||
bypass_availability_store: bool,
|
bypass_availability_store: bool,
|
||||||
|
post_recovery_check: PostRecoveryCheck,
|
||||||
) -> error::Result<()> {
|
) -> error::Result<()> {
|
||||||
let candidate_hash = receipt.hash();
|
let candidate_hash = receipt.hash();
|
||||||
|
|
||||||
@@ -486,6 +501,7 @@ async fn handle_recover<Context>(
|
|||||||
metrics,
|
metrics,
|
||||||
recovery_strategies,
|
recovery_strategies,
|
||||||
bypass_availability_store,
|
bypass_availability_store,
|
||||||
|
post_recovery_check,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
},
|
},
|
||||||
@@ -527,15 +543,17 @@ async fn query_chunk_size<Context>(
|
|||||||
|
|
||||||
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
|
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
|
||||||
impl AvailabilityRecoverySubsystem {
|
impl AvailabilityRecoverySubsystem {
|
||||||
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
|
/// Create a new instance of `AvailabilityRecoverySubsystem` suitable for collator nodes,
|
||||||
/// `AvailabilityStoreSubsystem` subsystem.
|
/// which never requests the `AvailabilityStoreSubsystem` subsystem and only checks the POV hash
|
||||||
pub fn with_availability_store_skip(
|
/// instead of reencoding the available data.
|
||||||
|
pub fn for_collator(
|
||||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
|
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
|
||||||
bypass_availability_store: true,
|
bypass_availability_store: true,
|
||||||
|
post_recovery_check: PostRecoveryCheck::PovHash,
|
||||||
req_receiver,
|
req_receiver,
|
||||||
metrics,
|
metrics,
|
||||||
}
|
}
|
||||||
@@ -550,6 +568,7 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
Self {
|
Self {
|
||||||
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways,
|
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways,
|
||||||
bypass_availability_store: false,
|
bypass_availability_store: false,
|
||||||
|
post_recovery_check: PostRecoveryCheck::Reencode,
|
||||||
req_receiver,
|
req_receiver,
|
||||||
metrics,
|
metrics,
|
||||||
}
|
}
|
||||||
@@ -563,6 +582,7 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
Self {
|
Self {
|
||||||
recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways,
|
recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways,
|
||||||
bypass_availability_store: false,
|
bypass_availability_store: false,
|
||||||
|
post_recovery_check: PostRecoveryCheck::Reencode,
|
||||||
req_receiver,
|
req_receiver,
|
||||||
metrics,
|
metrics,
|
||||||
}
|
}
|
||||||
@@ -577,6 +597,7 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
Self {
|
Self {
|
||||||
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
|
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
|
||||||
bypass_availability_store: false,
|
bypass_availability_store: false,
|
||||||
|
post_recovery_check: PostRecoveryCheck::Reencode,
|
||||||
req_receiver,
|
req_receiver,
|
||||||
metrics,
|
metrics,
|
||||||
}
|
}
|
||||||
@@ -584,8 +605,13 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
|
|
||||||
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
|
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
|
||||||
let mut state = State::default();
|
let mut state = State::default();
|
||||||
let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } =
|
let Self {
|
||||||
self;
|
mut req_receiver,
|
||||||
|
metrics,
|
||||||
|
recovery_strategy_kind,
|
||||||
|
bypass_availability_store,
|
||||||
|
post_recovery_check,
|
||||||
|
} = self;
|
||||||
|
|
||||||
let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
|
let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
|
||||||
let mut erasure_task_rx = erasure_task_rx.fuse();
|
let mut erasure_task_rx = erasure_task_rx.fuse();
|
||||||
@@ -675,7 +701,8 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
&metrics,
|
&metrics,
|
||||||
erasure_task_tx.clone(),
|
erasure_task_tx.clone(),
|
||||||
recovery_strategy_kind.clone(),
|
recovery_strategy_kind.clone(),
|
||||||
bypass_availability_store
|
bypass_availability_store,
|
||||||
|
post_recovery_check.clone()
|
||||||
).await {
|
).await {
|
||||||
gum::warn!(
|
gum::warn!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
|
|||||||
@@ -20,7 +20,7 @@
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
futures_undead::FuturesUndead, is_chunk_valid, is_unavailable, metrics::Metrics, ErasureTask,
|
futures_undead::FuturesUndead, is_chunk_valid, is_unavailable, metrics::Metrics, ErasureTask,
|
||||||
LOG_TARGET,
|
PostRecoveryCheck, LOG_TARGET,
|
||||||
};
|
};
|
||||||
use futures::{channel::oneshot, SinkExt};
|
use futures::{channel::oneshot, SinkExt};
|
||||||
#[cfg(not(test))]
|
#[cfg(not(test))]
|
||||||
@@ -95,6 +95,12 @@ pub struct RecoveryParams {
|
|||||||
|
|
||||||
/// Do not request data from availability-store. Useful for collators.
|
/// Do not request data from availability-store. Useful for collators.
|
||||||
pub bypass_availability_store: bool,
|
pub bypass_availability_store: bool,
|
||||||
|
|
||||||
|
/// The type of check to perform after available data was recovered.
|
||||||
|
pub post_recovery_check: PostRecoveryCheck,
|
||||||
|
|
||||||
|
/// The blake2-256 hash of the PoV.
|
||||||
|
pub pov_hash: Hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Intermediate/common data that must be passed between `RecoveryStrategy`s belonging to the
|
/// Intermediate/common data that must be passed between `RecoveryStrategy`s belonging to the
|
||||||
@@ -501,39 +507,48 @@ impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender>
|
|||||||
|
|
||||||
match response.await {
|
match response.await {
|
||||||
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
|
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
|
||||||
let (reencode_tx, reencode_rx) = oneshot::channel();
|
let maybe_data = match common_params.post_recovery_check {
|
||||||
self.params
|
PostRecoveryCheck::Reencode => {
|
||||||
.erasure_task_tx
|
let (reencode_tx, reencode_rx) = oneshot::channel();
|
||||||
.send(ErasureTask::Reencode(
|
self.params
|
||||||
common_params.n_validators,
|
.erasure_task_tx
|
||||||
common_params.erasure_root,
|
.send(ErasureTask::Reencode(
|
||||||
data,
|
common_params.n_validators,
|
||||||
reencode_tx,
|
common_params.erasure_root,
|
||||||
))
|
data,
|
||||||
.await
|
reencode_tx,
|
||||||
.map_err(|_| RecoveryError::ChannelClosed)?;
|
))
|
||||||
|
.await
|
||||||
|
.map_err(|_| RecoveryError::ChannelClosed)?;
|
||||||
|
|
||||||
let reencode_response =
|
reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?
|
||||||
reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
|
},
|
||||||
|
PostRecoveryCheck::PovHash =>
|
||||||
|
(data.pov.hash() == common_params.pov_hash).then_some(data),
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(data) = reencode_response {
|
match maybe_data {
|
||||||
gum::trace!(
|
Some(data) => {
|
||||||
target: LOG_TARGET,
|
gum::trace!(
|
||||||
candidate_hash = ?common_params.candidate_hash,
|
target: LOG_TARGET,
|
||||||
"Received full data",
|
candidate_hash = ?common_params.candidate_hash,
|
||||||
);
|
"Received full data",
|
||||||
|
);
|
||||||
|
|
||||||
return Ok(data)
|
return Ok(data)
|
||||||
} else {
|
},
|
||||||
gum::debug!(
|
None => {
|
||||||
target: LOG_TARGET,
|
gum::debug!(
|
||||||
candidate_hash = ?common_params.candidate_hash,
|
target: LOG_TARGET,
|
||||||
?validator_index,
|
candidate_hash = ?common_params.candidate_hash,
|
||||||
"Invalid data response",
|
?validator_index,
|
||||||
);
|
"Invalid data response",
|
||||||
|
);
|
||||||
|
|
||||||
// it doesn't help to report the peer with req/res.
|
// it doesn't help to report the peer with req/res.
|
||||||
}
|
// we'll try the next backer.
|
||||||
|
},
|
||||||
|
};
|
||||||
},
|
},
|
||||||
Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {},
|
Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {},
|
||||||
Err(e) => gum::debug!(
|
Err(e) => gum::debug!(
|
||||||
@@ -647,22 +662,43 @@ impl FetchChunks {
|
|||||||
|
|
||||||
match available_data_response {
|
match available_data_response {
|
||||||
Ok(data) => {
|
Ok(data) => {
|
||||||
// Send request to re-encode the chunks and check merkle root.
|
let maybe_data = match common_params.post_recovery_check {
|
||||||
let (reencode_tx, reencode_rx) = oneshot::channel();
|
PostRecoveryCheck::Reencode => {
|
||||||
self.erasure_task_tx
|
// Send request to re-encode the chunks and check merkle root.
|
||||||
.send(ErasureTask::Reencode(
|
let (reencode_tx, reencode_rx) = oneshot::channel();
|
||||||
common_params.n_validators,
|
self.erasure_task_tx
|
||||||
common_params.erasure_root,
|
.send(ErasureTask::Reencode(
|
||||||
data,
|
common_params.n_validators,
|
||||||
reencode_tx,
|
common_params.erasure_root,
|
||||||
))
|
data,
|
||||||
.await
|
reencode_tx,
|
||||||
.map_err(|_| RecoveryError::ChannelClosed)?;
|
))
|
||||||
|
.await
|
||||||
|
.map_err(|_| RecoveryError::ChannelClosed)?;
|
||||||
|
|
||||||
let reencode_response =
|
reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?.or_else(|| {
|
||||||
reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
|
gum::trace!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
candidate_hash = ?common_params.candidate_hash,
|
||||||
|
erasure_root = ?common_params.erasure_root,
|
||||||
|
"Data recovery error - root mismatch",
|
||||||
|
);
|
||||||
|
None
|
||||||
|
})
|
||||||
|
},
|
||||||
|
PostRecoveryCheck::PovHash =>
|
||||||
|
(data.pov.hash() == common_params.pov_hash).then_some(data).or_else(|| {
|
||||||
|
gum::trace!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
candidate_hash = ?common_params.candidate_hash,
|
||||||
|
pov_hash = ?common_params.pov_hash,
|
||||||
|
"Data recovery error - PoV hash mismatch",
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(data) = reencode_response {
|
if let Some(data) = maybe_data {
|
||||||
gum::trace!(
|
gum::trace!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
candidate_hash = ?common_params.candidate_hash,
|
candidate_hash = ?common_params.candidate_hash,
|
||||||
@@ -673,12 +709,6 @@ impl FetchChunks {
|
|||||||
Ok(data)
|
Ok(data)
|
||||||
} else {
|
} else {
|
||||||
recovery_duration.map(|rd| rd.stop_and_discard());
|
recovery_duration.map(|rd| rd.stop_and_discard());
|
||||||
gum::trace!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
candidate_hash = ?common_params.candidate_hash,
|
|
||||||
erasure_root = ?common_params.erasure_root,
|
|
||||||
"Data recovery error - root mismatch",
|
|
||||||
);
|
|
||||||
|
|
||||||
Err(RecoveryError::Invalid)
|
Err(RecoveryError::Invalid)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user