mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 13:31:10 +00:00
Add option to skip av-store requests in availability-recovery-subsystem (#7131)
* Allow to skip availability-store * Update node/network/availability-recovery/src/lib.rs Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> --------- Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
This commit is contained in:
@@ -101,6 +101,12 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
|
||||
|
||||
/// The Availability Recovery Subsystem.
|
||||
pub struct AvailabilityRecoverySubsystem {
|
||||
/// Do not request data from the availability store.
|
||||
/// This is the useful for nodes where the
|
||||
/// availability-store subsystem is not expected to run,
|
||||
/// such as collators.
|
||||
bypass_availability_store: bool,
|
||||
|
||||
fast_path: bool,
|
||||
/// Receiver for available data requests.
|
||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||
@@ -147,6 +153,9 @@ struct RecoveryParams {
|
||||
|
||||
/// Metrics to report
|
||||
metrics: Metrics,
|
||||
|
||||
/// Do not request data from availability-store
|
||||
bypass_availability_store: bool,
|
||||
}
|
||||
|
||||
/// Source the availability data either by means
|
||||
@@ -467,7 +476,7 @@ impl RequestChunksFromValidators {
|
||||
let metrics = ¶ms.metrics;
|
||||
|
||||
// First query the store for any chunks we've got.
|
||||
{
|
||||
if !params.bypass_availability_store {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
sender
|
||||
.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
|
||||
@@ -668,7 +677,7 @@ where
|
||||
{
|
||||
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
|
||||
// First just see if we have the data available locally.
|
||||
{
|
||||
if !self.params.bypass_availability_store {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.sender
|
||||
.send_message(AvailabilityStoreMessage::QueryAvailableData(
|
||||
@@ -856,6 +865,7 @@ async fn launch_recovery_task<Context>(
|
||||
receipt: CandidateReceipt,
|
||||
backing_group: Option<GroupIndex>,
|
||||
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
|
||||
bypass_availability_store: bool,
|
||||
metrics: &Metrics,
|
||||
) -> error::Result<()> {
|
||||
let candidate_hash = receipt.hash();
|
||||
@@ -867,6 +877,7 @@ async fn launch_recovery_task<Context>(
|
||||
candidate_hash,
|
||||
erasure_root: receipt.descriptor.erasure_root,
|
||||
metrics: metrics.clone(),
|
||||
bypass_availability_store,
|
||||
};
|
||||
|
||||
let phase = backing_group
|
||||
@@ -906,6 +917,7 @@ async fn handle_recover<Context>(
|
||||
session_index: SessionIndex,
|
||||
backing_group: Option<GroupIndex>,
|
||||
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
|
||||
bypass_availability_store: bool,
|
||||
metrics: &Metrics,
|
||||
) -> error::Result<()> {
|
||||
let candidate_hash = receipt.hash();
|
||||
@@ -949,6 +961,7 @@ async fn handle_recover<Context>(
|
||||
receipt,
|
||||
backing_group,
|
||||
response_sender,
|
||||
bypass_availability_store,
|
||||
metrics,
|
||||
)
|
||||
.await,
|
||||
@@ -977,13 +990,22 @@ async fn query_full_data<Context>(
|
||||
|
||||
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
|
||||
impl AvailabilityRecoverySubsystem {
|
||||
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
|
||||
/// `AvailabilityStoreSubsystem` subsystem.
|
||||
pub fn with_availability_store_skip(
|
||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||
metrics: Metrics,
|
||||
) -> Self {
|
||||
Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics }
|
||||
}
|
||||
|
||||
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
|
||||
/// request data from backers.
|
||||
pub fn with_fast_path(
|
||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||
metrics: Metrics,
|
||||
) -> Self {
|
||||
Self { fast_path: true, req_receiver, metrics }
|
||||
Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics }
|
||||
}
|
||||
|
||||
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
|
||||
@@ -991,12 +1013,12 @@ impl AvailabilityRecoverySubsystem {
|
||||
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
|
||||
metrics: Metrics,
|
||||
) -> Self {
|
||||
Self { fast_path: false, req_receiver, metrics }
|
||||
Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics }
|
||||
}
|
||||
|
||||
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
|
||||
let mut state = State::default();
|
||||
let Self { fast_path, mut req_receiver, metrics } = self;
|
||||
let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self;
|
||||
|
||||
loop {
|
||||
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
|
||||
@@ -1025,6 +1047,7 @@ impl AvailabilityRecoverySubsystem {
|
||||
session_index,
|
||||
maybe_backing_group.filter(|_| fast_path),
|
||||
response_sender,
|
||||
bypass_availability_store,
|
||||
&metrics,
|
||||
).await {
|
||||
gum::warn!(
|
||||
@@ -1041,6 +1064,14 @@ impl AvailabilityRecoverySubsystem {
|
||||
in_req = recv_req => {
|
||||
match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
|
||||
Ok(req) => {
|
||||
if bypass_availability_store {
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
"Skipping request to availability-store.",
|
||||
);
|
||||
let _ = req.send_response(None.into());
|
||||
continue
|
||||
}
|
||||
match query_full_data(&mut ctx, req.payload.candidate_hash).await {
|
||||
Ok(res) => {
|
||||
let _ = req.send_response(res.into());
|
||||
|
||||
Reference in New Issue
Block a user