mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 13:21:01 +00:00
PVF host prechecking support v2 (#4123)
* pvf host: store only compiled artifacts on disk * Correctly handle failed artifacts * Serialize result of PVF preparation uniquely * Set the artifact state depending on the result * Return the result of PVF preparation directly * Move PrepareError to the error module * Update doc comments * Update misleading comment * pvf host: turn off parallel compilation * pvf host: implement precheck requests * Fix warnings * Unnecessary clone * Add a note about timed out outcome * Revert the pool outcome handling behavior * Move the prepare result type into error mod * Test prepare done * fmt * Add an explanation to wasmtime config * Split pvf host test * Add precheck to dictionary Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
@@ -14,7 +14,7 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use crate::error::PrepareError;
|
use crate::{error::PrepareError, host::PrepareResultSender};
|
||||||
use always_assert::always;
|
use always_assert::always;
|
||||||
use async_std::path::{Path, PathBuf};
|
use async_std::path::{Path, PathBuf};
|
||||||
use parity_scale_codec::{Decode, Encode};
|
use parity_scale_codec::{Decode, Encode};
|
||||||
@@ -106,7 +106,7 @@ pub enum ArtifactState {
|
|||||||
last_time_needed: SystemTime,
|
last_time_needed: SystemTime,
|
||||||
},
|
},
|
||||||
/// A task to prepare this artifact is scheduled.
|
/// A task to prepare this artifact is scheduled.
|
||||||
Preparing,
|
Preparing { waiting_for_response: Vec<PrepareResultSender> },
|
||||||
/// The code couldn't be compiled due to an error. Such artifacts
|
/// The code couldn't be compiled due to an error. Such artifacts
|
||||||
/// never reach the executor and stay in the host's memory.
|
/// never reach the executor and stay in the host's memory.
|
||||||
FailedToProcess(PrepareError),
|
FailedToProcess(PrepareError),
|
||||||
@@ -145,9 +145,16 @@ impl Artifacts {
|
|||||||
///
|
///
|
||||||
/// This function must be used only for brand-new artifacts and should never be used for
|
/// This function must be used only for brand-new artifacts and should never be used for
|
||||||
/// replacing existing ones.
|
/// replacing existing ones.
|
||||||
pub fn insert_preparing(&mut self, artifact_id: ArtifactId) {
|
pub fn insert_preparing(
|
||||||
|
&mut self,
|
||||||
|
artifact_id: ArtifactId,
|
||||||
|
waiting_for_response: Vec<PrepareResultSender>,
|
||||||
|
) {
|
||||||
// See the precondition.
|
// See the precondition.
|
||||||
always!(self.artifacts.insert(artifact_id, ArtifactState::Preparing).is_none());
|
always!(self
|
||||||
|
.artifacts
|
||||||
|
.insert(artifact_id, ArtifactState::Preparing { waiting_for_response })
|
||||||
|
.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert an artifact with the given ID as "prepared".
|
/// Insert an artifact with the given ID as "prepared".
|
||||||
|
|||||||
@@ -16,6 +16,9 @@
|
|||||||
|
|
||||||
use parity_scale_codec::{Decode, Encode};
|
use parity_scale_codec::{Decode, Encode};
|
||||||
|
|
||||||
|
/// Result of PVF preparation performed by the validation host.
|
||||||
|
pub type PrepareResult = Result<(), PrepareError>;
|
||||||
|
|
||||||
/// An error that occurred during the prepare part of the PVF pipeline.
|
/// An error that occurred during the prepare part of the PVF pipeline.
|
||||||
#[derive(Debug, Clone, Encode, Decode)]
|
#[derive(Debug, Clone, Encode, Decode)]
|
||||||
pub enum PrepareError {
|
pub enum PrepareError {
|
||||||
@@ -23,6 +26,8 @@ pub enum PrepareError {
|
|||||||
Prevalidation(String),
|
Prevalidation(String),
|
||||||
/// Compilation failed for the given PVF.
|
/// Compilation failed for the given PVF.
|
||||||
Preparation(String),
|
Preparation(String),
|
||||||
|
/// Failed to prepare the PVF due to the time limit.
|
||||||
|
TimedOut,
|
||||||
/// This state indicates that the process assigned to prepare the artifact wasn't responsive
|
/// This state indicates that the process assigned to prepare the artifact wasn't responsive
|
||||||
/// or was killed. This state is reported by the validation host (not by the worker).
|
/// or was killed. This state is reported by the validation host (not by the worker).
|
||||||
DidNotMakeIt,
|
DidNotMakeIt,
|
||||||
@@ -74,7 +79,8 @@ impl From<PrepareError> for ValidationError {
|
|||||||
let error_str = match error {
|
let error_str = match error {
|
||||||
PrepareError::Prevalidation(err) => err,
|
PrepareError::Prevalidation(err) => err,
|
||||||
PrepareError::Preparation(err) => err,
|
PrepareError::Preparation(err) => err,
|
||||||
PrepareError::DidNotMakeIt => "preparation timeout".to_owned(),
|
PrepareError::TimedOut => "preparation timeout".to_owned(),
|
||||||
|
PrepareError::DidNotMakeIt => "communication error".to_owned(),
|
||||||
};
|
};
|
||||||
ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str))
|
ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,7 +68,13 @@ const CONFIG: Config = Config {
|
|||||||
native_stack_max: 256 * 1024 * 1024,
|
native_stack_max: 256 * 1024 * 1024,
|
||||||
}),
|
}),
|
||||||
canonicalize_nans: true,
|
canonicalize_nans: true,
|
||||||
parallel_compilation: true,
|
// Rationale for turning the multi-threaded compilation off is to make the preparation time
|
||||||
|
// easily reproducible and as deterministic as possible.
|
||||||
|
//
|
||||||
|
// Currently the prepare queue doesn't distinguish between precheck and prepare requests.
|
||||||
|
// On the one hand, it simplifies the code, on the other, however, slows down compile times
|
||||||
|
// for execute requests. This behavior may change in future.
|
||||||
|
parallel_compilation: false,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ use crate::{
|
|||||||
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
|
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
|
||||||
execute,
|
execute,
|
||||||
metrics::Metrics,
|
metrics::Metrics,
|
||||||
prepare, Priority, Pvf, ValidationError, LOG_TARGET,
|
prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
|
||||||
};
|
};
|
||||||
use always_assert::never;
|
use always_assert::never;
|
||||||
use async_std::path::{Path, PathBuf};
|
use async_std::path::{Path, PathBuf};
|
||||||
@@ -41,6 +41,9 @@ use std::{
|
|||||||
/// An alias to not spell the type for the oneshot sender for the PVF execution result.
|
/// An alias to not spell the type for the oneshot sender for the PVF execution result.
|
||||||
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
|
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
|
||||||
|
|
||||||
|
/// Transmission end used for sending the PVF preparation result.
|
||||||
|
pub(crate) type PrepareResultSender = oneshot::Sender<PrepareResult>;
|
||||||
|
|
||||||
/// A handle to the async process serving the validation host requests.
|
/// A handle to the async process serving the validation host requests.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ValidationHost {
|
pub struct ValidationHost {
|
||||||
@@ -48,6 +51,24 @@ pub struct ValidationHost {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ValidationHost {
|
impl ValidationHost {
|
||||||
|
/// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time limit.
|
||||||
|
/// The result of prechecking will be sent to the provided result sender.
|
||||||
|
///
|
||||||
|
/// This is async to accommodate the possibility of back-pressure. In the vast majority of
|
||||||
|
/// situations this function should return immediately.
|
||||||
|
///
|
||||||
|
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
|
||||||
|
pub async fn precheck_pvf(
|
||||||
|
&mut self,
|
||||||
|
pvf: Pvf,
|
||||||
|
result_tx: PrepareResultSender,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
self.to_host_tx
|
||||||
|
.send(ToHost::PrecheckPvf { pvf, result_tx })
|
||||||
|
.await
|
||||||
|
.map_err(|_| "the inner loop hung up".to_string())
|
||||||
|
}
|
||||||
|
|
||||||
/// Execute PVF with the given code, execution timeout, parameters and priority.
|
/// Execute PVF with the given code, execution timeout, parameters and priority.
|
||||||
/// The result of execution will be sent to the provided result sender.
|
/// The result of execution will be sent to the provided result sender.
|
||||||
///
|
///
|
||||||
@@ -84,6 +105,10 @@ impl ValidationHost {
|
|||||||
}
|
}
|
||||||
|
|
||||||
enum ToHost {
|
enum ToHost {
|
||||||
|
PrecheckPvf {
|
||||||
|
pvf: Pvf,
|
||||||
|
result_tx: PrepareResultSender,
|
||||||
|
},
|
||||||
ExecutePvf {
|
ExecutePvf {
|
||||||
pvf: Pvf,
|
pvf: Pvf,
|
||||||
execution_timeout: Duration,
|
execution_timeout: Duration,
|
||||||
@@ -376,6 +401,9 @@ async fn handle_to_host(
|
|||||||
to_host: ToHost,
|
to_host: ToHost,
|
||||||
) -> Result<(), Fatal> {
|
) -> Result<(), Fatal> {
|
||||||
match to_host {
|
match to_host {
|
||||||
|
ToHost::PrecheckPvf { pvf, result_tx } => {
|
||||||
|
handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
|
||||||
|
},
|
||||||
ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => {
|
ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => {
|
||||||
handle_execute_pvf(
|
handle_execute_pvf(
|
||||||
cache_path,
|
cache_path,
|
||||||
@@ -399,6 +427,34 @@ async fn handle_to_host(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_precheck_pvf(
|
||||||
|
artifacts: &mut Artifacts,
|
||||||
|
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
|
||||||
|
pvf: Pvf,
|
||||||
|
result_sender: PrepareResultSender,
|
||||||
|
) -> Result<(), Fatal> {
|
||||||
|
let artifact_id = pvf.as_artifact_id();
|
||||||
|
|
||||||
|
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
|
||||||
|
match state {
|
||||||
|
ArtifactState::Prepared { last_time_needed } => {
|
||||||
|
*last_time_needed = SystemTime::now();
|
||||||
|
let _ = result_sender.send(Ok(()));
|
||||||
|
},
|
||||||
|
ArtifactState::Preparing { waiting_for_response } =>
|
||||||
|
waiting_for_response.push(result_sender),
|
||||||
|
ArtifactState::FailedToProcess(result) => {
|
||||||
|
let _ = result_sender.send(PrepareResult::Err(result.clone()));
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
artifacts.insert_preparing(artifact_id, vec![result_sender]);
|
||||||
|
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_execute_pvf(
|
async fn handle_execute_pvf(
|
||||||
cache_path: &Path,
|
cache_path: &Path,
|
||||||
artifacts: &mut Artifacts,
|
artifacts: &mut Artifacts,
|
||||||
@@ -429,7 +485,7 @@ async fn handle_execute_pvf(
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
},
|
},
|
||||||
ArtifactState::Preparing => {
|
ArtifactState::Preparing { waiting_for_response: _ } => {
|
||||||
send_prepare(
|
send_prepare(
|
||||||
prepare_queue,
|
prepare_queue,
|
||||||
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() },
|
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() },
|
||||||
@@ -445,7 +501,7 @@ async fn handle_execute_pvf(
|
|||||||
} else {
|
} else {
|
||||||
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
|
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
|
||||||
//
|
//
|
||||||
artifacts.insert_preparing(artifact_id.clone());
|
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
|
||||||
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
|
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
|
||||||
|
|
||||||
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
|
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
|
||||||
@@ -468,7 +524,7 @@ async fn handle_heads_up(
|
|||||||
ArtifactState::Prepared { last_time_needed, .. } => {
|
ArtifactState::Prepared { last_time_needed, .. } => {
|
||||||
*last_time_needed = now;
|
*last_time_needed = now;
|
||||||
},
|
},
|
||||||
ArtifactState::Preparing => {
|
ArtifactState::Preparing { waiting_for_response: _ } => {
|
||||||
// Already preparing. We don't need to send a priority amend either because
|
// Already preparing. We don't need to send a priority amend either because
|
||||||
// it can't get any lower than the background.
|
// it can't get any lower than the background.
|
||||||
},
|
},
|
||||||
@@ -476,7 +532,7 @@ async fn handle_heads_up(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// The artifact is unknown: register it and put a background job into the prepare queue.
|
// The artifact is unknown: register it and put a background job into the prepare queue.
|
||||||
artifacts.insert_preparing(artifact_id.clone());
|
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
|
||||||
|
|
||||||
send_prepare(
|
send_prepare(
|
||||||
prepare_queue,
|
prepare_queue,
|
||||||
@@ -524,9 +580,15 @@ async fn handle_prepare_done(
|
|||||||
never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
|
never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
|
||||||
return Ok(())
|
return Ok(())
|
||||||
},
|
},
|
||||||
Some(state @ ArtifactState::Preparing) => state,
|
Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let ArtifactState::Preparing { waiting_for_response } = state {
|
||||||
|
for result_sender in waiting_for_response.drain(..) {
|
||||||
|
let _ = result_sender.send(result.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// It's finally time to dispatch all the execution requests that were waiting for this artifact
|
// It's finally time to dispatch all the execution requests that were waiting for this artifact
|
||||||
// to be prepared.
|
// to be prepared.
|
||||||
let pending_requests = awaiting_prepare.take(&artifact_id);
|
let pending_requests = awaiting_prepare.take(&artifact_id);
|
||||||
@@ -634,6 +696,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::{InvalidCandidate, PrepareError};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
|
|
||||||
@@ -904,8 +967,6 @@ mod tests {
|
|||||||
|
|
||||||
#[async_std::test]
|
#[async_std::test]
|
||||||
async fn execute_pvf_requests() {
|
async fn execute_pvf_requests() {
|
||||||
use crate::error::InvalidCandidate;
|
|
||||||
|
|
||||||
let mut test = Builder::default().build();
|
let mut test = Builder::default().build();
|
||||||
let mut host = test.host_handle();
|
let mut host = test.host_handle();
|
||||||
|
|
||||||
@@ -1002,6 +1063,140 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_std::test]
|
||||||
|
async fn precheck_pvf() {
|
||||||
|
let mut test = Builder::default().build();
|
||||||
|
let mut host = test.host_handle();
|
||||||
|
|
||||||
|
// First, test a simple precheck request.
|
||||||
|
let (result_tx, result_rx) = oneshot::channel();
|
||||||
|
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
|
||||||
|
|
||||||
|
// The queue received the prepare request.
|
||||||
|
assert_matches!(
|
||||||
|
test.poll_and_recv_to_prepare_queue().await,
|
||||||
|
prepare::ToQueue::Enqueue { .. }
|
||||||
|
);
|
||||||
|
// Send `Ok` right away and poll the host.
|
||||||
|
test.from_prepare_queue_tx
|
||||||
|
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
// No pending execute requests.
|
||||||
|
test.poll_ensure_to_execute_queue_is_empty().await;
|
||||||
|
// Received the precheck result.
|
||||||
|
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
|
||||||
|
|
||||||
|
// Send multiple requests for the same pvf.
|
||||||
|
let mut precheck_receivers = Vec::new();
|
||||||
|
for _ in 0..3 {
|
||||||
|
let (result_tx, result_rx) = oneshot::channel();
|
||||||
|
host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
|
||||||
|
precheck_receivers.push(result_rx);
|
||||||
|
}
|
||||||
|
// Received prepare request.
|
||||||
|
assert_matches!(
|
||||||
|
test.poll_and_recv_to_prepare_queue().await,
|
||||||
|
prepare::ToQueue::Enqueue { .. }
|
||||||
|
);
|
||||||
|
test.from_prepare_queue_tx
|
||||||
|
.send(prepare::FromQueue {
|
||||||
|
artifact_id: artifact_id(2),
|
||||||
|
result: Err(PrepareError::TimedOut),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
test.poll_ensure_to_execute_queue_is_empty().await;
|
||||||
|
for result_rx in precheck_receivers {
|
||||||
|
assert_matches!(
|
||||||
|
result_rx.now_or_never().unwrap().unwrap(),
|
||||||
|
Err(PrepareError::TimedOut)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_std::test]
|
||||||
|
async fn test_prepare_done() {
|
||||||
|
let mut test = Builder::default().build();
|
||||||
|
let mut host = test.host_handle();
|
||||||
|
|
||||||
|
// Test mixed cases of receiving execute and precheck requests
|
||||||
|
// for the same pvf.
|
||||||
|
|
||||||
|
// Send PVF for the execution and request the prechecking for it.
|
||||||
|
let (result_tx, result_rx_execute) = oneshot::channel();
|
||||||
|
host.execute_pvf(
|
||||||
|
Pvf::from_discriminator(1),
|
||||||
|
TEST_EXECUTION_TIMEOUT,
|
||||||
|
b"pvf2".to_vec(),
|
||||||
|
Priority::Critical,
|
||||||
|
result_tx,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_matches!(
|
||||||
|
test.poll_and_recv_to_prepare_queue().await,
|
||||||
|
prepare::ToQueue::Enqueue { .. }
|
||||||
|
);
|
||||||
|
|
||||||
|
let (result_tx, result_rx) = oneshot::channel();
|
||||||
|
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
|
||||||
|
|
||||||
|
// Suppose the preparation failed, the execution queue is empty and both
|
||||||
|
// "clients" receive their results.
|
||||||
|
test.from_prepare_queue_tx
|
||||||
|
.send(prepare::FromQueue {
|
||||||
|
artifact_id: artifact_id(1),
|
||||||
|
result: Err(PrepareError::TimedOut),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
test.poll_ensure_to_execute_queue_is_empty().await;
|
||||||
|
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
|
||||||
|
assert_matches!(
|
||||||
|
result_rx_execute.now_or_never().unwrap().unwrap(),
|
||||||
|
Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(_)))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Reversed case: first send multiple precheck requests, then ask for an execution.
|
||||||
|
let mut precheck_receivers = Vec::new();
|
||||||
|
for _ in 0..3 {
|
||||||
|
let (result_tx, result_rx) = oneshot::channel();
|
||||||
|
host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
|
||||||
|
precheck_receivers.push(result_rx);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (result_tx, _result_rx_execute) = oneshot::channel();
|
||||||
|
host.execute_pvf(
|
||||||
|
Pvf::from_discriminator(2),
|
||||||
|
TEST_EXECUTION_TIMEOUT,
|
||||||
|
b"pvf2".to_vec(),
|
||||||
|
Priority::Critical,
|
||||||
|
result_tx,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
// Received prepare request.
|
||||||
|
assert_matches!(
|
||||||
|
test.poll_and_recv_to_prepare_queue().await,
|
||||||
|
prepare::ToQueue::Enqueue { .. }
|
||||||
|
);
|
||||||
|
test.from_prepare_queue_tx
|
||||||
|
.send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
// The execute queue receives a new request, prechecking is finished and we can
|
||||||
|
// fetch results.
|
||||||
|
assert_matches!(
|
||||||
|
test.poll_and_recv_to_execute_queue().await,
|
||||||
|
execute::ToQueue::Enqueue { .. }
|
||||||
|
);
|
||||||
|
for result_rx in precheck_receivers {
|
||||||
|
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_std::test]
|
#[async_std::test]
|
||||||
async fn cancellation() {
|
async fn cancellation() {
|
||||||
let mut test = Builder::default().build();
|
let mut test = Builder::default().build();
|
||||||
|
|||||||
@@ -92,7 +92,7 @@ pub mod testing;
|
|||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub use sp_tracing;
|
pub use sp_tracing;
|
||||||
|
|
||||||
pub use error::{InvalidCandidate, ValidationError};
|
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
|
||||||
pub use priority::Priority;
|
pub use priority::Priority;
|
||||||
pub use pvf::Pvf;
|
pub use pvf::Pvf;
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
use super::worker::{self, Outcome};
|
use super::worker::{self, Outcome};
|
||||||
use crate::{
|
use crate::{
|
||||||
error::PrepareError,
|
error::{PrepareError, PrepareResult},
|
||||||
metrics::Metrics,
|
metrics::Metrics,
|
||||||
worker_common::{IdleWorker, WorkerHandle},
|
worker_common::{IdleWorker, WorkerHandle},
|
||||||
LOG_TARGET,
|
LOG_TARGET,
|
||||||
@@ -87,7 +87,7 @@ pub enum FromPool {
|
|||||||
rip: bool,
|
rip: bool,
|
||||||
/// [`Ok`] indicates that compiled artifact is successfully stored on disk.
|
/// [`Ok`] indicates that compiled artifact is successfully stored on disk.
|
||||||
/// Otherwise, an [error](PrepareError) is supplied.
|
/// Otherwise, an [error](PrepareError) is supplied.
|
||||||
result: Result<(), PrepareError>,
|
result: PrepareResult,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// The given worker ceased to exist.
|
/// The given worker ceased to exist.
|
||||||
@@ -341,6 +341,20 @@ fn handle_mux(
|
|||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
Outcome::TimedOut => {
|
||||||
|
if attempt_retire(metrics, spawned, worker) {
|
||||||
|
reply(
|
||||||
|
from_pool,
|
||||||
|
FromPool::Concluded {
|
||||||
|
worker,
|
||||||
|
rip: true,
|
||||||
|
result: Err(PrepareError::TimedOut),
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,9 +17,7 @@
|
|||||||
//! A queue that handles requests for PVF preparation.
|
//! A queue that handles requests for PVF preparation.
|
||||||
|
|
||||||
use super::pool::{self, Worker};
|
use super::pool::{self, Worker};
|
||||||
use crate::{
|
use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET};
|
||||||
artifacts::ArtifactId, error::PrepareError, metrics::Metrics, Priority, Pvf, LOG_TARGET,
|
|
||||||
};
|
|
||||||
use always_assert::{always, never};
|
use always_assert::{always, never};
|
||||||
use async_std::path::PathBuf;
|
use async_std::path::PathBuf;
|
||||||
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
|
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
|
||||||
@@ -44,8 +42,9 @@ pub struct FromQueue {
|
|||||||
/// Identifier of an artifact.
|
/// Identifier of an artifact.
|
||||||
pub(crate) artifact_id: ArtifactId,
|
pub(crate) artifact_id: ArtifactId,
|
||||||
/// Outcome of the PVF processing. [`Ok`] indicates that compiled artifact
|
/// Outcome of the PVF processing. [`Ok`] indicates that compiled artifact
|
||||||
/// is successfully stored on disk. Otherwise, an [error](PrepareError) is supplied.
|
/// is successfully stored on disk. Otherwise, an [error](crate::error::PrepareError)
|
||||||
pub(crate) result: Result<(), PrepareError>,
|
/// is supplied.
|
||||||
|
pub(crate) result: PrepareResult,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@@ -327,7 +326,7 @@ async fn handle_worker_concluded(
|
|||||||
queue: &mut Queue,
|
queue: &mut Queue,
|
||||||
worker: Worker,
|
worker: Worker,
|
||||||
rip: bool,
|
rip: bool,
|
||||||
result: Result<(), PrepareError>,
|
result: PrepareResult,
|
||||||
) -> Result<(), Fatal> {
|
) -> Result<(), Fatal> {
|
||||||
queue.metrics.prepare_concluded();
|
queue.metrics.prepare_concluded();
|
||||||
|
|
||||||
@@ -529,6 +528,7 @@ pub fn start(
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::error::PrepareError;
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use futures::{future::BoxFuture, FutureExt};
|
use futures::{future::BoxFuture, FutureExt};
|
||||||
use slotmap::SlotMap;
|
use slotmap::SlotMap;
|
||||||
|
|||||||
@@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
artifacts::CompiledArtifact,
|
artifacts::CompiledArtifact,
|
||||||
error::PrepareError,
|
error::{PrepareError, PrepareResult},
|
||||||
worker_common::{
|
worker_common::{
|
||||||
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
|
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
|
||||||
tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
|
tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
|
||||||
@@ -28,8 +28,6 @@ use async_std::{
|
|||||||
os::unix::net::UnixStream,
|
os::unix::net::UnixStream,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
};
|
};
|
||||||
use futures::FutureExt as _;
|
|
||||||
use futures_timer::Delay;
|
|
||||||
use parity_scale_codec::{Decode, Encode};
|
use parity_scale_codec::{Decode, Encode};
|
||||||
use sp_core::hexdisplay::HexDisplay;
|
use sp_core::hexdisplay::HexDisplay;
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
@@ -51,15 +49,15 @@ pub async fn spawn(
|
|||||||
|
|
||||||
pub enum Outcome {
|
pub enum Outcome {
|
||||||
/// The worker has finished the work assigned to it.
|
/// The worker has finished the work assigned to it.
|
||||||
Concluded { worker: IdleWorker, result: Result<(), PrepareError> },
|
Concluded { worker: IdleWorker, result: PrepareResult },
|
||||||
/// The host tried to reach the worker but failed. This is most likely because the worker was
|
/// The host tried to reach the worker but failed. This is most likely because the worker was
|
||||||
/// killed by the system.
|
/// killed by the system.
|
||||||
Unreachable,
|
Unreachable,
|
||||||
/// The execution was interrupted abruptly and the worker is not available anymore. For example,
|
/// The worker failed to finish the job by the given deadline.
|
||||||
/// this could've happen because the worker hadn't finished the work until the given deadline.
|
|
||||||
///
|
///
|
||||||
/// Note that in this case the artifact file is written (unless there was an error writing the
|
/// The worker is no longer usable and should be killed.
|
||||||
/// the artifact).
|
TimedOut,
|
||||||
|
/// The execution was interrupted abruptly and the worker is not available anymore.
|
||||||
///
|
///
|
||||||
/// This doesn't return an idle worker instance, thus this worker is no longer usable.
|
/// This doesn't return an idle worker instance, thus this worker is no longer usable.
|
||||||
DidNotMakeIt,
|
DidNotMakeIt,
|
||||||
@@ -106,77 +104,78 @@ pub async fn start_work(
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum Selected {
|
enum Selected {
|
||||||
Done(Result<(), PrepareError>),
|
Done(PrepareResult),
|
||||||
IoErr,
|
IoErr,
|
||||||
Deadline,
|
Deadline,
|
||||||
}
|
}
|
||||||
|
|
||||||
let selected = futures::select! {
|
let selected =
|
||||||
res = framed_recv(&mut stream).fuse() => {
|
match async_std::future::timeout(COMPILATION_TIMEOUT, framed_recv(&mut stream)).await {
|
||||||
match res {
|
Ok(Ok(response_bytes)) => {
|
||||||
Ok(response_bytes) => {
|
// Received bytes from worker within the time limit.
|
||||||
// By convention we expect encoded `Result<(), PrepareError>`.
|
// By convention we expect encoded `PrepareResult`.
|
||||||
if let Ok(result) =
|
if let Ok(result) = PrepareResult::decode(&mut response_bytes.as_slice()) {
|
||||||
<Result<(), PrepareError>>::decode(&mut response_bytes.clone().as_slice())
|
if result.is_ok() {
|
||||||
{
|
tracing::debug!(
|
||||||
if result.is_ok() {
|
|
||||||
tracing::debug!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
worker_pid = %pid,
|
|
||||||
"promoting WIP artifact {} to {}",
|
|
||||||
tmp_file.display(),
|
|
||||||
artifact_path.display(),
|
|
||||||
);
|
|
||||||
|
|
||||||
async_std::fs::rename(&tmp_file, &artifact_path)
|
|
||||||
.await
|
|
||||||
.map(|_| Selected::Done(result))
|
|
||||||
.unwrap_or_else(|err| {
|
|
||||||
tracing::warn!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
worker_pid = %pid,
|
|
||||||
"failed to rename the artifact from {} to {}: {:?}",
|
|
||||||
tmp_file.display(),
|
|
||||||
artifact_path.display(),
|
|
||||||
err,
|
|
||||||
);
|
|
||||||
Selected::IoErr
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
Selected::Done(result)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// We received invalid bytes from the worker.
|
|
||||||
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
|
|
||||||
tracing::warn!(
|
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
worker_pid = %pid,
|
worker_pid = %pid,
|
||||||
"received unexpected response from the prepare worker: {}",
|
"promoting WIP artifact {} to {}",
|
||||||
HexDisplay::from(&bound_bytes),
|
tmp_file.display(),
|
||||||
|
artifact_path.display(),
|
||||||
);
|
);
|
||||||
Selected::IoErr
|
|
||||||
|
async_std::fs::rename(&tmp_file, &artifact_path)
|
||||||
|
.await
|
||||||
|
.map(|_| Selected::Done(result))
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
tracing::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
worker_pid = %pid,
|
||||||
|
"failed to rename the artifact from {} to {}: {:?}",
|
||||||
|
tmp_file.display(),
|
||||||
|
artifact_path.display(),
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
Selected::IoErr
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Selected::Done(result)
|
||||||
}
|
}
|
||||||
},
|
} else {
|
||||||
Err(err) => {
|
// We received invalid bytes from the worker.
|
||||||
|
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
worker_pid = %pid,
|
worker_pid = %pid,
|
||||||
"failed to recv a prepare response: {:?}",
|
"received unexpected response from the prepare worker: {}",
|
||||||
err,
|
HexDisplay::from(&bound_bytes),
|
||||||
);
|
);
|
||||||
Selected::IoErr
|
Selected::IoErr
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
},
|
Ok(Err(err)) => {
|
||||||
_ = Delay::new(COMPILATION_TIMEOUT).fuse() => Selected::Deadline,
|
// Communication error within the time limit.
|
||||||
};
|
tracing::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
worker_pid = %pid,
|
||||||
|
"failed to recv a prepare response: {:?}",
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
Selected::IoErr
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
// Timed out.
|
||||||
|
Selected::Deadline
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
match selected {
|
match selected {
|
||||||
Selected::Done(result) => {
|
Selected::Done(result) => {
|
||||||
renice(pid, NICENESS_FOREGROUND);
|
renice(pid, NICENESS_FOREGROUND);
|
||||||
Outcome::Concluded { worker: IdleWorker { stream, pid }, result }
|
Outcome::Concluded { worker: IdleWorker { stream, pid }, result }
|
||||||
},
|
},
|
||||||
Selected::IoErr | Selected::Deadline => Outcome::DidNotMakeIt,
|
Selected::Deadline => Outcome::TimedOut,
|
||||||
|
Selected::IoErr => Outcome::DidNotMakeIt,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -194,6 +194,7 @@ PoS/MS
|
|||||||
PoV/MS
|
PoV/MS
|
||||||
PoW/MS
|
PoW/MS
|
||||||
PR
|
PR
|
||||||
|
precheck
|
||||||
preconfigured
|
preconfigured
|
||||||
preimage/MS
|
preimage/MS
|
||||||
preopen
|
preopen
|
||||||
|
|||||||
Reference in New Issue
Block a user