Preserve artifact cache unless stale (#1918)

Co-authored-by: Marcin S <marcin@realemail.net>
This commit is contained in:
Julian Eager
2023-11-20 02:04:22 +08:00
committed by GitHub
parent 794ee98049
commit b5858936e1
22 changed files with 536 additions and 245 deletions
+61 -64
View File
@@ -32,14 +32,15 @@ use futures::{
Future, FutureExt, SinkExt, StreamExt,
};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
error::{PrecheckResult, PrepareError},
prepare::PrepareSuccess,
pvf::PvfPrepData,
};
use polkadot_node_subsystem::SubsystemResult;
use polkadot_parachain_primitives::primitives::ValidationResult;
use std::{
collections::HashMap,
path::{Path, PathBuf},
path::PathBuf,
time::{Duration, SystemTime},
};
@@ -63,7 +64,7 @@ pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
/// Transmission end used for sending the PVF preparation result.
pub(crate) type PrepareResultSender = oneshot::Sender<PrepareResult>;
pub(crate) type PrecheckResultSender = oneshot::Sender<PrecheckResult>;
/// A handle to the async process serving the validation host requests.
#[derive(Clone)]
@@ -83,7 +84,7 @@ impl ValidationHost {
pub async fn precheck_pvf(
&mut self,
pvf: PvfPrepData,
result_tx: PrepareResultSender,
result_tx: PrecheckResultSender,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::PrecheckPvf { pvf, result_tx })
@@ -133,7 +134,7 @@ impl ValidationHost {
}
/// Messages sent by a `ValidationHost` handle over `to_host_tx` to the host's `run` loop,
/// where `handle_to_host` dispatches each variant to its own handler.
// NOTE(review): this listing is a rendered diff, so `PrecheckPvf` appears twice below: the
// first line is the pre-change form and the second the post-change form. Only the type of
// `result_tx` differs between them.
enum ToHost {
// Pre-change form: the result is delivered through a `PrepareResultSender`.
PrecheckPvf { pvf: PvfPrepData, result_tx: PrepareResultSender },
// Post-change form: the result is delivered through a `PrecheckResultSender`; the precheck
// handler answers with `Ok(())` or the stored preparation error.
PrecheckPvf { pvf: PvfPrepData, result_tx: PrecheckResultSender },
// Execution request; handled by `handle_execute_pvf`.
ExecutePvf(ExecutePvfInputs),
// Carries the list of `active_pvfs`; handled by `handle_heads_up`.
HeadsUp { active_pvfs: Vec<PvfPrepData> },
}
@@ -249,10 +250,9 @@ pub async fn start(
let run_sweeper = sweeper_task(to_sweeper_rx);
let run_host = async move {
let artifacts = Artifacts::new(&config.cache_path).await;
let artifacts = Artifacts::new_and_prune(&config.cache_path).await;
run(Inner {
cache_path: config.cache_path,
cleanup_pulse_interval: Duration::from_secs(3600),
artifact_ttl: Duration::from_secs(3600 * 24),
artifacts,
@@ -296,7 +296,6 @@ impl AwaitingPrepare {
}
struct Inner {
cache_path: PathBuf,
cleanup_pulse_interval: Duration,
artifact_ttl: Duration,
artifacts: Artifacts,
@@ -317,7 +316,6 @@ struct Fatal;
async fn run(
Inner {
cache_path,
cleanup_pulse_interval,
artifact_ttl,
mut artifacts,
@@ -361,7 +359,6 @@ async fn run(
// will notice it.
break_if_fatal!(handle_cleanup_pulse(
&cache_path,
&mut to_sweeper_tx,
&mut artifacts,
artifact_ttl,
@@ -380,7 +377,6 @@ async fn run(
// If the artifact failed before, it could be re-scheduled for preparation here if
// the preparation failure cooldown has elapsed.
break_if_fatal!(handle_to_host(
&cache_path,
&mut artifacts,
&mut to_prepare_queue_tx,
&mut to_execute_queue_tx,
@@ -402,7 +398,6 @@ async fn run(
// We could be eager in terms of reporting and plumb the result from the preparation
// worker but we don't for the sake of simplicity.
break_if_fatal!(handle_prepare_done(
&cache_path,
&mut artifacts,
&mut to_execute_queue_tx,
&mut awaiting_prepare,
@@ -414,7 +409,6 @@ async fn run(
}
async fn handle_to_host(
cache_path: &Path,
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
@@ -426,15 +420,8 @@ async fn handle_to_host(
handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
},
ToHost::ExecutePvf(inputs) => {
handle_execute_pvf(
cache_path,
artifacts,
prepare_queue,
execute_queue,
awaiting_prepare,
inputs,
)
.await?;
handle_execute_pvf(artifacts, prepare_queue, execute_queue, awaiting_prepare, inputs)
.await?;
},
ToHost::HeadsUp { active_pvfs } =>
handle_heads_up(artifacts, prepare_queue, active_pvfs).await?,
@@ -454,21 +441,21 @@ async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
pvf: PvfPrepData,
result_sender: PrepareResultSender,
result_sender: PrecheckResultSender,
) -> Result<(), Fatal> {
let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, prepare_stats } => {
ArtifactState::Prepared { last_time_needed, .. } => {
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(prepare_stats.clone()));
let _ = result_sender.send(Ok(()));
},
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
ArtifactState::FailedToProcess { error, .. } => {
// Do not retry an artifact that previously failed preparation.
let _ = result_sender.send(PrepareResult::Err(error.clone()));
let _ = result_sender.send(PrecheckResult::Err(error.clone()));
},
}
} else {
@@ -491,7 +478,6 @@ async fn handle_precheck_pvf(
/// When preparing for execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`])
/// than when prechecking.
async fn handle_execute_pvf(
cache_path: &Path,
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
@@ -504,8 +490,8 @@ async fn handle_execute_pvf(
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, .. } => {
let file_metadata = std::fs::metadata(artifact_id.path(cache_path));
ArtifactState::Prepared { ref path, last_time_needed, .. } => {
let file_metadata = std::fs::metadata(path);
if file_metadata.is_ok() {
*last_time_needed = SystemTime::now();
@@ -514,7 +500,7 @@ async fn handle_execute_pvf(
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id, cache_path),
artifact: ArtifactPathId::new(artifact_id, path),
pending_execution_request: PendingExecutionRequest {
exec_timeout,
params,
@@ -677,7 +663,6 @@ async fn handle_heads_up(
}
async fn handle_prepare_done(
cache_path: &Path,
artifacts: &mut Artifacts,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
@@ -718,7 +703,8 @@ async fn handle_prepare_done(
state
{
for result_sender in waiting_for_response.drain(..) {
let _ = result_sender.send(result.clone());
let result = result.clone().map(|_| ());
let _ = result_sender.send(result);
}
num_failures
} else {
@@ -738,16 +724,18 @@ async fn handle_prepare_done(
continue
}
// Don't send failed artifacts to the execution's queue.
if let Err(ref error) = result {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
continue
}
let path = match &result {
Ok(success) => success.path.clone(),
Err(error) => {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
continue
},
};
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id.clone(), cache_path),
artifact: ArtifactPathId::new(artifact_id.clone(), &path),
pending_execution_request: PendingExecutionRequest {
exec_timeout,
params,
@@ -760,8 +748,8 @@ async fn handle_prepare_done(
}
*state = match result {
Ok(prepare_stats) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), prepare_stats },
Ok(PrepareSuccess { path, stats: prepare_stats }) =>
ArtifactState::Prepared { path, last_time_needed: SystemTime::now(), prepare_stats },
Err(error) => {
let last_time_failed = SystemTime::now();
let num_failures = *num_failures + 1;
@@ -814,7 +802,6 @@ async fn enqueue_prepare_for_execute(
}
async fn handle_cleanup_pulse(
cache_path: &Path,
sweeper_tx: &mut mpsc::Sender<PathBuf>,
artifacts: &mut Artifacts,
artifact_ttl: Duration,
@@ -825,14 +812,13 @@ async fn handle_cleanup_pulse(
"PVF pruning: {} artifacts reached their end of life",
to_remove.len(),
);
for artifact_id in to_remove {
for (artifact_id, path) in to_remove {
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
"pruning artifact",
);
let artifact_path = artifact_id.path(cache_path);
sweeper_tx.send(artifact_path).await.map_err(|_| Fatal)?;
sweeper_tx.send(path).await.map_err(|_| Fatal)?;
}
Ok(())
@@ -890,7 +876,11 @@ pub(crate) mod tests {
use crate::InvalidCandidate;
use assert_matches::assert_matches;
use futures::future::BoxFuture;
use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats};
use polkadot_node_core_pvf_common::{
error::PrepareError,
prepare::{PrepareStats, PrepareSuccess},
};
use sp_core::hexdisplay::AsBytesRef;
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
@@ -910,12 +900,16 @@ pub(crate) mod tests {
}
/// Creates a new PVF whose artifact id can be uniquely identified by the given number, and
/// returns that artifact id.
///
/// The PVF is built with `PvfPrepData::from_discriminator` and converted with
/// `ArtifactId::from_pvf_prep_data`.
// NOTE(review): this listing is a rendered diff. The first signature/body pair is the
// pre-change text (parameter misspelled `descriminator`), the second pair is the post-change
// text with the spelling corrected; the logic is the same in both.
fn artifact_id(descriminator: u32) -> ArtifactId {
ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(descriminator))
fn artifact_id(discriminator: u32) -> ArtifactId {
ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(discriminator))
}
/// Returns the path of the test artifact for the given discriminator, rooted at the system
/// temporary directory (`std::env::temp_dir()`).
// NOTE(review): this listing is a rendered diff. The first signature and its one-line body are
// the pre-change text; everything from the second signature onwards is the post-change text.
fn artifact_path(descriminator: u32) -> PathBuf {
// Pre-change: the path is derived from the artifact id and the cache directory only.
artifact_id(descriminator).path(&PathBuf::from(std::env::temp_dir())).to_owned()
fn artifact_path(discriminator: u32) -> PathBuf {
// Post-change: `ArtifactId::path` additionally takes a checksum string, computed here as the
// hex-encoded BLAKE3 hash of the PVF's code bytes.
let pvf = PvfPrepData::from_discriminator(discriminator);
let checksum = blake3::hash(pvf.code().as_bytes_ref());
artifact_id(discriminator)
.path(&PathBuf::from(std::env::temp_dir()), checksum.to_hex().as_str())
.to_owned()
}
struct Builder {
@@ -953,8 +947,6 @@ pub(crate) mod tests {
impl Test {
fn new(Builder { cleanup_pulse_interval, artifact_ttl, artifacts }: Builder) -> Self {
let cache_path = PathBuf::from(std::env::temp_dir());
let (to_host_tx, to_host_rx) = mpsc::channel(10);
let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
@@ -962,7 +954,6 @@ pub(crate) mod tests {
let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);
let run = run(Inner {
cache_path,
cleanup_pulse_interval,
artifact_ttl,
artifacts,
@@ -1111,12 +1102,18 @@ pub(crate) mod tests {
let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.artifact_ttl = Duration::from_millis(500);
builder
.artifacts
.insert_prepared(artifact_id(1), mock_now, PrepareStats::default());
builder
.artifacts
.insert_prepared(artifact_id(2), mock_now, PrepareStats::default());
builder.artifacts.insert_prepared(
artifact_id(1),
artifact_path(1),
mock_now,
PrepareStats::default(),
);
builder.artifacts.insert_prepared(
artifact_id(2),
artifact_path(2),
mock_now,
PrepareStats::default(),
);
let mut test = builder.build();
let mut host = test.host_handle();
@@ -1188,7 +1185,7 @@ pub(crate) mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(PrepareStats::default()),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
@@ -1204,7 +1201,7 @@ pub(crate) mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(PrepareStats::default()),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
@@ -1258,7 +1255,7 @@ pub(crate) mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(PrepareStats::default()),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
@@ -1371,7 +1368,7 @@ pub(crate) mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(PrepareStats::default()),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
@@ -1527,7 +1524,7 @@ pub(crate) mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(PrepareStats::default()),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();
@@ -1703,7 +1700,7 @@ pub(crate) mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(PrepareStats::default()),
result: Ok(PrepareSuccess::default()),
})
.await
.unwrap();