mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 05:51:02 +00:00
Prefer fetching small PoVs from backing group (#7173)
* impl QueryChunkSize Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * QueryChunkSize message Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * enable fetching from backing group for small pov Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * review feedback Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * Refactor `bypass_availability_store` Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> * review feedback Signed-off-by: Andrei Sandu <andrei-mihail@parity.io> --------- Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
@@ -1052,6 +1052,25 @@ fn process_message(
|
|||||||
let _ =
|
let _ =
|
||||||
tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?);
|
tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?);
|
||||||
},
|
},
|
||||||
|
AvailabilityStoreMessage::QueryChunkSize(candidate, tx) => {
|
||||||
|
let meta = load_meta(&subsystem.db, &subsystem.config, &candidate)?;
|
||||||
|
|
||||||
|
let validator_index = meta.map_or(None, |meta| meta.chunks_stored.first_one());
|
||||||
|
|
||||||
|
let maybe_chunk_size = if let Some(validator_index) = validator_index {
|
||||||
|
load_chunk(
|
||||||
|
&subsystem.db,
|
||||||
|
&subsystem.config,
|
||||||
|
&candidate,
|
||||||
|
ValidatorIndex(validator_index as u32),
|
||||||
|
)?
|
||||||
|
.map(|erasure_chunk| erasure_chunk.chunk.len())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = tx.send(maybe_chunk_size);
|
||||||
|
},
|
||||||
AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
|
AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
|
||||||
match load_meta(&subsystem.db, &subsystem.config, &candidate)? {
|
match load_meta(&subsystem.db, &subsystem.config, &candidate)? {
|
||||||
None => {
|
None => {
|
||||||
|
|||||||
@@ -1153,3 +1153,51 @@ async fn import_leaf(
|
|||||||
|
|
||||||
new_leaf
|
new_leaf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn query_chunk_size_works() {
|
||||||
|
let store = test_store();
|
||||||
|
|
||||||
|
test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
|
||||||
|
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
|
||||||
|
let validator_index = ValidatorIndex(5);
|
||||||
|
let n_validators = 10;
|
||||||
|
|
||||||
|
let chunk = ErasureChunk {
|
||||||
|
chunk: vec![1, 2, 3],
|
||||||
|
index: validator_index,
|
||||||
|
proof: Proof::try_from(vec![vec![3, 4, 5]]).unwrap(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Ensure an entry already exists. In reality this would come from watching
|
||||||
|
// chain events.
|
||||||
|
with_tx(&store, |tx| {
|
||||||
|
super::write_meta(
|
||||||
|
tx,
|
||||||
|
&TEST_CONFIG,
|
||||||
|
&candidate_hash,
|
||||||
|
&CandidateMeta {
|
||||||
|
data_available: false,
|
||||||
|
chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators],
|
||||||
|
state: State::Unavailable(BETimestamp(0)),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let chunk_msg =
|
||||||
|
AvailabilityStoreMessage::StoreChunk { candidate_hash, chunk: chunk.clone(), tx };
|
||||||
|
|
||||||
|
overseer_send(&mut virtual_overseer, chunk_msg).await;
|
||||||
|
assert_eq!(rx.await.unwrap(), Ok(()));
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let query_chunk_size = AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx);
|
||||||
|
|
||||||
|
overseer_send(&mut virtual_overseer, query_chunk_size).await;
|
||||||
|
|
||||||
|
assert_eq!(rx.await.unwrap().unwrap(), chunk.chunk.len());
|
||||||
|
virtual_overseer
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -99,15 +99,47 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
|
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
/// The Availability Recovery Subsystem.
|
/// PoV size limit in bytes for which prefer fetching from backers.
|
||||||
pub struct AvailabilityRecoverySubsystem {
|
const SMALL_POV_LIMIT: usize = 128 * 1024;
|
||||||
|
|
||||||
|
#[derive(Clone, PartialEq)]
|
||||||
|
/// The strategy we use to recover the PoV.
|
||||||
|
pub enum RecoveryStrategy {
|
||||||
|
/// We always try the backing group first, then fallback to validator chunks.
|
||||||
|
BackersFirstAlways,
|
||||||
|
/// We try the backing group first if PoV size is lower than specified, then fallback to validator chunks.
|
||||||
|
BackersFirstIfSizeLower(usize),
|
||||||
|
/// We always recover using validator chunks.
|
||||||
|
ChunksAlways,
|
||||||
/// Do not request data from the availability store.
|
/// Do not request data from the availability store.
|
||||||
/// This is the useful for nodes where the
|
/// This is the useful for nodes where the
|
||||||
/// availability-store subsystem is not expected to run,
|
/// availability-store subsystem is not expected to run,
|
||||||
/// such as collators.
|
/// such as collators.
|
||||||
bypass_availability_store: bool,
|
BypassAvailabilityStore,
|
||||||
|
}
|
||||||
|
|
||||||
fast_path: bool,
|
impl RecoveryStrategy {
|
||||||
|
/// Returns true if the strategy needs backing group index.
|
||||||
|
pub fn needs_backing_group(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
RecoveryStrategy::BackersFirstAlways | RecoveryStrategy::BackersFirstIfSizeLower(_) =>
|
||||||
|
true,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the PoV size limit in bytes for `BackersFirstIfSizeLower` strategy, otherwise `None`.
|
||||||
|
pub fn pov_size_limit(&self) -> Option<usize> {
|
||||||
|
match *self {
|
||||||
|
RecoveryStrategy::BackersFirstIfSizeLower(limit) => Some(limit),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// The Availability Recovery Subsystem.
|
||||||
|
pub struct AvailabilityRecoverySubsystem {
|
||||||
|
/// PoV recovery strategy to use.
|
||||||
|
recovery_strategy: RecoveryStrategy,
|
||||||
/// Receiver for available data requests.
|
/// Receiver for available data requests.
|
||||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||||
/// Metrics for this subsystem.
|
/// Metrics for this subsystem.
|
||||||
@@ -863,10 +895,10 @@ async fn launch_recovery_task<Context>(
|
|||||||
ctx: &mut Context,
|
ctx: &mut Context,
|
||||||
session_info: SessionInfo,
|
session_info: SessionInfo,
|
||||||
receipt: CandidateReceipt,
|
receipt: CandidateReceipt,
|
||||||
backing_group: Option<GroupIndex>,
|
mut backing_group: Option<GroupIndex>,
|
||||||
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
|
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
|
||||||
bypass_availability_store: bool,
|
|
||||||
metrics: &Metrics,
|
metrics: &Metrics,
|
||||||
|
recovery_strategy: &RecoveryStrategy,
|
||||||
) -> error::Result<()> {
|
) -> error::Result<()> {
|
||||||
let candidate_hash = receipt.hash();
|
let candidate_hash = receipt.hash();
|
||||||
|
|
||||||
@@ -877,9 +909,33 @@ async fn launch_recovery_task<Context>(
|
|||||||
candidate_hash,
|
candidate_hash,
|
||||||
erasure_root: receipt.descriptor.erasure_root,
|
erasure_root: receipt.descriptor.erasure_root,
|
||||||
metrics: metrics.clone(),
|
metrics: metrics.clone(),
|
||||||
bypass_availability_store,
|
bypass_availability_store: recovery_strategy == &RecoveryStrategy::BypassAvailabilityStore,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(small_pov_limit) = recovery_strategy.pov_size_limit() {
|
||||||
|
// Get our own chunk size to get an estimate of the PoV size.
|
||||||
|
let chunk_size: Result<Option<usize>, error::Error> =
|
||||||
|
query_chunk_size(ctx, candidate_hash).await;
|
||||||
|
if let Ok(Some(chunk_size)) = chunk_size {
|
||||||
|
let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3;
|
||||||
|
let prefer_backing_group = pov_size_estimate < small_pov_limit;
|
||||||
|
|
||||||
|
gum::trace!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
?candidate_hash,
|
||||||
|
pov_size_estimate,
|
||||||
|
small_pov_limit,
|
||||||
|
enabled = prefer_backing_group,
|
||||||
|
"Prefer fetch from backing group",
|
||||||
|
);
|
||||||
|
|
||||||
|
backing_group = backing_group.filter(|_| {
|
||||||
|
// We keep the backing group only if `1/3` of chunks sum up to less than `small_pov_limit`.
|
||||||
|
prefer_backing_group
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let phase = backing_group
|
let phase = backing_group
|
||||||
.and_then(|g| session_info.validator_groups.get(g))
|
.and_then(|g| session_info.validator_groups.get(g))
|
||||||
.map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone())))
|
.map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone())))
|
||||||
@@ -917,8 +973,8 @@ async fn handle_recover<Context>(
|
|||||||
session_index: SessionIndex,
|
session_index: SessionIndex,
|
||||||
backing_group: Option<GroupIndex>,
|
backing_group: Option<GroupIndex>,
|
||||||
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
|
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
|
||||||
bypass_availability_store: bool,
|
|
||||||
metrics: &Metrics,
|
metrics: &Metrics,
|
||||||
|
recovery_strategy: &RecoveryStrategy,
|
||||||
) -> error::Result<()> {
|
) -> error::Result<()> {
|
||||||
let candidate_hash = receipt.hash();
|
let candidate_hash = receipt.hash();
|
||||||
|
|
||||||
@@ -961,8 +1017,8 @@ async fn handle_recover<Context>(
|
|||||||
receipt,
|
receipt,
|
||||||
backing_group,
|
backing_group,
|
||||||
response_sender,
|
response_sender,
|
||||||
bypass_availability_store,
|
|
||||||
metrics,
|
metrics,
|
||||||
|
recovery_strategy,
|
||||||
)
|
)
|
||||||
.await,
|
.await,
|
||||||
None => {
|
None => {
|
||||||
@@ -988,6 +1044,18 @@ async fn query_full_data<Context>(
|
|||||||
rx.await.map_err(error::Error::CanceledQueryFullData)
|
rx.await.map_err(error::Error::CanceledQueryFullData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Queries a chunk from av-store.
|
||||||
|
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
|
||||||
|
async fn query_chunk_size<Context>(
|
||||||
|
ctx: &mut Context,
|
||||||
|
candidate_hash: CandidateHash,
|
||||||
|
) -> error::Result<Option<usize>> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
rx.await.map_err(error::Error::CanceledQueryFullData)
|
||||||
|
}
|
||||||
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
|
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
|
||||||
impl AvailabilityRecoverySubsystem {
|
impl AvailabilityRecoverySubsystem {
|
||||||
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
|
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
|
||||||
@@ -996,7 +1064,7 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics }
|
Self { recovery_strategy: RecoveryStrategy::BypassAvailabilityStore, req_receiver, metrics }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
|
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
|
||||||
@@ -1005,7 +1073,7 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics }
|
Self { recovery_strategy: RecoveryStrategy::BackersFirstAlways, req_receiver, metrics }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
|
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
|
||||||
@@ -1013,12 +1081,25 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics }
|
Self { recovery_strategy: RecoveryStrategy::ChunksAlways, req_receiver, metrics }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is
|
||||||
|
/// above a threshold.
|
||||||
|
pub fn with_chunks_if_pov_large(
|
||||||
|
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||||
|
metrics: Metrics,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
recovery_strategy: RecoveryStrategy::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
|
||||||
|
req_receiver,
|
||||||
|
metrics,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
|
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
|
||||||
let mut state = State::default();
|
let mut state = State::default();
|
||||||
let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self;
|
let Self { recovery_strategy, mut req_receiver, metrics } = self;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
|
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
|
||||||
@@ -1045,10 +1126,10 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
&mut ctx,
|
&mut ctx,
|
||||||
receipt,
|
receipt,
|
||||||
session_index,
|
session_index,
|
||||||
maybe_backing_group.filter(|_| fast_path),
|
maybe_backing_group.filter(|_| recovery_strategy.needs_backing_group()),
|
||||||
response_sender,
|
response_sender,
|
||||||
bypass_availability_store,
|
|
||||||
&metrics,
|
&metrics,
|
||||||
|
&recovery_strategy,
|
||||||
).await {
|
).await {
|
||||||
gum::warn!(
|
gum::warn!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
@@ -1064,7 +1145,7 @@ impl AvailabilityRecoverySubsystem {
|
|||||||
in_req = recv_req => {
|
in_req = recv_req => {
|
||||||
match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
|
match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
|
||||||
Ok(req) => {
|
Ok(req) => {
|
||||||
if bypass_availability_store {
|
if recovery_strategy == RecoveryStrategy::BypassAvailabilityStore {
|
||||||
gum::debug!(
|
gum::debug!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
"Skipping request to availability-store.",
|
"Skipping request to availability-store.",
|
||||||
|
|||||||
@@ -117,6 +117,44 @@ fn test_harness_chunks_only<T: Future<Output = (VirtualOverseer, RequestResponse
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn test_harness_chunks_if_pov_large<
|
||||||
|
T: Future<Output = (VirtualOverseer, RequestResponseConfig)>,
|
||||||
|
>(
|
||||||
|
test: impl FnOnce(VirtualOverseer, RequestResponseConfig) -> T,
|
||||||
|
) {
|
||||||
|
let _ = env_logger::builder()
|
||||||
|
.is_test(true)
|
||||||
|
.filter(Some("polkadot_availability_recovery"), log::LevelFilter::Trace)
|
||||||
|
.try_init();
|
||||||
|
|
||||||
|
let pool = sp_core::testing::TaskExecutor::new();
|
||||||
|
|
||||||
|
let (context, virtual_overseer) = make_subsystem_context(pool.clone());
|
||||||
|
|
||||||
|
let (collation_req_receiver, req_cfg) =
|
||||||
|
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(&GENESIS_HASH, None));
|
||||||
|
let subsystem = AvailabilityRecoverySubsystem::with_chunks_if_pov_large(
|
||||||
|
collation_req_receiver,
|
||||||
|
Metrics::new_dummy(),
|
||||||
|
);
|
||||||
|
let subsystem = subsystem.run(context);
|
||||||
|
|
||||||
|
let test_fut = test(virtual_overseer, req_cfg);
|
||||||
|
|
||||||
|
futures::pin_mut!(test_fut);
|
||||||
|
futures::pin_mut!(subsystem);
|
||||||
|
|
||||||
|
executor::block_on(future::join(
|
||||||
|
async move {
|
||||||
|
let (mut overseer, _req_cfg) = test_fut.await;
|
||||||
|
overseer_signal(&mut overseer, OverseerSignal::Conclude).await;
|
||||||
|
},
|
||||||
|
subsystem,
|
||||||
|
))
|
||||||
|
.1
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
const TIMEOUT: Duration = Duration::from_millis(300);
|
const TIMEOUT: Duration = Duration::from_millis(300);
|
||||||
|
|
||||||
macro_rules! delay {
|
macro_rules! delay {
|
||||||
@@ -249,7 +287,7 @@ impl TestState {
|
|||||||
let _ = tx.send(if with_data {
|
let _ = tx.send(if with_data {
|
||||||
Some(self.available_data.clone())
|
Some(self.available_data.clone())
|
||||||
} else {
|
} else {
|
||||||
println!("SENDING NONE");
|
gum::debug!("Sending None");
|
||||||
None
|
None
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -914,6 +952,169 @@ fn fast_path_backing_group_recovers() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn recovers_from_only_chunks_if_pov_large() {
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness_chunks_if_pov_large(|mut virtual_overseer, req_cfg| async move {
|
||||||
|
overseer_signal(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||||
|
hash: test_state.current.clone(),
|
||||||
|
number: 1,
|
||||||
|
status: LeafStatus::Fresh,
|
||||||
|
span: Arc::new(jaeger::Span::Disabled),
|
||||||
|
})),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
AvailabilityRecoveryMessage::RecoverAvailableData(
|
||||||
|
test_state.candidate.clone(),
|
||||||
|
test_state.session_index,
|
||||||
|
Some(GroupIndex(0)),
|
||||||
|
tx,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
test_state.test_runtime_api(&mut virtual_overseer).await;
|
||||||
|
|
||||||
|
let candidate_hash = test_state.candidate.hash();
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::AvailabilityStore(
|
||||||
|
AvailabilityStoreMessage::QueryChunkSize(_, tx)
|
||||||
|
) => {
|
||||||
|
let _ = tx.send(Some(1000000));
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
|
||||||
|
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
|
||||||
|
|
||||||
|
test_state
|
||||||
|
.test_chunk_requests(
|
||||||
|
candidate_hash,
|
||||||
|
&mut virtual_overseer,
|
||||||
|
test_state.threshold(),
|
||||||
|
|_| Has::Yes,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Recovered data should match the original one.
|
||||||
|
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
// Test another candidate, send no chunks.
|
||||||
|
let mut new_candidate = dummy_candidate_receipt(dummy_hash());
|
||||||
|
|
||||||
|
new_candidate.descriptor.relay_parent = test_state.candidate.descriptor.relay_parent;
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
AvailabilityRecoveryMessage::RecoverAvailableData(
|
||||||
|
new_candidate.clone(),
|
||||||
|
test_state.session_index,
|
||||||
|
None,
|
||||||
|
tx,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
test_state.test_runtime_api(&mut virtual_overseer).await;
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::AvailabilityStore(
|
||||||
|
AvailabilityStoreMessage::QueryChunkSize(_, tx)
|
||||||
|
) => {
|
||||||
|
let _ = tx.send(Some(1000000));
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
|
||||||
|
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
|
||||||
|
|
||||||
|
test_state
|
||||||
|
.test_chunk_requests(
|
||||||
|
new_candidate.hash(),
|
||||||
|
&mut virtual_overseer,
|
||||||
|
test_state.impossibility_threshold(),
|
||||||
|
|_| Has::No,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// A request times out with `Unavailable` error.
|
||||||
|
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
|
||||||
|
(virtual_overseer, req_cfg)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn fast_path_backing_group_recovers_if_pov_small() {
|
||||||
|
let test_state = TestState::default();
|
||||||
|
|
||||||
|
test_harness_chunks_if_pov_large(|mut virtual_overseer, req_cfg| async move {
|
||||||
|
overseer_signal(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||||
|
hash: test_state.current.clone(),
|
||||||
|
number: 1,
|
||||||
|
status: LeafStatus::Fresh,
|
||||||
|
span: Arc::new(jaeger::Span::Disabled),
|
||||||
|
})),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
overseer_send(
|
||||||
|
&mut virtual_overseer,
|
||||||
|
AvailabilityRecoveryMessage::RecoverAvailableData(
|
||||||
|
test_state.candidate.clone(),
|
||||||
|
test_state.session_index,
|
||||||
|
Some(GroupIndex(0)),
|
||||||
|
tx,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
test_state.test_runtime_api(&mut virtual_overseer).await;
|
||||||
|
|
||||||
|
let candidate_hash = test_state.candidate.hash();
|
||||||
|
|
||||||
|
let who_has = |i| match i {
|
||||||
|
3 => Has::Yes,
|
||||||
|
_ => Has::No,
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
overseer_recv(&mut virtual_overseer).await,
|
||||||
|
AllMessages::AvailabilityStore(
|
||||||
|
AvailabilityStoreMessage::QueryChunkSize(_, tx)
|
||||||
|
) => {
|
||||||
|
let _ = tx.send(Some(100));
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
|
||||||
|
|
||||||
|
test_state
|
||||||
|
.test_full_data_requests(candidate_hash, &mut virtual_overseer, who_has)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Recovered data should match the original one.
|
||||||
|
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
|
||||||
|
(virtual_overseer, req_cfg)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn no_answers_in_fast_path_causes_chunk_requests() {
|
fn no_answers_in_fast_path_causes_chunk_requests() {
|
||||||
let test_state = TestState::default();
|
let test_state = TestState::default();
|
||||||
|
|||||||
@@ -224,7 +224,7 @@ where
|
|||||||
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
|
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
|
||||||
Metrics::register(registry)?,
|
Metrics::register(registry)?,
|
||||||
))
|
))
|
||||||
.availability_recovery(AvailabilityRecoverySubsystem::with_chunks_only(
|
.availability_recovery(AvailabilityRecoverySubsystem::with_chunks_if_pov_large(
|
||||||
available_data_req_receiver,
|
available_data_req_receiver,
|
||||||
Metrics::register(registry)?,
|
Metrics::register(registry)?,
|
||||||
))
|
))
|
||||||
|
|||||||
@@ -429,6 +429,9 @@ pub enum AvailabilityStoreMessage {
|
|||||||
/// Query an `ErasureChunk` from the AV store by the candidate hash and validator index.
|
/// Query an `ErasureChunk` from the AV store by the candidate hash and validator index.
|
||||||
QueryChunk(CandidateHash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
|
QueryChunk(CandidateHash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
|
||||||
|
|
||||||
|
/// Get the size of an `ErasureChunk` from the AV store by the candidate hash.
|
||||||
|
QueryChunkSize(CandidateHash, oneshot::Sender<Option<usize>>),
|
||||||
|
|
||||||
/// Query all chunks that we have for the given candidate hash.
|
/// Query all chunks that we have for the given candidate hash.
|
||||||
QueryAllChunks(CandidateHash, oneshot::Sender<Vec<ErasureChunk>>),
|
QueryAllChunks(CandidateHash, oneshot::Sender<Vec<ErasureChunk>>),
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user