mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 21:01:05 +00:00
Use PVF code paired with executor params wherever possible (#6742)
This commit is contained in:
@@ -104,14 +104,14 @@ pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error
|
||||
/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk.
|
||||
pub fn prepare(
|
||||
blob: RuntimeBlob,
|
||||
executor_params: ExecutorParams,
|
||||
executor_params: &ExecutorParams,
|
||||
) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
|
||||
let semantics = params_to_wasmtime_semantics(executor_params)
|
||||
.map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
|
||||
sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
|
||||
}
|
||||
|
||||
fn params_to_wasmtime_semantics(par: ExecutorParams) -> Result<Semantics, String> {
|
||||
fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, String> {
|
||||
let mut sem = DEFAULT_CONFIG.semantics.clone();
|
||||
let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() {
|
||||
stack_limit
|
||||
@@ -186,7 +186,7 @@ impl Executor {
|
||||
TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?;
|
||||
|
||||
let mut config = DEFAULT_CONFIG.clone();
|
||||
config.semantics = params_to_wasmtime_semantics(params)?;
|
||||
config.semantics = params_to_wasmtime_semantics(¶ms)?;
|
||||
|
||||
Ok(Self { thread_pool, spawner, config })
|
||||
}
|
||||
|
||||
@@ -523,7 +523,7 @@ async fn handle_execute_pvf(
|
||||
artifact: ArtifactPathId::new(artifact_id, cache_path),
|
||||
execution_timeout,
|
||||
params,
|
||||
executor_params: pvf_with_params.executor_params(),
|
||||
executor_params: (*pvf_with_params.executor_params()).clone(),
|
||||
result_tx,
|
||||
},
|
||||
)
|
||||
@@ -534,7 +534,7 @@ async fn handle_execute_pvf(
|
||||
artifact_id,
|
||||
execution_timeout,
|
||||
params,
|
||||
pvf_with_params.executor_params(),
|
||||
(*pvf_with_params.executor_params()).clone(),
|
||||
result_tx,
|
||||
);
|
||||
},
|
||||
@@ -556,7 +556,7 @@ async fn handle_execute_pvf(
|
||||
waiting_for_response: Vec::new(),
|
||||
num_failures: *num_failures,
|
||||
};
|
||||
let executor_params = pvf_with_params.executor_params().clone();
|
||||
let executor_params = (*pvf_with_params.executor_params()).clone();
|
||||
send_prepare(
|
||||
prepare_queue,
|
||||
prepare::ToQueue::Enqueue {
|
||||
@@ -584,7 +584,7 @@ async fn handle_execute_pvf(
|
||||
} else {
|
||||
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
|
||||
// PVF.
|
||||
let executor_params = pvf_with_params.executor_params();
|
||||
let executor_params = (*pvf_with_params.executor_params()).clone();
|
||||
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
|
||||
send_prepare(
|
||||
prepare_queue,
|
||||
|
||||
@@ -110,7 +110,7 @@ pub use sp_tracing;
|
||||
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
|
||||
pub use prepare::PrepareStats;
|
||||
pub use priority::Priority;
|
||||
pub use pvf::{Pvf, PvfWithExecutorParams};
|
||||
pub use pvf::PvfWithExecutorParams;
|
||||
|
||||
pub use host::{start, Config, ValidationHost};
|
||||
pub use metrics::Metrics;
|
||||
|
||||
@@ -18,6 +18,7 @@ use super::worker::{self, Outcome};
|
||||
use crate::{
|
||||
error::{PrepareError, PrepareResult},
|
||||
metrics::Metrics,
|
||||
pvf::PvfWithExecutorParams,
|
||||
worker_common::{IdleWorker, WorkerHandle},
|
||||
LOG_TARGET,
|
||||
};
|
||||
@@ -25,12 +26,10 @@ use always_assert::never;
|
||||
use futures::{
|
||||
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
|
||||
};
|
||||
use polkadot_primitives::vstaging::ExecutorParams;
|
||||
use slotmap::HopSlotMap;
|
||||
use std::{
|
||||
fmt,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
task::Poll,
|
||||
time::Duration,
|
||||
};
|
||||
@@ -68,9 +67,8 @@ pub enum ToPool {
|
||||
/// sent until either `Concluded` or `Rip` message is received.
|
||||
StartWork {
|
||||
worker: Worker,
|
||||
code: Arc<Vec<u8>>,
|
||||
pvf_with_params: PvfWithExecutorParams,
|
||||
artifact_path: PathBuf,
|
||||
executor_params: ExecutorParams,
|
||||
preparation_timeout: Duration,
|
||||
},
|
||||
}
|
||||
@@ -216,7 +214,7 @@ fn handle_to_pool(
|
||||
metrics.prepare_worker().on_begin_spawn();
|
||||
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
|
||||
},
|
||||
ToPool::StartWork { worker, code, artifact_path, executor_params, preparation_timeout } => {
|
||||
ToPool::StartWork { worker, pvf_with_params, artifact_path, preparation_timeout } => {
|
||||
if let Some(data) = spawned.get_mut(worker) {
|
||||
if let Some(idle) = data.idle.take() {
|
||||
let preparation_timer = metrics.time_preparation();
|
||||
@@ -225,10 +223,9 @@ fn handle_to_pool(
|
||||
metrics.clone(),
|
||||
worker,
|
||||
idle,
|
||||
code,
|
||||
pvf_with_params,
|
||||
cache_path.to_owned(),
|
||||
artifact_path,
|
||||
executor_params,
|
||||
preparation_timeout,
|
||||
preparation_timer,
|
||||
)
|
||||
@@ -275,20 +272,18 @@ async fn start_work_task<Timer>(
|
||||
metrics: Metrics,
|
||||
worker: Worker,
|
||||
idle: IdleWorker,
|
||||
code: Arc<Vec<u8>>,
|
||||
pvf_with_params: PvfWithExecutorParams,
|
||||
cache_path: PathBuf,
|
||||
artifact_path: PathBuf,
|
||||
executor_params: ExecutorParams,
|
||||
preparation_timeout: Duration,
|
||||
_preparation_timer: Option<Timer>,
|
||||
) -> PoolEvent {
|
||||
let outcome = worker::start_work(
|
||||
&metrics,
|
||||
idle,
|
||||
code,
|
||||
pvf_with_params,
|
||||
&cache_path,
|
||||
artifact_path,
|
||||
executor_params,
|
||||
preparation_timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -445,9 +445,8 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
|
||||
&mut queue.to_pool_tx,
|
||||
pool::ToPool::StartWork {
|
||||
worker,
|
||||
code: job_data.pvf_with_params.code(),
|
||||
pvf_with_params: job_data.pvf_with_params.clone(),
|
||||
artifact_path,
|
||||
executor_params: job_data.pvf_with_params.executor_params(),
|
||||
preparation_timeout: job_data.preparation_timeout,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -24,6 +24,7 @@ use crate::{
|
||||
error::{PrepareError, PrepareResult},
|
||||
metrics::Metrics,
|
||||
prepare::PrepareStats,
|
||||
pvf::PvfWithExecutorParams,
|
||||
worker_common::{
|
||||
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
|
||||
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
|
||||
@@ -34,12 +35,12 @@ use crate::{
|
||||
use cpu_time::ProcessTime;
|
||||
use futures::{pin_mut, select_biased, FutureExt};
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use polkadot_primitives::vstaging::ExecutorParams;
|
||||
|
||||
use sp_core::hexdisplay::HexDisplay;
|
||||
use std::{
|
||||
panic,
|
||||
path::{Path, PathBuf},
|
||||
sync::{mpsc::channel, Arc},
|
||||
sync::mpsc::channel,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{io, net::UnixStream};
|
||||
@@ -83,10 +84,9 @@ pub enum Outcome {
|
||||
pub async fn start_work(
|
||||
metrics: &Metrics,
|
||||
worker: IdleWorker,
|
||||
code: Arc<Vec<u8>>,
|
||||
pvf_with_params: PvfWithExecutorParams,
|
||||
cache_path: &Path,
|
||||
artifact_path: PathBuf,
|
||||
executor_params: ExecutorParams,
|
||||
preparation_timeout: Duration,
|
||||
) -> Outcome {
|
||||
let IdleWorker { stream, pid } = worker;
|
||||
@@ -100,7 +100,7 @@ pub async fn start_work(
|
||||
|
||||
with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
|
||||
if let Err(err) =
|
||||
send_request(&mut stream, code, &tmp_file, &executor_params, preparation_timeout).await
|
||||
send_request(&mut stream, pvf_with_params, &tmp_file, preparation_timeout).await
|
||||
{
|
||||
gum::warn!(
|
||||
target: LOG_TARGET,
|
||||
@@ -273,22 +273,27 @@ where
|
||||
|
||||
async fn send_request(
|
||||
stream: &mut UnixStream,
|
||||
code: Arc<Vec<u8>>,
|
||||
pvf_with_params: PvfWithExecutorParams,
|
||||
tmp_file: &Path,
|
||||
executor_params: &ExecutorParams,
|
||||
preparation_timeout: Duration,
|
||||
) -> io::Result<()> {
|
||||
framed_send(stream, &code).await?;
|
||||
framed_send(stream, &pvf_with_params.encode()).await?;
|
||||
framed_send(stream, path_to_bytes(tmp_file)).await?;
|
||||
framed_send(stream, &executor_params.encode()).await?;
|
||||
framed_send(stream, &preparation_timeout.encode()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_request(
|
||||
stream: &mut UnixStream,
|
||||
) -> io::Result<(Vec<u8>, PathBuf, ExecutorParams, Duration)> {
|
||||
let code = framed_recv(stream).await?;
|
||||
) -> io::Result<(PvfWithExecutorParams, PathBuf, Duration)> {
|
||||
let pvf_with_params = framed_recv(stream).await?;
|
||||
let pvf_with_params =
|
||||
PvfWithExecutorParams::decode(&mut &pvf_with_params[..]).map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("prepare pvf recv_request: failed to decode PvfWithExecutorParams: {}", e),
|
||||
)
|
||||
})?;
|
||||
let tmp_file = framed_recv(stream).await?;
|
||||
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
|
||||
io::Error::new(
|
||||
@@ -296,13 +301,6 @@ async fn recv_request(
|
||||
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
|
||||
)
|
||||
})?;
|
||||
let executor_params_enc = framed_recv(stream).await?;
|
||||
let executor_params = ExecutorParams::decode(&mut &executor_params_enc[..]).map_err(|_| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"prepare pvf recv_request: failed to decode ExecutorParams".to_string(),
|
||||
)
|
||||
})?;
|
||||
let preparation_timeout = framed_recv(stream).await?;
|
||||
let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| {
|
||||
io::Error::new(
|
||||
@@ -310,7 +308,7 @@ async fn recv_request(
|
||||
format!("prepare pvf recv_request: failed to decode duration: {:?}", e),
|
||||
)
|
||||
})?;
|
||||
Ok((code, tmp_file, executor_params, preparation_timeout))
|
||||
Ok((pvf_with_params, tmp_file, preparation_timeout))
|
||||
}
|
||||
|
||||
async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
|
||||
@@ -362,8 +360,7 @@ pub fn worker_entrypoint(socket_path: &str) {
|
||||
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
|
||||
loop {
|
||||
let worker_pid = std::process::id();
|
||||
let (code, dest, executor_params, preparation_timeout) =
|
||||
recv_request(&mut stream).await?;
|
||||
let (pvf_with_params, dest, preparation_timeout) = recv_request(&mut stream).await?;
|
||||
gum::debug!(
|
||||
target: LOG_TARGET,
|
||||
%worker_pid,
|
||||
@@ -388,7 +385,7 @@ pub fn worker_entrypoint(socket_path: &str) {
|
||||
// Spawn another thread for preparation.
|
||||
let prepare_fut = rt_handle
|
||||
.spawn_blocking(move || {
|
||||
let result = prepare_artifact(&code, executor_params);
|
||||
let result = prepare_artifact(pvf_with_params);
|
||||
|
||||
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -471,16 +468,15 @@ pub fn worker_entrypoint(socket_path: &str) {
|
||||
}
|
||||
|
||||
fn prepare_artifact(
|
||||
code: &[u8],
|
||||
executor_params: ExecutorParams,
|
||||
pvf_with_params: PvfWithExecutorParams,
|
||||
) -> Result<CompiledArtifact, PrepareError> {
|
||||
panic::catch_unwind(|| {
|
||||
let blob = match crate::executor_intf::prevalidate(code) {
|
||||
let blob = match crate::executor_intf::prevalidate(&pvf_with_params.code()) {
|
||||
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
|
||||
Ok(b) => b,
|
||||
};
|
||||
|
||||
match crate::executor_intf::prepare(blob, executor_params) {
|
||||
match crate::executor_intf::prepare(blob, &pvf_with_params.executor_params()) {
|
||||
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
|
||||
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
|
||||
}
|
||||
|
||||
@@ -15,81 +15,79 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::artifacts::ArtifactId;
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use polkadot_parachain::primitives::ValidationCodeHash;
|
||||
use polkadot_primitives::vstaging::ExecutorParams;
|
||||
use sp_core::blake2_256;
|
||||
use std::{fmt, sync::Arc};
|
||||
use std::{
|
||||
cmp::{Eq, PartialEq},
|
||||
fmt,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
/// A struct that carries code of a parachain validation function and its hash.
|
||||
/// A struct that carries code of a parachain validation function, its hash, and a corresponding
|
||||
/// set of executor parameters.
|
||||
///
|
||||
/// Should be cheap to clone.
|
||||
#[derive(Clone)]
|
||||
pub struct Pvf {
|
||||
#[derive(Clone, Encode, Decode)]
|
||||
pub struct PvfWithExecutorParams {
|
||||
pub(crate) code: Arc<Vec<u8>>,
|
||||
pub(crate) code_hash: ValidationCodeHash,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Pvf {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Pvf {{ code, code_hash: {:?} }}", self.code_hash)
|
||||
}
|
||||
}
|
||||
|
||||
impl Pvf {
|
||||
/// Returns an instance of the PVF out of the given PVF code.
|
||||
pub fn from_code(code: Vec<u8>) -> Self {
|
||||
let code = Arc::new(code);
|
||||
let code_hash = blake2_256(&code).into();
|
||||
Self { code, code_hash }
|
||||
}
|
||||
|
||||
/// Creates a new PVF which artifact id can be uniquely identified by the given number.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn from_discriminator(num: u32) -> Self {
|
||||
let descriminator_buf = num.to_le_bytes().to_vec();
|
||||
Pvf::from_code(descriminator_buf)
|
||||
}
|
||||
}
|
||||
|
||||
/// Coupling PVF code with executor params
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PvfWithExecutorParams {
|
||||
pvf: Pvf,
|
||||
executor_params: Arc<ExecutorParams>,
|
||||
pub(crate) executor_params: Arc<ExecutorParams>,
|
||||
}
|
||||
|
||||
impl PvfWithExecutorParams {
|
||||
/// Creates a new PVF-ExecutorParams pair structure
|
||||
pub fn new(pvf: Pvf, executor_params: ExecutorParams) -> Self {
|
||||
Self { pvf, executor_params: Arc::new(executor_params) }
|
||||
/// Returns an instance of the PVF out of the given PVF code and executor params.
|
||||
pub fn from_code(code: Vec<u8>, executor_params: ExecutorParams) -> Self {
|
||||
let code = Arc::new(code);
|
||||
let code_hash = blake2_256(&code).into();
|
||||
let executor_params = Arc::new(executor_params);
|
||||
Self { code, code_hash, executor_params }
|
||||
}
|
||||
|
||||
/// Returns artifact ID that corresponds to the PVF with given executor params
|
||||
pub(crate) fn as_artifact_id(&self) -> ArtifactId {
|
||||
ArtifactId::new(self.pvf.code_hash, self.executor_params.hash())
|
||||
ArtifactId::new(self.code_hash, self.executor_params.hash())
|
||||
}
|
||||
|
||||
/// Returns validation code hash for the PVF
|
||||
pub(crate) fn code_hash(&self) -> ValidationCodeHash {
|
||||
self.pvf.code_hash
|
||||
self.code_hash
|
||||
}
|
||||
|
||||
/// Returns PVF code
|
||||
pub(crate) fn code(&self) -> Arc<Vec<u8>> {
|
||||
self.pvf.code.clone()
|
||||
self.code.clone()
|
||||
}
|
||||
|
||||
/// Returns executor params
|
||||
pub(crate) fn executor_params(&self) -> ExecutorParams {
|
||||
(*self.executor_params).clone()
|
||||
pub(crate) fn executor_params(&self) -> Arc<ExecutorParams> {
|
||||
self.executor_params.clone()
|
||||
}
|
||||
|
||||
/// Creates a structure for tests
|
||||
#[cfg(test)]
|
||||
pub(crate) fn from_discriminator(num: u32) -> Self {
|
||||
Self {
|
||||
pvf: Pvf::from_discriminator(num),
|
||||
executor_params: Arc::new(ExecutorParams::default()),
|
||||
}
|
||||
let descriminator_buf = num.to_le_bytes().to_vec();
|
||||
Self::from_code(descriminator_buf, ExecutorParams::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for PvfWithExecutorParams {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Pvf {{ code, code_hash: {:?}, executor_params: {:?} }}",
|
||||
self.code_hash, self.executor_params
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for PvfWithExecutorParams {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.code_hash == other.code_hash &&
|
||||
self.executor_params.hash() == other.executor_params.hash()
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for PvfWithExecutorParams {}
|
||||
|
||||
@@ -37,7 +37,7 @@ pub fn validate_candidate(
|
||||
.expect("Decompressing code failed");
|
||||
|
||||
let blob = prevalidate(&code)?;
|
||||
let artifact = prepare(blob, ExecutorParams::default())?;
|
||||
let artifact = prepare(blob, &ExecutorParams::default())?;
|
||||
let tmpdir = tempfile::tempdir()?;
|
||||
let artifact_path = tmpdir.path().join("blob");
|
||||
std::fs::write(&artifact_path, &artifact)?;
|
||||
|
||||
Reference in New Issue
Block a user