// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! The validation host is the primary interface for this crate. It allows clients to enqueue
//! jobs for PVF execution or preparation.
//!
//! The validation host is represented by a future/task that runs an event-loop and by a handle,
//! [`ValidationHost`], that allows communication with that event-loop.
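//!
//! A minimal usage sketch (illustrative only, not part of the API surface): `cache_path`,
//! `program_path`, `metrics`, `pvf`, `execution_timeout`, `params` and the executor used to
//! drive the task are all placeholders the embedder must supply.
//!
//! ```ignore
//! let config = Config::new(cache_path, program_path);
//! let (mut host, task) = start(config, metrics);
//!
//! // The returned future runs the event-loop and must be polled for the host to make
//! // progress, e.g. by spawning it onto an executor.
//! tokio::spawn(task);
//!
//! // The handle can then be used to submit requests.
//! let (result_tx, result_rx) = futures::channel::oneshot::channel();
//! host.execute_pvf(pvf, execution_timeout, params, Priority::Normal, result_tx).await?;
//! let validation_result = result_rx.await?;
//! ```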

use crate::{
    artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
    error::PrepareError,
    execute,
    metrics::Metrics,
    prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
};
use always_assert::never;
use futures::{
    channel::{mpsc, oneshot},
    Future, FutureExt, SinkExt, StreamExt,
};
use polkadot_parachain::primitives::ValidationResult;
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
    time::{Duration, SystemTime},
};

/// For prechecking requests, the time period after which the preparation worker is considered
/// unresponsive and will be killed.
// NOTE: If you change this, make sure to fix the buckets of the `pvf_preparation_time` metric.
pub const PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60);

/// For execution and heads-up requests, the time period after which the preparation worker is
/// considered unresponsive and will be killed. More lenient than the timeout for prechecking to
/// prevent honest validators from timing out on valid PVFs.
// NOTE: If you change this, make sure to fix the buckets of the `pvf_preparation_time` metric.
pub const LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360);

/// The time period after which a failed preparation artifact is considered ready to be retried.
/// Note that we will only retry if another request comes in after this cooldown has passed.
#[cfg(not(test))]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60);
#[cfg(test)]
pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);

/// The number of times we will retry failed prepare jobs.
pub const NUM_PREPARE_RETRIES: u32 = 5;

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

/// Transmission end used for sending the PVF preparation result.
pub(crate) type PrepareResultSender = oneshot::Sender<PrepareResult>;

/// A handle to the async process serving the validation host requests.
#[derive(Clone)]
pub struct ValidationHost {
    to_host_tx: mpsc::Sender<ToHost>,
}

impl ValidationHost {
    /// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time
    /// limit. This will prepare the PVF. The result of preparation will be sent to the provided
    /// result sender.
    ///
    /// This is async to accommodate the possibility of back-pressure. In the vast majority of
    /// situations this function should return immediately.
    ///
    /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
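    ///
    /// # Example
    ///
    /// An illustrative sketch; `host` and `pvf` are placeholders supplied by the caller:
    ///
    /// ```ignore
    /// let (result_tx, result_rx) = futures::channel::oneshot::channel();
    /// host.precheck_pvf(pvf, result_tx).await?;
    /// // On success the result carries the CPU time the preparation took.
    /// let prepare_result: PrepareResult = result_rx.await?;
    /// ```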
    pub async fn precheck_pvf(
        &mut self,
        pvf: Pvf,
        result_tx: PrepareResultSender,
    ) -> Result<(), String> {
        self.to_host_tx
            .send(ToHost::PrecheckPvf { pvf, result_tx })
            .await
            .map_err(|_| "the inner loop hung up".to_string())
    }

    /// Execute PVF with the given code, execution timeout, parameters and priority.
    /// The result of execution will be sent to the provided result sender.
    ///
    /// This is async to accommodate the possibility of back-pressure. In the vast majority of
    /// situations this function should return immediately.
    ///
    /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
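    ///
    /// # Example
    ///
    /// An illustrative sketch; `host`, `pvf` and `params` are placeholders supplied by the
    /// caller:
    ///
    /// ```ignore
    /// let (result_tx, result_rx) = futures::channel::oneshot::channel();
    /// host.execute_pvf(pvf, Duration::from_secs(2), params, Priority::Normal, result_tx)
    ///     .await?;
    /// let validation_result = result_rx.await?;
    /// ```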
    pub async fn execute_pvf(
        &mut self,
        pvf: Pvf,
        execution_timeout: Duration,
        params: Vec<u8>,
        priority: Priority,
        result_tx: ResultSender,
    ) -> Result<(), String> {
        self.to_host_tx
            .send(ToHost::ExecutePvf(ExecutePvfInputs {
                pvf,
                execution_timeout,
                params,
                priority,
                result_tx,
            }))
            .await
            .map_err(|_| "the inner loop hung up".to_string())
    }

    /// Sends a signal to the validation host requesting to prepare a list of the given PVFs.
    ///
    /// This is async to accommodate the possibility of back-pressure. In the vast majority of
    /// situations this function should return immediately.
    ///
    /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
    pub async fn heads_up(&mut self, active_pvfs: Vec<Pvf>) -> Result<(), String> {
        self.to_host_tx
            .send(ToHost::HeadsUp { active_pvfs })
            .await
            .map_err(|_| "the inner loop hung up".to_string())
    }
}

enum ToHost {
    PrecheckPvf { pvf: Pvf, result_tx: PrepareResultSender },
    ExecutePvf(ExecutePvfInputs),
    HeadsUp { active_pvfs: Vec<Pvf> },
}

struct ExecutePvfInputs {
    pvf: Pvf,
    execution_timeout: Duration,
    params: Vec<u8>,
    priority: Priority,
    result_tx: ResultSender,
}

/// Configuration for the validation host.
pub struct Config {
    /// The root directory where the prepared artifacts can be stored.
    pub cache_path: PathBuf,
    /// The path to the program that can be used to spawn the prepare workers.
    pub prepare_worker_program_path: PathBuf,
    /// The time allotted for a prepare worker to spawn and report to the host.
    pub prepare_worker_spawn_timeout: Duration,
    /// The maximum number of workers that can be spawned in the prepare pool for tasks with the
    /// priority below critical.
    pub prepare_workers_soft_max_num: usize,
    /// The absolute number of workers that can be spawned in the prepare pool.
    pub prepare_workers_hard_max_num: usize,
    /// The path to the program that can be used to spawn the execute workers.
    pub execute_worker_program_path: PathBuf,
    /// The time allotted for an execute worker to spawn and report to the host.
    pub execute_worker_spawn_timeout: Duration,
    /// The maximum number of execute workers that can run at the same time.
    pub execute_workers_max_num: usize,
}

impl Config {
    /// Create a new instance of the configuration.
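    ///
    /// # Example
    ///
    /// A hedged sketch of tuning the defaults; the paths are placeholders. Since the fields are
    /// public, the embedder may adjust them after construction:
    ///
    /// ```ignore
    /// let mut config = Config::new("/tmp/pvf-cache".into(), "/usr/bin/pvf-worker".into());
    /// config.execute_workers_max_num = 4;
    /// ```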
    pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self {
        // Do not contaminate the other parts of the codebase with the types from `tokio`.
        let cache_path = PathBuf::from(cache_path);
        let program_path = PathBuf::from(program_path);

        Self {
            cache_path,
            prepare_worker_program_path: program_path.clone(),
            prepare_worker_spawn_timeout: Duration::from_secs(3),
            prepare_workers_soft_max_num: 1,
            prepare_workers_hard_max_num: 1,
            execute_worker_program_path: program_path,
            execute_worker_spawn_timeout: Duration::from_secs(3),
            execute_workers_max_num: 2,
        }
    }
}

/// Start the validation host.
///
/// Returns a [handle][`ValidationHost`] to the started validation host and the future. The future
/// must be polled in order for the validation host to function.
///
/// The future should not return normally, but if it does, that indicates an unrecoverable error.
/// In that case all pending requests will be canceled, dropping the result senders, and new ones
/// will be rejected.
pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<Output = ()>) {
    let (to_host_tx, to_host_rx) = mpsc::channel(10);

    let validation_host = ValidationHost { to_host_tx };

    let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
        metrics.clone(),
        config.prepare_worker_program_path.clone(),
        config.cache_path.clone(),
        config.prepare_worker_spawn_timeout,
    );

    let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
        metrics.clone(),
        config.prepare_workers_soft_max_num,
        config.prepare_workers_hard_max_num,
        config.cache_path.clone(),
        to_prepare_pool,
        from_prepare_pool,
    );

    let (to_execute_queue_tx, run_execute_queue) = execute::start(
        metrics,
        config.execute_worker_program_path.to_owned(),
        config.execute_workers_max_num,
        config.execute_worker_spawn_timeout,
    );

    let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
    let run_sweeper = sweeper_task(to_sweeper_rx);

    let run_host = async move {
        let artifacts = Artifacts::new(&config.cache_path).await;

        run(Inner {
            cache_path: config.cache_path,
            cleanup_pulse_interval: Duration::from_secs(3600),
            artifact_ttl: Duration::from_secs(3600 * 24),
            artifacts,
            to_host_rx,
            to_prepare_queue_tx,
            from_prepare_queue_rx,
            to_execute_queue_tx,
            to_sweeper_tx,
            awaiting_prepare: AwaitingPrepare::default(),
        })
        .await
    };

    let task = async move {
        // Bundle the sub-components' tasks together into a single future.
        futures::select! {
            _ = run_host.fuse() => {},
            _ = run_prepare_queue.fuse() => {},
            _ = run_prepare_pool.fuse() => {},
            _ = run_execute_queue.fuse() => {},
            _ = run_sweeper.fuse() => {},
        };
    };

    (validation_host, task)
}

/// An execution request that should execute the PVF (known in the context) and send the results
/// to the given result sender.
#[derive(Debug)]
struct PendingExecutionRequest {
    execution_timeout: Duration,
    params: Vec<u8>,
    result_tx: ResultSender,
}

/// A mapping from an artifact ID which is in preparation state to the list of pending execution
/// requests that should be executed once the artifact's preparation is finished.
#[derive(Default)]
struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);

impl AwaitingPrepare {
    fn add(
        &mut self,
        artifact_id: ArtifactId,
        execution_timeout: Duration,
        params: Vec<u8>,
        result_tx: ResultSender,
    ) {
        self.0.entry(artifact_id).or_default().push(PendingExecutionRequest {
            execution_timeout,
            params,
            result_tx,
        });
    }

    fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
        self.0.remove(artifact_id).unwrap_or_default()
    }
}

struct Inner {
    cache_path: PathBuf,
    cleanup_pulse_interval: Duration,
    artifact_ttl: Duration,
    artifacts: Artifacts,

    to_host_rx: mpsc::Receiver<ToHost>,

    to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>,
    from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>,

    to_execute_queue_tx: mpsc::Sender<execute::ToQueue>,
    to_sweeper_tx: mpsc::Sender<PathBuf>,

    awaiting_prepare: AwaitingPrepare,
}

#[derive(Debug)]
struct Fatal;

async fn run(
    Inner {
        cache_path,
        cleanup_pulse_interval,
        artifact_ttl,
        mut artifacts,
        to_host_rx,
        from_prepare_queue_rx,
        mut to_prepare_queue_tx,
        mut to_execute_queue_tx,
        mut to_sweeper_tx,
        mut awaiting_prepare,
    }: Inner,
) {
    macro_rules! break_if_fatal {
        ($expr:expr) => {
            match $expr {
                Err(Fatal) => {
                    gum::error!(
                        target: LOG_TARGET,
                        "Fatal error occurred, terminating the host. Line: {}",
                        line!(),
                    );
                    break
                },
                Ok(v) => v,
            }
        };
    }

    let cleanup_pulse = pulse_every(cleanup_pulse_interval).fuse();
    futures::pin_mut!(cleanup_pulse);

    let mut to_host_rx = to_host_rx.fuse();
    let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();

    loop {
        // biased to make it behave deterministically for tests.
        futures::select_biased! {
            () = cleanup_pulse.select_next_some() => {
                // `select_next_some` because we don't expect this to fail, but if it does, we
                // still don't fail. The trade-off is that the compiled cache will start growing
                // in size. That is, however, rather a slow process and hopefully the operator
                // will notice it.

                break_if_fatal!(handle_cleanup_pulse(
                    &cache_path,
                    &mut to_sweeper_tx,
                    &mut artifacts,
                    artifact_ttl,
                ).await);
            },
            to_host = to_host_rx.next() => {
                let to_host = match to_host {
                    None => {
                        // The sending half of the channel has been closed, meaning the
                        // `ValidationHost` struct was dropped. Shutting down gracefully.
                        break;
                    },
                    Some(to_host) => to_host,
                };

                // If the artifact failed before, it could be re-scheduled for preparation here if
                // the preparation failure cooldown has elapsed.
                break_if_fatal!(handle_to_host(
                    &cache_path,
                    &mut artifacts,
                    &mut to_prepare_queue_tx,
                    &mut to_execute_queue_tx,
                    &mut awaiting_prepare,
                    to_host,
                )
                .await);
            },
            from_prepare_queue = from_prepare_queue_rx.next() => {
                let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));

                // Note that the preparation outcome is always reported as concluded.
                //
                // That's because the error conditions are written into the artifact and will be
                // reported at the time of the execution. It potentially, but not necessarily, can
                // be scheduled for execution as a result of this function call, in case there are
                // pending executions.
                //
                // We could be eager in terms of reporting and plumb the result from the preparation
                // worker but we don't for the sake of simplicity.
                break_if_fatal!(handle_prepare_done(
                    &cache_path,
                    &mut artifacts,
                    &mut to_execute_queue_tx,
                    &mut awaiting_prepare,
                    from_queue,
                ).await);
            },
        }
    }
}

async fn handle_to_host(
    cache_path: &Path,
    artifacts: &mut Artifacts,
    prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
    execute_queue: &mut mpsc::Sender<execute::ToQueue>,
    awaiting_prepare: &mut AwaitingPrepare,
    to_host: ToHost,
) -> Result<(), Fatal> {
    match to_host {
        ToHost::PrecheckPvf { pvf, result_tx } => {
            handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
        },
        ToHost::ExecutePvf(inputs) => {
            handle_execute_pvf(
                cache_path,
                artifacts,
                prepare_queue,
                execute_queue,
                awaiting_prepare,
                inputs,
            )
            .await?;
        },
        ToHost::HeadsUp { active_pvfs } =>
            handle_heads_up(artifacts, prepare_queue, active_pvfs).await?,
    }

    Ok(())
}

/// Handles PVF prechecking requests.
///
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_PREPARATION_TIMEOUT`]).
///
/// If the prepare job failed previously, we may retry it under certain conditions.
async fn handle_precheck_pvf(
    artifacts: &mut Artifacts,
    prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
    pvf: Pvf,
    result_sender: PrepareResultSender,
) -> Result<(), Fatal> {
    let artifact_id = pvf.as_artifact_id();

    if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
        match state {
            ArtifactState::Prepared { last_time_needed, cpu_time_elapsed } => {
                *last_time_needed = SystemTime::now();
                let _ = result_sender.send(Ok(*cpu_time_elapsed));
            },
            ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
                waiting_for_response.push(result_sender),
            ArtifactState::FailedToProcess { error, .. } => {
                // Do not retry a failed preparation if another pre-check request comes in. We do
                // not retry pre-checking anyway.
                let _ = result_sender.send(PrepareResult::Err(error.clone()));
            },
        }
    } else {
        artifacts.insert_preparing(artifact_id, vec![result_sender]);
        send_prepare(
            prepare_queue,
            prepare::ToQueue::Enqueue {
                priority: Priority::Normal,
                pvf,
                preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
            },
        )
        .await?;
    }
    Ok(())
}

/// Handles PVF execution.
///
/// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is already a
/// preparation job, we coalesce the two preparation jobs.
///
/// If the prepare job failed previously, we may retry it under certain conditions.
///
/// When preparing for execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`])
/// than when prechecking.
async fn handle_execute_pvf(
    cache_path: &Path,
    artifacts: &mut Artifacts,
    prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
    execute_queue: &mut mpsc::Sender<execute::ToQueue>,
    awaiting_prepare: &mut AwaitingPrepare,
    inputs: ExecutePvfInputs,
) -> Result<(), Fatal> {
    let ExecutePvfInputs { pvf, execution_timeout, params, priority, result_tx } = inputs;
    let artifact_id = pvf.as_artifact_id();

    if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
        match state {
            ArtifactState::Prepared { last_time_needed, .. } => {
                *last_time_needed = SystemTime::now();

                // This artifact has already been prepared, send it to the execute queue.
                send_execute(
                    execute_queue,
                    execute::ToQueue::Enqueue {
                        artifact: ArtifactPathId::new(artifact_id, cache_path),
                        execution_timeout,
                        params,
                        result_tx,
                    },
                )
                .await?;
            },
            ArtifactState::Preparing { .. } => {
                awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
            },
            ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
                if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
                    gum::warn!(
                        target: LOG_TARGET,
                        ?pvf,
                        ?artifact_id,
                        ?last_time_failed,
                        %num_failures,
                        %error,
                        "handle_execute_pvf: Re-trying failed PVF preparation."
                    );

                    // If we are allowed to retry the failed prepare job, change the state to
                    // Preparing and re-queue this job.
                    *state = ArtifactState::Preparing {
                        waiting_for_response: Vec::new(),
                        num_failures: *num_failures,
                    };
                    send_prepare(
                        prepare_queue,
                        prepare::ToQueue::Enqueue {
                            priority,
                            pvf,
                            preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
                        },
                    )
                    .await?;

                    // Add an execution request that will wait to run after this prepare job has
                    // finished.
                    awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
                } else {
                    let _ = result_tx.send(Err(ValidationError::from(error.clone())));
                }
            },
        }
    } else {
        // Artifact is unknown: register it and enqueue a job with the corresponding priority and
        // PVF.
        artifacts.insert_preparing(artifact_id.clone(), Vec::new());
        send_prepare(
            prepare_queue,
            prepare::ToQueue::Enqueue {
                priority,
                pvf,
                preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
            },
        )
        .await?;

        // Add an execution request that will wait to run after this prepare job has finished.
        awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
    }

    Ok(())
}

async fn handle_heads_up(
    artifacts: &mut Artifacts,
    prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
    active_pvfs: Vec<Pvf>,
) -> Result<(), Fatal> {
    let now = SystemTime::now();

    for active_pvf in active_pvfs {
        let artifact_id = active_pvf.as_artifact_id();
        if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
            match state {
                ArtifactState::Prepared { last_time_needed, .. } => {
                    *last_time_needed = now;
                },
                ArtifactState::Preparing { .. } => {
                    // The artifact is already being prepared, so we don't need to do anything.
                },
                ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
                    if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
                        gum::warn!(
                            target: LOG_TARGET,
                            ?active_pvf,
                            ?artifact_id,
                            ?last_time_failed,
                            %num_failures,
                            %error,
                            "handle_heads_up: Re-trying failed PVF preparation."
                        );

                        // If we are allowed to retry the failed prepare job, change the state to
                        // Preparing and re-queue this job.
                        *state = ArtifactState::Preparing {
                            waiting_for_response: vec![],
                            num_failures: *num_failures,
                        };
                        send_prepare(
                            prepare_queue,
                            prepare::ToQueue::Enqueue {
                                priority: Priority::Normal,
                                pvf: active_pvf,
                                preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
                            },
                        )
                        .await?;
                    }
                },
            }
        } else {
            // It's not in the artifacts, so we need to enqueue a job to prepare it.
            artifacts.insert_preparing(artifact_id.clone(), Vec::new());

            send_prepare(
                prepare_queue,
                prepare::ToQueue::Enqueue {
                    priority: Priority::Normal,
                    pvf: active_pvf,
                    preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
                },
            )
            .await?;
        }
    }

    Ok(())
}

async fn handle_prepare_done(
    cache_path: &Path,
    artifacts: &mut Artifacts,
    execute_queue: &mut mpsc::Sender<execute::ToQueue>,
    awaiting_prepare: &mut AwaitingPrepare,
    from_queue: prepare::FromQueue,
) -> Result<(), Fatal> {
    let prepare::FromQueue { artifact_id, result } = from_queue;

    // Make some sanity checks and extract the current state.
    let state = match artifacts.artifact_state_mut(&artifact_id) {
        None => {
            // before sending request to prepare, the artifact is inserted with `preparing` state;
            // the requests are deduplicated for the same artifact id;
            // there is only one possible state change: prepare is done;
            // thus the artifact cannot be unknown, only preparing;
            // qed.
            never!("an unknown artifact was prepared: {:?}", artifact_id);
            return Ok(())
        },
        Some(ArtifactState::Prepared { .. }) => {
            // before sending request to prepare, the artifact is inserted with `preparing` state;
            // the requests are deduplicated for the same artifact id;
            // there is only one possible state change: prepare is done;
            // thus the artifact cannot be prepared, only preparing;
            // qed.
            never!("the artifact is already prepared: {:?}", artifact_id);
            return Ok(())
        },
        Some(ArtifactState::FailedToProcess { .. }) => {
            // The reasoning is similar to the above, the artifact cannot be
            // processed at this point.
            never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
            return Ok(())
        },
        Some(state @ ArtifactState::Preparing { .. }) => state,
    };

    let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } =
        state
    {
        for result_sender in waiting_for_response.drain(..) {
            let _ = result_sender.send(result.clone());
        }
        num_failures
    } else {
        never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed");
        return Ok(())
    };

    // It's finally time to dispatch all the execution requests that were waiting for this artifact
    // to be prepared.
    let pending_requests = awaiting_prepare.take(&artifact_id);
    for PendingExecutionRequest { execution_timeout, params, result_tx } in pending_requests {
        if result_tx.is_canceled() {
            // Preparation could've taken quite a bit of time and the requester may no longer be
            // interested in the execution, in which case we just skip the request.
            continue
        }

        // Don't send failed artifacts to the execution's queue.
        if let Err(ref error) = result {
            let _ = result_tx.send(Err(ValidationError::from(error.clone())));
            continue
        }

        send_execute(
            execute_queue,
            execute::ToQueue::Enqueue {
                artifact: ArtifactPathId::new(artifact_id.clone(), cache_path),
                execution_timeout,
                params,
                result_tx,
            },
        )
        .await?;
    }

    *state = match result {
        Ok(cpu_time_elapsed) =>
            ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed },
        Err(error) => {
            let last_time_failed = SystemTime::now();
            let num_failures = *num_failures + 1;

            gum::warn!(
                target: LOG_TARGET,
                ?artifact_id,
                time_failed = ?last_time_failed,
                %num_failures,
                "artifact preparation failed: {}",
                error
            );
            ArtifactState::FailedToProcess { last_time_failed, num_failures, error }
        },
    };

    Ok(())
}

async fn send_prepare(
    prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
    to_queue: prepare::ToQueue,
) -> Result<(), Fatal> {
    prepare_queue.send(to_queue).await.map_err(|_| Fatal)
}

async fn send_execute(
    execute_queue: &mut mpsc::Sender<execute::ToQueue>,
    to_queue: execute::ToQueue,
) -> Result<(), Fatal> {
    execute_queue.send(to_queue).await.map_err(|_| Fatal)
}

async fn handle_cleanup_pulse(
    cache_path: &Path,
    sweeper_tx: &mut mpsc::Sender<PathBuf>,
    artifacts: &mut Artifacts,
    artifact_ttl: Duration,
) -> Result<(), Fatal> {
    let to_remove = artifacts.prune(artifact_ttl);
    gum::debug!(
        target: LOG_TARGET,
        "PVF pruning: {} artifacts reached their end of life",
        to_remove.len(),
    );
    for artifact_id in to_remove {
        gum::debug!(
            target: LOG_TARGET,
            validation_code_hash = ?artifact_id.code_hash,
            "pruning artifact",
        );
        let artifact_path = artifact_id.path(cache_path);
        sweeper_tx.send(artifact_path).await.map_err(|_| Fatal)?;
    }

    Ok(())
}

/// A simple task whose sole purpose is to delete files thrown at it.
async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
    loop {
        match sweeper_rx.next().await {
            None => break,
            Some(condemned) => {
                let result = tokio::fs::remove_file(&condemned).await;
                gum::trace!(
                    target: LOG_TARGET,
                    ?result,
                    "Sweeping the artifact file {}",
                    condemned.display(),
                );
            },
        }
    }
}

/// Check if the conditions to retry a prepare job have been met.
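///
/// Concretely, a summary of the checks performed below: a deterministic error is never retried;
/// otherwise a retry is allowed once `PREPARE_FAILURE_COOLDOWN` has elapsed since the last
/// failure and the job has failed at most `NUM_PREPARE_RETRIES` times.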
fn can_retry_prepare_after_failure(
    last_time_failed: SystemTime,
    num_failures: u32,
    error: &PrepareError,
) -> bool {
    if error.is_deterministic() {
        // This error is considered deterministic, so it will probably be reproducible. Don't retry.
        return false
    }

    // Retry if the retry cooldown has elapsed and if we have already retried less than
    // `NUM_PREPARE_RETRIES` times. IO errors may resolve themselves.
    SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
        num_failures <= NUM_PREPARE_RETRIES
}

/// A stream that yields a pulse continuously at a given interval.
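///
/// An illustrative sketch of consuming the stream (assumes `futures::StreamExt` is in scope):
///
/// ```ignore
/// let pulse = pulse_every(Duration::from_secs(1));
/// futures::pin_mut!(pulse);
/// pulse.next().await; // resolves roughly one second later, then again on each call.
/// ```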
fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> {
    futures::stream::unfold(interval, {
        |interval| async move {
            futures_timer::Delay::new(interval).await;
            Some(((), interval))
        }
    })
    .map(|_| ())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{InvalidCandidate, PrepareError};
    use assert_matches::assert_matches;
    use futures::future::BoxFuture;

    const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);

    #[tokio::test]
    async fn pulse_test() {
        let pulse = pulse_every(Duration::from_millis(100));
        futures::pin_mut!(pulse);

        for _ in 0usize..5usize {
            let start = std::time::Instant::now();
            let _ = pulse.next().await.unwrap();

            let el = start.elapsed().as_millis();
            assert!(el > 50 && el < 150, "{}", el);
        }
    }

    /// Creates a new PVF whose artifact ID can be uniquely identified by the given number.
    fn artifact_id(discriminator: u32) -> ArtifactId {
        Pvf::from_discriminator(discriminator).as_artifact_id()
    }

    fn artifact_path(discriminator: u32) -> PathBuf {
        artifact_id(discriminator).path(&PathBuf::from(std::env::temp_dir())).to_owned()
    }

    struct Builder {
        cleanup_pulse_interval: Duration,
        artifact_ttl: Duration,
        artifacts: Artifacts,
    }

    impl Builder {
        fn default() -> Self {
            Self {
                // these are selected high to not interfere in tests in which pruning is irrelevant.
                cleanup_pulse_interval: Duration::from_secs(3600),
                artifact_ttl: Duration::from_secs(3600),

                artifacts: Artifacts::empty(),
            }
        }

        fn build(self) -> Test {
            Test::new(self)
        }
    }

    struct Test {
        to_host_tx: Option<mpsc::Sender<ToHost>>,

        to_prepare_queue_rx: mpsc::Receiver<prepare::ToQueue>,
        from_prepare_queue_tx: mpsc::UnboundedSender<prepare::FromQueue>,
        to_execute_queue_rx: mpsc::Receiver<execute::ToQueue>,
        to_sweeper_rx: mpsc::Receiver<PathBuf>,

        run: BoxFuture<'static, ()>,
    }

    impl Test {
        fn new(Builder { cleanup_pulse_interval, artifact_ttl, artifacts }: Builder) -> Self {
            let cache_path = PathBuf::from(std::env::temp_dir());

            let (to_host_tx, to_host_rx) = mpsc::channel(10);
            let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
            let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
            let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
            let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);

            let run = run(Inner {
                cache_path,
                cleanup_pulse_interval,
                artifact_ttl,
                artifacts,
                to_host_rx,
                to_prepare_queue_tx,
                from_prepare_queue_rx,
                to_execute_queue_tx,
                to_sweeper_tx,
                awaiting_prepare: AwaitingPrepare::default(),
            })
            .boxed();

            Self {
                to_host_tx: Some(to_host_tx),
                to_prepare_queue_rx,
                from_prepare_queue_tx,
                to_execute_queue_rx,
                to_sweeper_rx,
                run,
            }
        }

        fn host_handle(&mut self) -> ValidationHost {
            let to_host_tx = self.to_host_tx.take().unwrap();
            ValidationHost { to_host_tx }
        }

        async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
        where
            T: Send,
        {
            run_until(&mut self.run, async { result_rx.await.unwrap() }.boxed()).await
        }

        async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
            let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
            run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
                .await
        }

        async fn poll_and_recv_to_execute_queue(&mut self) -> execute::ToQueue {
            let to_execute_queue_rx = &mut self.to_execute_queue_rx;
            run_until(&mut self.run, async { to_execute_queue_rx.next().await.unwrap() }.boxed())
                .await
        }

        async fn poll_ensure_to_prepare_queue_is_empty(&mut self) {
            use futures_timer::Delay;

            let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
            run_until(
                &mut self.run,
                async {
                    futures::select! {
                        _ = Delay::new(Duration::from_millis(500)).fuse() => (),
                        _ = to_prepare_queue_rx.next().fuse() => {
                            panic!("the prepare queue is supposed to be empty")
                        }
                    }
                }
                .boxed(),
            )
            .await
        }

        async fn poll_ensure_to_execute_queue_is_empty(&mut self) {
            use futures_timer::Delay;

            let to_execute_queue_rx = &mut self.to_execute_queue_rx;
            run_until(
                &mut self.run,
                async {
                    futures::select! {
                        _ = Delay::new(Duration::from_millis(500)).fuse() => (),
                        _ = to_execute_queue_rx.next().fuse() => {
                            panic!("the execute queue is supposed to be empty")
                        }
                    }
                }
                .boxed(),
            )
            .await
        }

        async fn poll_ensure_to_sweeper_is_empty(&mut self) {
            use futures_timer::Delay;

            let to_sweeper_rx = &mut self.to_sweeper_rx;
            run_until(
                &mut self.run,
                async {
                    futures::select! {
                        _ = Delay::new(Duration::from_millis(500)).fuse() => (),
                        msg = to_sweeper_rx.next().fuse() => {
                            panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
                        }
                    }
                }
                .boxed(),
            )
            .await
        }
    }

    async fn run_until<R>(
        task: &mut (impl Future<Output = ()> + Unpin),
        mut fut: (impl Future<Output = R> + Unpin),
    ) -> R {
        use std::task::Poll;

        let start = std::time::Instant::now();
        let fut = &mut fut;
        loop {
            if start.elapsed() > std::time::Duration::from_secs(2) {
                // We expect that this will take only a couple of iterations and thus take way
                // less than a second.
                panic!("timeout");
            }

            if let Poll::Ready(r) = futures::poll!(&mut *fut) {
                break r
            }

            if futures::poll!(&mut *task).is_ready() {
                panic!()
            }
        }
    }

    #[tokio::test]
    async fn shutdown_on_handle_drop() {
        let test = Builder::default().build();

        let join_handle = tokio::task::spawn(test.run);

        // Dropping the handle will lead to conclusion of the read part and thus will make the
        // event loop stop, which in turn will resolve the join handle.
        drop(test.to_host_tx);
        join_handle.await.unwrap();
    }

    #[tokio::test]
    async fn pruning() {
        let mock_now = SystemTime::now() - Duration::from_millis(1000);

        let mut builder = Builder::default();
        builder.cleanup_pulse_interval = Duration::from_millis(100);
        builder.artifact_ttl = Duration::from_millis(500);
        builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default());
        builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default());
        let mut test = builder.build();
        let mut host = test.host_handle();

        host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();

        let to_sweeper_rx = &mut test.to_sweeper_rx;
        run_until(
            &mut test.run,
            async {
                assert_eq!(to_sweeper_rx.next().await.unwrap(), artifact_path(2));
            }
            .boxed(),
        )
        .await;

        // Extend TTL for the first artifact and make sure we don't receive another file removal
        // request.
        host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
        test.poll_ensure_to_sweeper_is_empty().await;
    }

    #[tokio::test]
    async fn execute_pvf_requests() {
        let mut test = Builder::default().build();
        let mut host = test.host_handle();

        let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
        host.execute_pvf(
            Pvf::from_discriminator(1),
            TEST_EXECUTION_TIMEOUT,
            b"pvf1".to_vec(),
            Priority::Normal,
            result_tx,
        )
        .await
        .unwrap();

        let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
        host.execute_pvf(
            Pvf::from_discriminator(1),
            TEST_EXECUTION_TIMEOUT,
            b"pvf1".to_vec(),
            Priority::Critical,
            result_tx,
        )
        .await
        .unwrap();

        let (result_tx, result_rx_pvf_2) = oneshot::channel();
        host.execute_pvf(
            Pvf::from_discriminator(2),
            TEST_EXECUTION_TIMEOUT,
            b"pvf2".to_vec(),
            Priority::Normal,
            result_tx,
        )
        .await
        .unwrap();

        assert_matches!(
            test.poll_and_recv_to_prepare_queue().await,
            prepare::ToQueue::Enqueue { .. }
        );
        assert_matches!(
            test.poll_and_recv_to_prepare_queue().await,
            prepare::ToQueue::Enqueue { .. }
        );

        test.from_prepare_queue_tx
            .send(prepare::FromQueue {
                artifact_id: artifact_id(1),
                result: Ok(Duration::default()),
            })
            .await
            .unwrap();
        let result_tx_pvf_1_1 = assert_matches!(
            test.poll_and_recv_to_execute_queue().await,
            execute::ToQueue::Enqueue { result_tx, .. } => result_tx
        );
        let result_tx_pvf_1_2 = assert_matches!(
            test.poll_and_recv_to_execute_queue().await,
            execute::ToQueue::Enqueue { result_tx, .. } => result_tx
        );

        test.from_prepare_queue_tx
            .send(prepare::FromQueue {
                artifact_id: artifact_id(2),
                result: Ok(Duration::default()),
            })
            .await
            .unwrap();
        let result_tx_pvf_2 = assert_matches!(
            test.poll_and_recv_to_execute_queue().await,
            execute::ToQueue::Enqueue { result_tx, .. } => result_tx
        );

        result_tx_pvf_1_1
            .send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)))
            .unwrap();
        assert_matches!(
            result_rx_pvf_1_1.now_or_never().unwrap().unwrap(),
            Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
        );

        result_tx_pvf_1_2
            .send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)))
            .unwrap();
        assert_matches!(
            result_rx_pvf_1_2.now_or_never().unwrap().unwrap(),
            Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
        );

        result_tx_pvf_2
            .send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)))
            .unwrap();
        assert_matches!(
            result_rx_pvf_2.now_or_never().unwrap().unwrap(),
            Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
        );
    }

    #[tokio::test]
    async fn precheck_pvf() {
        let mut test = Builder::default().build();
        let mut host = test.host_handle();

        // First, test a simple precheck request.
        let (result_tx, result_rx) = oneshot::channel();
        host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();

        // The queue received the prepare request.
        assert_matches!(
            test.poll_and_recv_to_prepare_queue().await,
            prepare::ToQueue::Enqueue { .. }
        );
        // Send `Ok` right away and poll the host.
        test.from_prepare_queue_tx
            .send(prepare::FromQueue {
                artifact_id: artifact_id(1),
                result: Ok(Duration::default()),
            })
            .await
            .unwrap();
        // No pending execute requests.
        test.poll_ensure_to_execute_queue_is_empty().await;
        // Received the precheck result.
        assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));

        // Send multiple requests for the same PVF.
        let mut precheck_receivers = Vec::new();
        for _ in 0..3 {
            let (result_tx, result_rx) = oneshot::channel();
            host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
            precheck_receivers.push(result_rx);
        }
        // Received prepare request.
        assert_matches!(
            test.poll_and_recv_to_prepare_queue().await,
            prepare::ToQueue::Enqueue { .. }
        );
        test.from_prepare_queue_tx
            .send(prepare::FromQueue {
                artifact_id: artifact_id(2),
                result: Err(PrepareError::TimedOut),
            })
            .await
            .unwrap();
        test.poll_ensure_to_execute_queue_is_empty().await;
        for result_rx in precheck_receivers {
            assert_matches!(
                result_rx.now_or_never().unwrap().unwrap(),
                Err(PrepareError::TimedOut)
            );
        }
    }
|
|
|
|
#[tokio::test]
|
|
async fn test_prepare_done() {
|
|
let mut test = Builder::default().build();
|
|
let mut host = test.host_handle();
|
|
|
|
// Test mixed cases of receiving execute and precheck requests
|
|
// for the same PVF.
|
|
|
|
// Send PVF for the execution and request the prechecking for it.
|
|
let (result_tx, result_rx_execute) = oneshot::channel();
|
|
host.execute_pvf(
|
|
Pvf::from_discriminator(1),
|
|
TEST_EXECUTION_TIMEOUT,
|
|
b"pvf2".to_vec(),
|
|
Priority::Critical,
|
|
result_tx,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_matches!(
|
|
test.poll_and_recv_to_prepare_queue().await,
|
|
prepare::ToQueue::Enqueue { .. }
|
|
);
|
|
|
|
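		// While preparation is still pending, also request prechecking of the same PVF.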
		let (result_tx, result_rx) = oneshot::channel();
		host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();

		// Suppose the preparation failed; then the execution queue stays empty and both
		// "clients" receive their results.
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Err(PrepareError::TimedOut),
			})
			.await
			.unwrap();
		test.poll_ensure_to_execute_queue_is_empty().await;
		assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
		assert_matches!(
			result_rx_execute.now_or_never().unwrap().unwrap(),
			Err(ValidationError::InternalError(_))
		);

		// Reversed case: first send multiple precheck requests, then ask for an execution.
		let mut precheck_receivers = Vec::new();
		for _ in 0..3 {
			let (result_tx, result_rx) = oneshot::channel();
			host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
			precheck_receivers.push(result_rx);
		}

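		// Now ask for an execution of the same PVF; a single prepare request should cover all of
		// these clients.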
		let (result_tx, _result_rx_execute) = oneshot::channel();
		host.execute_pvf(
			Pvf::from_discriminator(2),
			TEST_EXECUTION_TIMEOUT,
			b"pvf2".to_vec(),
			Priority::Critical,
			result_tx,
		)
		.await
		.unwrap();
		// Received prepare request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(2),
				result: Ok(Duration::default()),
			})
			.await
			.unwrap();
		// The execute queue receives the new request, prechecking is finished and we can
		// fetch the results.
		assert_matches!(
			test.poll_and_recv_to_execute_queue().await,
			execute::ToQueue::Enqueue { .. }
		);
		for result_rx in precheck_receivers {
			assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
		}
	}

	// Test that multiple prechecking requests do not trigger preparation retries if the first one
	// failed.
	#[tokio::test]
	async fn test_precheck_prepare_no_retry() {
		let mut test = Builder::default().build();
		let mut host = test.host_handle();

		// Submit a precheck request that fails.
		let (result_tx, result_rx) = oneshot::channel();
		host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();

		// The queue received the prepare request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
		// Send a PrepareError.
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Err(PrepareError::TimedOut),
			})
			.await
			.unwrap();

		// The result should contain the error.
		let result = test.poll_and_recv_result(result_rx).await;
		assert_matches!(result, Err(PrepareError::TimedOut));

		// Submit another precheck request.
		let (result_tx_2, result_rx_2) = oneshot::channel();
		host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap();

		// Assert the prepare queue is empty.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// The result should contain the original error.
		let result = test.poll_and_recv_result(result_rx_2).await;
		assert_matches!(result, Err(PrepareError::TimedOut));

		// Pause for enough time to reset the cooldown for this failed prepare request.
		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

		// Submit another precheck request.
		let (result_tx_3, result_rx_3) = oneshot::channel();
		host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap();

		// Assert the prepare queue is empty - we do not retry for precheck requests.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// The result should still contain the original error.
		let result = test.poll_and_recv_result(result_rx_3).await;
		assert_matches!(result, Err(PrepareError::TimedOut));
	}

	// Test that multiple execution requests trigger preparation retries if the first one failed
	// due to a potentially non-reproducible error.
	#[tokio::test]
	async fn test_execute_prepare_retry() {
		let mut test = Builder::default().build();
		let mut host = test.host_handle();

		// Submit an execute request that fails.
		let (result_tx, result_rx) = oneshot::channel();
		host.execute_pvf(
			Pvf::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			b"pvf".to_vec(),
			Priority::Critical,
			result_tx,
		)
		.await
		.unwrap();

		// The queue received the prepare request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
		// Send a PrepareError.
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Err(PrepareError::TimedOut),
			})
			.await
			.unwrap();

		// The result should contain the error.
		let result = test.poll_and_recv_result(result_rx).await;
		assert_matches!(result, Err(ValidationError::InternalError(_)));

		// Submit another execute request. We shouldn't try to prepare again, yet.
		let (result_tx_2, result_rx_2) = oneshot::channel();
		host.execute_pvf(
			Pvf::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			b"pvf".to_vec(),
			Priority::Critical,
			result_tx_2,
		)
		.await
		.unwrap();

		// Assert the prepare queue is empty.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// The result should contain the original error.
		let result = test.poll_and_recv_result(result_rx_2).await;
		assert_matches!(result, Err(ValidationError::InternalError(_)));

		// Pause for enough time to reset the cooldown for this failed prepare request.
		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

		// Submit another execute request.
		let (result_tx_3, result_rx_3) = oneshot::channel();
		host.execute_pvf(
			Pvf::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			b"pvf".to_vec(),
			Priority::Critical,
			result_tx_3,
		)
		.await
		.unwrap();

		// Assert the prepare queue contains the request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);

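		// This time, let the preparation succeed.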
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Ok(Duration::default()),
			})
			.await
			.unwrap();

		// Preparation should have been retried and succeeded this time.
		let result_tx_3 = assert_matches!(
			test.poll_and_recv_to_execute_queue().await,
			execute::ToQueue::Enqueue { result_tx, .. } => result_tx
		);

		// Send an error for the execution here, just so we can check that the result receiver
		// is still alive.
		result_tx_3
			.send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)))
			.unwrap();
		assert_matches!(
			result_rx_3.now_or_never().unwrap().unwrap(),
			Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
		);
	}

	// Test that multiple execution requests don't trigger preparation retries if the first one
	// failed due to a reproducible error (e.g. Prevalidation).
	#[tokio::test]
	async fn test_execute_prepare_no_retry() {
		let mut test = Builder::default().build();
		let mut host = test.host_handle();

		// Submit an execute request that fails.
		let (result_tx, result_rx) = oneshot::channel();
		host.execute_pvf(
			Pvf::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			b"pvf".to_vec(),
			Priority::Critical,
			result_tx,
		)
		.await
		.unwrap();

		// The queue received the prepare request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
		// Send a PrepareError.
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Err(PrepareError::Prevalidation("reproducible error".into())),
			})
			.await
			.unwrap();

		// The result should contain the error.
		let result = test.poll_and_recv_result(result_rx).await;
		assert_matches!(
			result,
			Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
		);

		// Submit another execute request.
		let (result_tx_2, result_rx_2) = oneshot::channel();
		host.execute_pvf(
			Pvf::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			b"pvf".to_vec(),
			Priority::Critical,
			result_tx_2,
		)
		.await
		.unwrap();

		// Assert the prepare queue is empty.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// The result should contain the original error.
		let result = test.poll_and_recv_result(result_rx_2).await;
		assert_matches!(
			result,
			Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
		);

		// Pause for enough time to reset the cooldown for this failed prepare request.
		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

		// Submit another execute request.
		let (result_tx_3, result_rx_3) = oneshot::channel();
		host.execute_pvf(
			Pvf::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			b"pvf".to_vec(),
			Priority::Critical,
			result_tx_3,
		)
		.await
		.unwrap();

		// Assert the prepare queue is empty - we do not retry for prevalidation errors.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// The result should still contain the original error.
		let result = test.poll_and_recv_result(result_rx_3).await;
		assert_matches!(
			result,
			Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
		);
	}

	// Test that multiple heads-up requests trigger preparation retries if the first one failed.
	#[tokio::test]
	async fn test_heads_up_prepare_retry() {
		let mut test = Builder::default().build();
		let mut host = test.host_handle();

		// Submit a heads-up request that fails.
		host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();

		// The queue received the prepare request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
		// Send a PrepareError.
		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Err(PrepareError::TimedOut),
			})
			.await
			.unwrap();

		// Submit another heads-up request.
		host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();

		// Assert the prepare queue is empty.
		test.poll_ensure_to_prepare_queue_is_empty().await;

		// Pause for enough time to reset the cooldown for this failed prepare request.
		futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

		// Submit another heads-up request.
		host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();

		// Assert the prepare queue contains the request.
		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);
	}

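	// Test that dropping the result receiver cancels the pending job: even though preparation
	// succeeds, nothing must be submitted to the execute queue.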
	#[tokio::test]
	async fn cancellation() {
		let mut test = Builder::default().build();
		let mut host = test.host_handle();

		let (result_tx, result_rx) = oneshot::channel();
		host.execute_pvf(
			Pvf::from_discriminator(1),
			TEST_EXECUTION_TIMEOUT,
			b"pvf1".to_vec(),
			Priority::Normal,
			result_tx,
		)
		.await
		.unwrap();

		assert_matches!(
			test.poll_and_recv_to_prepare_queue().await,
			prepare::ToQueue::Enqueue { .. }
		);

		test.from_prepare_queue_tx
			.send(prepare::FromQueue {
				artifact_id: artifact_id(1),
				result: Ok(Duration::default()),
			})
			.await
			.unwrap();

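		// Drop the receiver to signal that the caller is no longer interested in the result.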
		drop(result_rx);

		test.poll_ensure_to_execute_queue_is_empty().await;
	}
}