refactor out validation hosts to pool struct (#972)

* refactor out validation hosts to pool struct

* make web-wasm compatible

* typo

* remove now-unused static hosts
This commit is contained in:
Robert Habermeier
2020-04-04 15:15:54 -04:00
committed by GitHub
parent a2a4f4c755
commit 15a83079ba
11 changed files with 122 additions and 48 deletions
-1
View File
@@ -4260,7 +4260,6 @@ dependencies = [
"adder", "adder",
"derive_more 0.99.3", "derive_more 0.99.3",
"halt", "halt",
"lazy_static",
"log 0.4.8", "log 0.4.8",
"parity-scale-codec", "parity-scale-codec",
"parking_lot 0.10.0", "parking_lot 0.10.0",
+3
View File
@@ -299,6 +299,7 @@ fn consensus_instances_cleaned_up() {
signing_context, signing_context,
AvailabilityStore::new_in_memory(service.clone()), AvailabilityStore::new_in_memory(service.clone()),
None, None,
None,
)); ));
pool.spawner().spawn_local(worker_task).unwrap(); pool.spawner().spawn_local(worker_task).unwrap();
@@ -329,6 +330,7 @@ fn collation_is_received_with_dropped_router() {
signing_context, signing_context,
AvailabilityStore::new_in_memory(service.clone()), AvailabilityStore::new_in_memory(service.clone()),
None, None,
None,
)); ));
pool.spawner().spawn_local(worker_task).unwrap(); pool.spawner().spawn_local(worker_task).unwrap();
@@ -550,6 +552,7 @@ fn fetches_pov_block_from_gossip() {
signing_context, signing_context,
AvailabilityStore::new_in_memory(service.clone()), AvailabilityStore::new_in_memory(service.clone()),
None, None,
None,
)); ));
let spawner = pool.spawner(); let spawner = pool.spawner();
-2
View File
@@ -16,7 +16,6 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch =
sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } sp-io = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
lazy_static = { version = "1.4.0", optional = true }
parking_lot = { version = "0.10.0", optional = true } parking_lot = { version = "0.10.0", optional = true }
log = { version = "0.4.8", optional = true } log = { version = "0.4.8", optional = true }
@@ -38,7 +37,6 @@ std = [
"sp-std/std", "sp-std/std",
"shared_memory", "shared_memory",
"sp-core/std", "sp-core/std",
"lazy_static",
"parking_lot", "parking_lot",
"log", "log",
"sp-runtime-interface/std", "sp-runtime-interface/std",
+26 -11
View File
@@ -28,7 +28,7 @@ use sp_core::traits::CallInWasm;
use sp_wasm_interface::HostFunctions as _; use sp_wasm_interface::HostFunctions as _;
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
pub use validation_host::{run_worker, EXECUTION_TIMEOUT_SEC}; pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC};
mod validation_host; mod validation_host;
@@ -48,16 +48,31 @@ impl ParachainExt {
} }
} }
/// A stub validation-pool defined when compiling for WASM.
#[cfg(target_os = "unknown")]
#[derive(Clone)]
pub struct ValidationPool {
_inner: (), // private field means not publicly-instantiable
}
#[cfg(target_os = "unknown")]
impl ValidationPool {
/// Create a new `ValidationPool`.
pub fn new() -> Self {
ValidationPool { _inner: () }
}
}
/// WASM code execution mode. /// WASM code execution mode.
/// ///
/// > Note: When compiling for WASM, the `Remote` variants are not available. /// > Note: When compiling for WASM, the `Remote` variants are not available.
pub enum ExecutionMode { pub enum ExecutionMode<'a> {
/// Execute in-process. The execution can not be interrupted or aborted. /// Execute in-process. The execution can not be interrupted or aborted.
Local, Local,
/// Remote execution in a spawned process. /// Remote execution in a spawned process.
Remote, Remote(&'a ValidationPool),
/// Remote execution in a spawned test runner. /// Remote execution in a spawned test runner.
RemoteTest, RemoteTest(&'a ValidationPool),
} }
/// Error type for the wasm executor /// Error type for the wasm executor
@@ -115,27 +130,27 @@ pub fn validate_candidate<E: Externalities + 'static>(
validation_code: &[u8], validation_code: &[u8],
params: ValidationParams, params: ValidationParams,
ext: E, ext: E,
options: ExecutionMode, options: ExecutionMode<'_>,
) -> Result<ValidationResult, Error> { ) -> Result<ValidationResult, Error> {
match options { match options {
ExecutionMode::Local => { ExecutionMode::Local => {
validate_candidate_internal(validation_code, &params.encode(), ext) validate_candidate_internal(validation_code, &params.encode(), ext)
}, },
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
ExecutionMode::Remote => { ExecutionMode::Remote(pool) => {
validation_host::validate_candidate(validation_code, params, ext, false) pool.validate_candidate(validation_code, params, ext, false)
}, },
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
ExecutionMode::RemoteTest => { ExecutionMode::RemoteTest(pool) => {
validation_host::validate_candidate(validation_code, params, ext, true) pool.validate_candidate(validation_code, params, ext, true)
}, },
#[cfg(target_os = "unknown")] #[cfg(target_os = "unknown")]
ExecutionMode::Remote => ExecutionMode::Remote(pool) =>
Err(Error::System(Box::<dyn std::error::Error + Send + Sync>::from( Err(Error::System(Box::<dyn std::error::Error + Send + Sync>::from(
"Remote validator not available".to_string() "Remote validator not available".to_string()
) as Box<_>)), ) as Box<_>)),
#[cfg(target_os = "unknown")] #[cfg(target_os = "unknown")]
ExecutionMode::RemoteTest => ExecutionMode::RemoteTest(pool) =>
Err(Error::System(Box::<dyn std::error::Error + Send + Sync>::from( Err(Error::System(Box::<dyn std::error::Error + Send + Sync>::from(
"Remote validator not available".to_string() "Remote validator not available".to_string()
) as Box<_>)), ) as Box<_>)),
@@ -33,8 +33,6 @@ const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
const WORKER_ARG: &'static str = "validation-worker"; const WORKER_ARG: &'static str = "validation-worker";
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
const NUM_HOSTS: usize = 8;
/// Execution timeout in seconds; /// Execution timeout in seconds;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
pub const EXECUTION_TIMEOUT_SEC: u64 = 30; pub const EXECUTION_TIMEOUT_SEC: u64 = 30;
@@ -69,8 +67,42 @@ enum Event {
WorkerReady = 2, WorkerReady = 2,
} }
lazy_static::lazy_static! { /// A pool of hosts.
static ref HOSTS: [Mutex<ValidationHost>; NUM_HOSTS] = Default::default(); #[derive(Clone)]
pub struct ValidationPool {
hosts: Arc<Vec<Mutex<ValidationHost>>>,
}
const DEFAULT_NUM_HOSTS: usize = 8;
impl ValidationPool {
/// Creates a validation pool with the default configuration.
pub fn new() -> ValidationPool {
ValidationPool {
hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()),
}
}
/// Validate a candidate under the given validation code using the next
/// free validation host.
///
/// This will fail if the validation code is not a proper parachain validation module.
pub fn validate_candidate<E: Externalities>(
&self,
validation_code: &[u8],
params: ValidationParams,
externalities: E,
test_mode: bool,
) -> Result<ValidationResult, Error> {
for host in self.hosts.iter() {
if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, externalities, test_mode);
}
}
// all workers are busy, just wait for the first one
self.hosts[0].lock().validate_candidate(validation_code, params, externalities, test_mode)
}
} }
/// Validation worker process entry point. Runs a loop waiting for candidates to validate /// Validation worker process entry point. Runs a loop waiting for candidates to validate
@@ -184,25 +216,6 @@ struct ValidationHost {
id: u32, id: u32,
} }
/// Validate a candidate under the given validation code.
///
/// This will fail if the validation code is not a proper parachain validation module.
pub fn validate_candidate<E: Externalities>(
validation_code: &[u8],
params: ValidationParams,
externalities: E,
test_mode: bool,
) -> Result<ValidationResult, Error> {
for host in HOSTS.iter() {
if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, externalities, test_mode);
}
}
// all workers are busy, just wait for the first one
HOSTS[0].lock().validate_candidate(validation_code, params, externalities, test_mode)
}
impl Drop for ValidationHost { impl Drop for ValidationHost {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(ref mut worker) = &mut self.worker { if let Some(ref mut worker) = &mut self.worker {
+8 -3
View File
@@ -70,6 +70,8 @@ pub fn execute_good_on_parent() {
add: 512, add: 512,
}; };
let pool = parachain::wasm_executor::ValidationPool::new();
let ret = parachain::wasm_executor::validate_candidate( let ret = parachain::wasm_executor::validate_candidate(
TEST_CODE, TEST_CODE,
ValidationParams { ValidationParams {
@@ -77,7 +79,7 @@ pub fn execute_good_on_parent() {
block_data: block_data.encode(), block_data: block_data.encode(),
}, },
DummyExt, DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest, parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
).unwrap(); ).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap(); let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap();
@@ -92,6 +94,7 @@ fn execute_good_chain_on_parent() {
let mut number = 0; let mut number = 0;
let mut parent_hash = [0; 32]; let mut parent_hash = [0; 32];
let mut last_state = 0; let mut last_state = 0;
let pool = parachain::wasm_executor::ValidationPool::new();
for add in 0..10 { for add in 0..10 {
let parent_head = HeadData { let parent_head = HeadData {
@@ -112,7 +115,7 @@ fn execute_good_chain_on_parent() {
block_data: block_data.encode(), block_data: block_data.encode(),
}, },
DummyExt, DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest, parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
).unwrap(); ).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap(); let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap();
@@ -129,6 +132,8 @@ fn execute_good_chain_on_parent() {
#[test] #[test]
fn execute_bad_on_parent() { fn execute_bad_on_parent() {
let pool = parachain::wasm_executor::ValidationPool::new();
let parent_head = HeadData { let parent_head = HeadData {
number: 0, number: 0,
parent_hash: [0; 32], parent_hash: [0; 32],
@@ -147,6 +152,6 @@ fn execute_bad_on_parent() {
block_data: block_data.encode(), block_data: block_data.encode(),
}, },
DummyExt, DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest, parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
).unwrap_err(); ).unwrap_err();
} }
@@ -25,6 +25,8 @@ const INFINITE_LOOP_CODE: &[u8] = halt::WASM_BINARY;
#[test] #[test]
fn terminates_on_timeout() { fn terminates_on_timeout() {
let pool = parachain::wasm_executor::ValidationPool::new();
let result = parachain::wasm_executor::validate_candidate( let result = parachain::wasm_executor::validate_candidate(
INFINITE_LOOP_CODE, INFINITE_LOOP_CODE,
ValidationParams { ValidationParams {
@@ -32,7 +34,7 @@ fn terminates_on_timeout() {
block_data: Vec::new(), block_data: Vec::new(),
}, },
DummyExt, DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest, parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
); );
match result { match result {
Err(parachain::wasm_executor::Error::Timeout) => {}, Err(parachain::wasm_executor::Error::Timeout) => {},
@@ -45,7 +47,11 @@ fn terminates_on_timeout() {
#[test] #[test]
fn parallel_execution() { fn parallel_execution() {
let pool = parachain::wasm_executor::ValidationPool::new();
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let pool2 = pool.clone();
let thread = std::thread::spawn(move || let thread = std::thread::spawn(move ||
parachain::wasm_executor::validate_candidate( parachain::wasm_executor::validate_candidate(
INFINITE_LOOP_CODE, INFINITE_LOOP_CODE,
@@ -54,7 +60,7 @@ fn parallel_execution() {
block_data: Vec::new(), block_data: Vec::new(),
}, },
DummyExt, DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest, parachain::wasm_executor::ExecutionMode::RemoteTest(&pool2),
).ok()); ).ok());
let _ = parachain::wasm_executor::validate_candidate( let _ = parachain::wasm_executor::validate_candidate(
INFINITE_LOOP_CODE, INFINITE_LOOP_CODE,
@@ -63,7 +69,7 @@ fn parallel_execution() {
block_data: Vec::new(), block_data: Vec::new(),
}, },
DummyExt, DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest, parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
); );
thread.join().unwrap(); thread.join().unwrap();
// total time should be < 2 x EXECUTION_TIMEOUT_SEC // total time should be < 2 x EXECUTION_TIMEOUT_SEC
+2
View File
@@ -59,6 +59,7 @@ pub trait Collators: Clone {
/// A future which resolves when a collation is available. /// A future which resolves when a collation is available.
pub async fn collation_fetch<C: Collators, P>( pub async fn collation_fetch<C: Collators, P>(
validation_pool: Option<crate::pipeline::ValidationPool>,
parachain: ParaId, parachain: ParaId,
relay_parent: Hash, relay_parent: Hash,
collators: C, collators: C,
@@ -76,6 +77,7 @@ pub async fn collation_fetch<C: Collators, P>(
let collation = collators.collate(parachain, relay_parent).await?; let collation = collators.collate(parachain, relay_parent).await?;
let Collation { info, pov } = collation; let Collation { info, pov } = collation;
let res = crate::pipeline::full_output_validation_with_api( let res = crate::pipeline::full_output_validation_with_api(
validation_pool.as_ref(),
&*client, &*client,
&info, &info,
&pov, &pov,
+10 -1
View File
@@ -36,6 +36,8 @@ use sp_api::ProvideRuntimeApi;
use parking_lot::Mutex; use parking_lot::Mutex;
use crate::Error; use crate::Error;
pub use parachain::wasm_executor::ValidationPool;
/// Does basic checks of a collation. Provide the encoded PoV-block. /// Does basic checks of a collation. Provide the encoded PoV-block.
pub fn basic_checks( pub fn basic_checks(
collation: &CollationInfo, collation: &CollationInfo,
@@ -227,6 +229,7 @@ impl<'a> ValidatedCandidate<'a> {
/// Does full checks of a collation, with provided PoV-block and contextual data. /// Does full checks of a collation, with provided PoV-block and contextual data.
pub fn validate<'a>( pub fn validate<'a>(
validation_pool: Option<&'_ ValidationPool>,
collation: &'a CollationInfo, collation: &'a CollationInfo,
pov_block: &'a PoVBlock, pov_block: &'a PoVBlock,
local_validation: &'a LocalValidationData, local_validation: &'a LocalValidationData,
@@ -251,12 +254,16 @@ pub fn validate<'a>(
per_byte: 0, per_byte: 0,
}; };
let execution_mode = validation_pool
.map(ExecutionMode::Remote)
.unwrap_or(ExecutionMode::Local);
let ext = Externalities::new(local_validation.balance, fee_schedule); let ext = Externalities::new(local_validation.balance, fee_schedule);
match wasm_executor::validate_candidate( match wasm_executor::validate_candidate(
&validation_code, &validation_code,
params, params,
ext.clone(), ext.clone(),
ExecutionMode::Remote, execution_mode,
) { ) {
Ok(result) => { Ok(result) => {
if result.head_data == collation.head_data.0 { if result.head_data == collation.head_data.0 {
@@ -306,6 +313,7 @@ where
/// Does full-pipeline validation of a collation with provided contextual parameters. /// Does full-pipeline validation of a collation with provided contextual parameters.
pub fn full_output_validation_with_api<P>( pub fn full_output_validation_with_api<P>(
validation_pool: Option<&ValidationPool>,
api: &P, api: &P,
collation: &CollationInfo, collation: &CollationInfo,
pov_block: &PoVBlock, pov_block: &PoVBlock,
@@ -330,6 +338,7 @@ pub fn full_output_validation_with_api<P>(
&encoded_pov, &encoded_pov,
) )
.and_then(|()| validate( .and_then(|()| validate(
validation_pool,
&collation, &collation,
&pov_block, &pov_block,
&local_validation, &local_validation,
+14 -1
View File
@@ -39,7 +39,7 @@ use self::includable::IncludabilitySender;
use primitives::Pair; use primitives::Pair;
use sp_api::ProvideRuntimeApi; use sp_api::ProvideRuntimeApi;
use crate::pipeline::FullOutput; use crate::pipeline::{FullOutput, ValidationPool};
use crate::Error; use crate::Error;
mod includable; mod includable;
@@ -132,6 +132,7 @@ struct SharedTableInner {
trackers: Vec<IncludabilitySender>, trackers: Vec<IncludabilitySender>,
availability_store: AvailabilityStore, availability_store: AvailabilityStore,
validated: HashMap<Hash, ValidationWork>, validated: HashMap<Hash, ValidationWork>,
validation_pool: Option<ValidationPool>,
} }
impl SharedTableInner { impl SharedTableInner {
@@ -193,6 +194,7 @@ impl SharedTableInner {
}; };
work.map(|work| ParachainWork { work.map(|work| ParachainWork {
validation_pool: self.validation_pool.clone(),
availability_store: self.availability_store.clone(), availability_store: self.availability_store.clone(),
relay_parent: context.signing_context.parent_hash.clone(), relay_parent: context.signing_context.parent_hash.clone(),
work, work,
@@ -259,6 +261,7 @@ impl Validated {
/// Future that performs parachain validation work. /// Future that performs parachain validation work.
pub struct ParachainWork<Fetch> { pub struct ParachainWork<Fetch> {
validation_pool: Option<ValidationPool>,
work: Work<Fetch>, work: Work<Fetch>,
relay_parent: Hash, relay_parent: Hash,
availability_store: AvailabilityStore, availability_store: AvailabilityStore,
@@ -283,9 +286,11 @@ impl<Fetch: Future + Unpin> ParachainWork<Fetch> {
let n_validators = self.n_validators; let n_validators = self.n_validators;
let expected_relay_parent = self.relay_parent; let expected_relay_parent = self.relay_parent;
let pool = self.validation_pool.clone();
let validate = move |pov_block: &PoVBlock, candidate: &AbridgedCandidateReceipt| { let validate = move |pov_block: &PoVBlock, candidate: &AbridgedCandidateReceipt| {
let collation_info = candidate.to_collation_info(); let collation_info = candidate.to_collation_info();
let full_output = crate::pipeline::full_output_validation_with_api( let full_output = crate::pipeline::full_output_validation_with_api(
pool.as_ref(),
&*api, &*api,
&collation_info, &collation_info,
pov_block, pov_block,
@@ -416,6 +421,7 @@ impl SharedTable {
signing_context: SigningContext, signing_context: SigningContext,
availability_store: AvailabilityStore, availability_store: AvailabilityStore,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
validation_pool: Option<ValidationPool>,
) -> Self { ) -> Self {
SharedTable { SharedTable {
context: Arc::new(TableContext { groups, key, signing_context, validators: validators.clone(), }), context: Arc::new(TableContext { groups, key, signing_context, validators: validators.clone(), }),
@@ -425,6 +431,7 @@ impl SharedTable {
validated: HashMap::new(), validated: HashMap::new(),
trackers: Vec::new(), trackers: Vec::new(),
availability_store, availability_store,
validation_pool,
})) }))
} }
} }
@@ -685,6 +692,7 @@ mod tests {
signing_context.clone(), signing_context.clone(),
AvailabilityStore::new_in_memory(DummyErasureNetworking), AvailabilityStore::new_in_memory(DummyErasureNetworking),
None, None,
None,
); );
let mut candidate = AbridgedCandidateReceipt::default(); let mut candidate = AbridgedCandidateReceipt::default();
@@ -741,6 +749,7 @@ mod tests {
signing_context.clone(), signing_context.clone(),
AvailabilityStore::new_in_memory(DummyErasureNetworking), AvailabilityStore::new_in_memory(DummyErasureNetworking),
None, None,
None,
); );
let mut candidate = AbridgedCandidateReceipt::default(); let mut candidate = AbridgedCandidateReceipt::default();
@@ -798,6 +807,7 @@ mod tests {
availability_store: store.clone(), availability_store: store.clone(),
max_block_data_size: None, max_block_data_size: None,
n_validators, n_validators,
validation_pool: None,
}; };
for i in 0..n_validators { for i in 0..n_validators {
@@ -867,6 +877,7 @@ mod tests {
availability_store: store.clone(), availability_store: store.clone(),
max_block_data_size: None, max_block_data_size: None,
n_validators, n_validators,
validation_pool: None,
}; };
let validated = block_on(producer.prime_with(|_, _| Ok( let validated = block_on(producer.prime_with(|_, _| Ok(
@@ -921,6 +932,7 @@ mod tests {
signing_context.clone(), signing_context.clone(),
AvailabilityStore::new_in_memory(DummyErasureNetworking), AvailabilityStore::new_in_memory(DummyErasureNetworking),
None, None,
None,
); );
let mut candidate = AbridgedCandidateReceipt::default(); let mut candidate = AbridgedCandidateReceipt::default();
@@ -988,6 +1000,7 @@ mod tests {
signing_context.clone(), signing_context.clone(),
AvailabilityStore::new_in_memory(DummyErasureNetworking), AvailabilityStore::new_in_memory(DummyErasureNetworking),
None, None,
None,
); );
let mut candidate = AbridgedCandidateReceipt::default(); let mut candidate = AbridgedCandidateReceipt::default();
@@ -47,6 +47,7 @@ use log::{warn, error, info, debug, trace};
use super::{Network, Collators, SharedTable, TableRouter}; use super::{Network, Collators, SharedTable, TableRouter};
use crate::Error; use crate::Error;
use crate::pipeline::ValidationPool;
/// A handle to spawn background tasks onto. /// A handle to spawn background tasks onto.
pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>; pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
@@ -164,13 +165,15 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
NotifyImport(sc_client_api::BlockImportNotification<Block>), NotifyImport(sc_client_api::BlockImportNotification<Block>),
} }
let validation_pool = Some(ValidationPool::new());
let mut parachain_validation = ParachainValidationInstances { let mut parachain_validation = ParachainValidationInstances {
client: self.client.clone(), client: self.client.clone(),
network: self.network, network: self.network,
spawner: self.spawner, spawner: self.spawner,
availability_store: self.availability_store, availability_store: self.availability_store,
live_instances: HashMap::new(), live_instances: HashMap::new(),
collation_fetch: DefaultCollationFetch(self.collators), validation_pool: validation_pool.clone(),
collation_fetch: DefaultCollationFetch(self.collators, validation_pool),
}; };
let client = self.client; let client = self.client;
@@ -252,7 +255,7 @@ pub(crate) trait CollationFetch {
} }
#[derive(Clone)] #[derive(Clone)]
struct DefaultCollationFetch<C>(C); struct DefaultCollationFetch<C>(C, Option<ValidationPool>);
impl<C> CollationFetch for DefaultCollationFetch<C> impl<C> CollationFetch for DefaultCollationFetch<C>
where where
C: Collators + Send + Sync + Unpin + 'static, C: Collators + Send + Sync + Unpin + 'static,
@@ -272,10 +275,12 @@ impl<C> CollationFetch for DefaultCollationFetch<C>
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>, P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static, P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
{ {
let DefaultCollationFetch(collators, validation_pool) = self;
crate::collation::collation_fetch( crate::collation::collation_fetch(
validation_pool,
parachain, parachain,
relay_parent, relay_parent,
self.0, collators,
client, client,
max_block_data_size, max_block_data_size,
n_validators, n_validators,
@@ -307,6 +312,9 @@ pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
/// Live agreements. Maps relay chain parent hashes to attestation /// Live agreements. Maps relay chain parent hashes to attestation
/// instances. /// instances.
live_instances: HashMap<Hash, ValidationInstanceHandle>, live_instances: HashMap<Hash, ValidationInstanceHandle>,
/// The underlying validation pool of processes to use.
/// Only `None` in tests.
validation_pool: Option<ValidationPool>,
/// Used to fetch a collation. /// Used to fetch a collation.
collation_fetch: CF, collation_fetch: CF,
} }
@@ -406,6 +414,7 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
signing_context, signing_context,
self.availability_store.clone(), self.availability_store.clone(),
max_block_data_size, max_block_data_size,
self.validation_pool.clone(),
)); ));
let build_router = self.network.build_table_router( let build_router = self.network.build_table_router(
@@ -709,6 +718,7 @@ mod tests {
spawner: executor.clone(), spawner: executor.clone(),
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
live_instances: HashMap::new(), live_instances: HashMap::new(),
validation_pool: None,
}; };
parachain_validation.get_or_instantiate(Default::default(), &keystore, None) parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
@@ -747,6 +757,7 @@ mod tests {
spawner: executor.clone(), spawner: executor.clone(),
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
live_instances: HashMap::new(), live_instances: HashMap::new(),
validation_pool: None,
}; };
parachain_validation.get_or_instantiate(Default::default(), &keystore, None) parachain_validation.get_or_instantiate(Default::default(), &keystore, None)