mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 11:41:02 +00:00
Ensure that table router is always built (#952)
* Ensure that table router is always build This pr ensures that the table router is always build, aka the future is resolved. This is important, as the table router internally spawns tasks to handle gossip messages. Handling gossip messages is not only required on parachain validators, but also on relay chain validators to receive collations. Tests are added to ensure that the assumptions hold. * Fix compilation * Switch to closures * Remove empty line * Revert "Remove empty line" This reverts commit 0d4aaba1780aec1c8d61e1d5dcf7768918af02d9. * Revert "Switch to closures" This reverts commit d128c4ecc02c911552a3bfd2142b5a4f7b1338ba. * Hybrid approach * Rename test * Make trait crate local
This commit is contained in:
@@ -28,7 +28,7 @@ use serde::{Serialize, Deserialize};
|
||||
#[cfg(feature = "std")]
|
||||
use primitives::bytes;
|
||||
use primitives::RuntimeDebug;
|
||||
use runtime_primitives::traits::{Block as BlockT};
|
||||
use runtime_primitives::traits::Block as BlockT;
|
||||
use inherents::InherentIdentifier;
|
||||
use application_crypto::KeyTypeId;
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ use frame_support::{
|
||||
weights::DispatchInfo,
|
||||
};
|
||||
use pallet_transaction_payment_rpc_runtime_api::RuntimeDispatchInfo;
|
||||
use session::{historical as session_historical};
|
||||
use session::historical as session_historical;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
pub use staking::StakerStatus;
|
||||
|
||||
@@ -54,7 +54,7 @@ pub use self::shared_table::{
|
||||
pub use self::validation_service::{ServiceHandle, ServiceBuilder};
|
||||
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
pub use parachain::wasm_executor::{run_worker as run_validation_worker};
|
||||
pub use parachain::wasm_executor::run_worker as run_validation_worker;
|
||||
|
||||
mod dynamic_inclusion;
|
||||
mod error;
|
||||
@@ -108,6 +108,7 @@ pub trait Network {
|
||||
|
||||
/// Instantiate a table router using the given shared table.
|
||||
/// Also pass through any outgoing messages to be broadcast to peers.
|
||||
#[must_use]
|
||||
fn build_table_router(
|
||||
&self,
|
||||
table: Arc<SharedTable>,
|
||||
|
||||
@@ -26,21 +26,20 @@
|
||||
//!
|
||||
//! These attestation sessions are kept live until they are periodically garbage-collected.
|
||||
|
||||
use std::{time::{Duration, Instant}, sync::Arc};
|
||||
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::pipeline::FullOutput;
|
||||
use sc_client_api::{BlockchainEvents, BlockBackend};
|
||||
use sp_blockchain::HeaderBackend;
|
||||
use block_builder::BlockBuilderApi;
|
||||
use consensus::SelectChain;
|
||||
use futures::{future::ready, prelude::*, task::{Spawn, SpawnExt}};
|
||||
use futures::{prelude::*, task::{Spawn, SpawnExt}};
|
||||
use polkadot_primitives::{Block, Hash, BlockId};
|
||||
use polkadot_primitives::parachain::{
|
||||
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, SigningContext,
|
||||
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
|
||||
CollationInfo, SigningContext,
|
||||
};
|
||||
use babe_primitives::BabeApi;
|
||||
use keystore::KeyStorePtr;
|
||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||
use sp_api::{ProvideRuntimeApi, ApiExt};
|
||||
use runtime_primitives::traits::HashFor;
|
||||
use availability_store::Store as AvailabilityStore;
|
||||
|
||||
@@ -140,13 +139,10 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
|
||||
C: Collators + Send + Sync + Unpin + 'static,
|
||||
C::Collation: Send + Unpin + 'static,
|
||||
P: BlockchainEvents<Block> + BlockBackend<Block>,
|
||||
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
|
||||
P::Api: ParachainHost<Block> +
|
||||
BlockBuilderApi<Block> +
|
||||
BabeApi<Block> +
|
||||
ApiExt<Block, Error = sp_blockchain::Error>,
|
||||
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||
N: Network + Send + Sync + 'static,
|
||||
N::TableRouter: Send + 'static,
|
||||
N::TableRouter: Send + 'static + Sync,
|
||||
N::BuildTableRouter: Send + Unpin + 'static,
|
||||
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
|
||||
SC: SelectChain<Block> + 'static,
|
||||
@@ -171,10 +167,10 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
|
||||
let mut parachain_validation = ParachainValidationInstances {
|
||||
client: self.client.clone(),
|
||||
network: self.network,
|
||||
collators: self.collators,
|
||||
spawner: self.spawner,
|
||||
availability_store: self.availability_store,
|
||||
live_instances: HashMap::new(),
|
||||
collation_fetch: DefaultCollationFetch(self.collators),
|
||||
};
|
||||
|
||||
let client = self.client;
|
||||
@@ -236,6 +232,57 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
|
||||
}
|
||||
}
|
||||
|
||||
/// Abstraction over `collation_fetch`.
|
||||
pub(crate) trait CollationFetch {
|
||||
/// Error type used by `collation_fetch`.
|
||||
type Error: std::fmt::Debug;
|
||||
|
||||
/// Fetch a collation for the given `parachain`.
|
||||
fn collation_fetch<P>(
|
||||
self,
|
||||
parachain: ParaId,
|
||||
relay_parent: Hash,
|
||||
client: Arc<P>,
|
||||
max_block_data_size: Option<u64>,
|
||||
n_validators: usize,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
|
||||
where
|
||||
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||
P: ProvideRuntimeApi<Block> + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DefaultCollationFetch<C>(C);
|
||||
impl<C> CollationFetch for DefaultCollationFetch<C>
|
||||
where
|
||||
C: Collators + Send + Sync + Unpin + 'static,
|
||||
C::Collation: Send + Unpin + 'static,
|
||||
{
|
||||
type Error = C::Error;
|
||||
|
||||
fn collation_fetch<P>(
|
||||
self,
|
||||
parachain: ParaId,
|
||||
relay_parent: Hash,
|
||||
client: Arc<P>,
|
||||
max_block_data_size: Option<u64>,
|
||||
n_validators: usize,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
|
||||
where
|
||||
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||
{
|
||||
crate::collation::collation_fetch(
|
||||
parachain,
|
||||
relay_parent,
|
||||
self.0,
|
||||
client,
|
||||
max_block_data_size,
|
||||
n_validators,
|
||||
).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
// finds the first key we are capable of signing with out of the given set of validators,
|
||||
// if any.
|
||||
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
|
||||
@@ -248,13 +295,11 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc
|
||||
}
|
||||
|
||||
/// Constructs parachain-agreement instances.
|
||||
pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
|
||||
pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
|
||||
/// The client instance.
|
||||
client: Arc<P>,
|
||||
/// The backing network handle.
|
||||
network: N,
|
||||
/// Parachain collators.
|
||||
collators: C,
|
||||
/// handle to spawner
|
||||
spawner: SP,
|
||||
/// Store for extrinsic data.
|
||||
@@ -262,18 +307,20 @@ pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
|
||||
/// Live agreements. Maps relay chain parent hashes to attestation
|
||||
/// instances.
|
||||
live_instances: HashMap<Hash, ValidationInstanceHandle>,
|
||||
/// Used to fetch a collation.
|
||||
collation_fetch: CF,
|
||||
}
|
||||
|
||||
impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
C: Collators + Send + Unpin + 'static + Sync,
|
||||
impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
|
||||
N: Network,
|
||||
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
|
||||
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
||||
C::Collation: Send + Unpin + 'static,
|
||||
N::TableRouter: Send + 'static,
|
||||
N::Error: 'static,
|
||||
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
|
||||
N::TableRouter: Send + 'static + Sync,
|
||||
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
|
||||
N::BuildTableRouter: Unpin + Send + 'static,
|
||||
SP: Spawn + Send + 'static,
|
||||
CF: CollationFetch + Clone + Send + Sync + 'static,
|
||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
|
||||
{
|
||||
@@ -361,13 +408,41 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
max_block_data_size,
|
||||
));
|
||||
|
||||
let router = self.network.build_table_router(
|
||||
let build_router = self.network.build_table_router(
|
||||
table.clone(),
|
||||
&validators,
|
||||
);
|
||||
|
||||
if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) {
|
||||
self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index);
|
||||
let availability_store = self.availability_store.clone();
|
||||
let client = self.client.clone();
|
||||
let collation_fetch = self.collation_fetch.clone();
|
||||
|
||||
let res = self.spawner.spawn(async move {
|
||||
// It is important that we build the router as it launches tasks internally
|
||||
// that are required to receive gossip messages.
|
||||
let router = match build_router.await {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
warn!(target: "validation", "Failed to build router: {:?}", e);
|
||||
return
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
|
||||
let n_validators = validators.len();
|
||||
|
||||
launch_work(
|
||||
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
|
||||
availability_store,
|
||||
router,
|
||||
n_validators,
|
||||
index,
|
||||
).await;
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = res {
|
||||
error!(target: "validation", "Failed to create router and launch work: {:?}", e);
|
||||
}
|
||||
|
||||
let tracker = ValidationInstanceHandle {
|
||||
@@ -384,33 +459,28 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
fn retain<F: FnMut(&Hash) -> bool>(&mut self, mut pred: F) {
|
||||
self.live_instances.retain(|k, _| pred(k))
|
||||
}
|
||||
}
|
||||
|
||||
// launch parachain work asynchronously.
|
||||
fn launch_work(
|
||||
&self,
|
||||
relay_parent: Hash,
|
||||
validation_para: ParaId,
|
||||
build_router: N::BuildTableRouter,
|
||||
max_block_data_size: Option<u64>,
|
||||
async fn launch_work<CFF, E>(
|
||||
collation_fetch: impl FnOnce() -> CFF,
|
||||
availability_store: AvailabilityStore,
|
||||
router: impl TableRouter,
|
||||
n_validators: usize,
|
||||
local_id: ValidatorIndex,
|
||||
) {
|
||||
let (collators, client) = (self.collators.clone(), self.client.clone());
|
||||
let availability_store = self.availability_store.clone();
|
||||
|
||||
let with_router = move |router: N::TableRouter| {
|
||||
) where
|
||||
E: std::fmt::Debug,
|
||||
CFF: Future<Output = Result<(CollationInfo, FullOutput), E>> + Send,
|
||||
{
|
||||
// fetch a local collation from connected collators.
|
||||
let collation_work = crate::collation::collation_fetch(
|
||||
validation_para,
|
||||
relay_parent,
|
||||
collators,
|
||||
client.clone(),
|
||||
max_block_data_size,
|
||||
n_validators,
|
||||
);
|
||||
let (collation_info, full_output) = match collation_fetch().await {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
|
||||
return
|
||||
}
|
||||
};
|
||||
|
||||
collation_work.then(move |result| match result {
|
||||
Ok((collation_info, full_output)) => {
|
||||
let crate::pipeline::FullOutput {
|
||||
commitments,
|
||||
erasure_chunks,
|
||||
@@ -419,17 +489,10 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
} = full_output;
|
||||
|
||||
let receipt = collation_info.into_receipt(commitments);
|
||||
|
||||
// Apparently the `async move` block is the only way to convince
|
||||
// the compiler that we are not moving values out of borrowed context.
|
||||
let av_clone = availability_store.clone();
|
||||
let receipt_clone = receipt.clone();
|
||||
let erasure_chunks_clone = erasure_chunks.clone();
|
||||
let pov_block = available_data.pov_block.clone();
|
||||
|
||||
let res = async move {
|
||||
if let Err(e) = av_clone.make_available(
|
||||
receipt_clone.hash(),
|
||||
if let Err(e) = availability_store.make_available(
|
||||
receipt.hash(),
|
||||
available_data,
|
||||
).await {
|
||||
warn!(
|
||||
@@ -438,44 +501,263 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
e,
|
||||
);
|
||||
}
|
||||
if let Err(e) = av_clone.clone().add_erasure_chunks(
|
||||
receipt_clone,
|
||||
|
||||
if let Err(e) = availability_store.clone().add_erasure_chunks(
|
||||
receipt.clone(),
|
||||
n_validators as _,
|
||||
erasure_chunks_clone,
|
||||
erasure_chunks.clone(),
|
||||
).await {
|
||||
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
|
||||
}
|
||||
}
|
||||
.unit_error()
|
||||
.then(move |_| {
|
||||
router.local_collation(
|
||||
|
||||
if let Err(e) = router.local_collation(
|
||||
receipt,
|
||||
pov_block,
|
||||
(local_id, &erasure_chunks),
|
||||
).map_err(|e| warn!(target: "validation", "Failed to send local collation: {:?}", e))
|
||||
});
|
||||
).await {
|
||||
warn!(target: "validation", "Failed to send local collation: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
res.boxed()
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::{executor::{ThreadPool, self}, future::ready, channel::mpsc};
|
||||
use availability_store::ErasureNetworking;
|
||||
use polkadot_primitives::parachain::{
|
||||
PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex,
|
||||
CollationInfo, DutyRoster, GlobalValidationSchedule, LocalValidationData,
|
||||
Retriable, CollatorId, BlockData, Chain, AvailableData, SigningContext,
|
||||
};
|
||||
use runtime_primitives::traits::Block as BlockT;
|
||||
use std::pin::Pin;
|
||||
use sp_keyring::sr25519::Keyring;
|
||||
|
||||
/// Events fired while running mock implementations to follow execution.
|
||||
enum Events {
|
||||
BuildTableRouter,
|
||||
CollationFetch,
|
||||
LocalCollation,
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
|
||||
Box::pin(ready(Ok(())))
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MockNetwork(mpsc::UnboundedSender<Events>);
|
||||
|
||||
impl Network for MockNetwork {
|
||||
type Error = String;
|
||||
type TableRouter = MockTableRouter;
|
||||
type BuildTableRouter = Pin<Box<dyn Future<Output = Result<MockTableRouter, String>> + Send>>;
|
||||
|
||||
fn build_table_router(
|
||||
&self,
|
||||
_: Arc<SharedTable>,
|
||||
_: &[ValidatorId],
|
||||
) -> Self::BuildTableRouter {
|
||||
let event_sender = self.0.clone();
|
||||
async move {
|
||||
event_sender.unbounded_send(Events::BuildTableRouter).expect("Send `BuildTableRouter`");
|
||||
|
||||
Ok(MockTableRouter(event_sender))
|
||||
}.boxed()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MockTableRouter(mpsc::UnboundedSender<Events>);
|
||||
|
||||
impl TableRouter for MockTableRouter {
|
||||
type Error = String;
|
||||
type SendLocalCollation = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
|
||||
type FetchValidationProof = Box<dyn Future<Output = Result<PoVBlock, String>> + Unpin>;
|
||||
|
||||
fn local_collation(
|
||||
&self,
|
||||
_: AbridgedCandidateReceipt,
|
||||
_: PoVBlock,
|
||||
_: (ValidatorIndex, &[ErasureChunk]),
|
||||
) -> Self::SendLocalCollation {
|
||||
let sender = self.0.clone();
|
||||
|
||||
async move {
|
||||
sender.unbounded_send(Events::LocalCollation).expect("Send `LocalCollation`");
|
||||
|
||||
Ok(())
|
||||
}.boxed()
|
||||
}
|
||||
|
||||
fn fetch_pov_block(&self, _: &AbridgedCandidateReceipt) -> Self::FetchValidationProof {
|
||||
unimplemented!("Not required in tests")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MockErasureNetworking;
|
||||
|
||||
impl ErasureNetworking for MockErasureNetworking {
|
||||
type Error = String;
|
||||
|
||||
fn fetch_erasure_chunk(
|
||||
&self,
|
||||
_: &Hash,
|
||||
_: u32,
|
||||
) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
|
||||
ready(Err("Not required in tests".to_string())).boxed()
|
||||
}
|
||||
|
||||
fn distribute_erasure_chunk(&self, _: Hash, _: ErasureChunk) {
|
||||
unimplemented!("Not required in tests")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MockCollationFetch(mpsc::UnboundedSender<Events>);
|
||||
|
||||
impl CollationFetch for MockCollationFetch {
|
||||
type Error = ();
|
||||
|
||||
fn collation_fetch<P>(
|
||||
self,
|
||||
parachain: ParaId,
|
||||
relay_parent: Hash,
|
||||
_: Arc<P>,
|
||||
_: Option<u64>,
|
||||
n_validators: usize,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), ()>> + Send>> {
|
||||
let info = CollationInfo {
|
||||
parachain_index: parachain,
|
||||
relay_parent,
|
||||
collator: Default::default(),
|
||||
signature: Default::default(),
|
||||
head_data: Default::default(),
|
||||
pov_block_hash: Default::default(),
|
||||
};
|
||||
|
||||
let router_work = build_router
|
||||
.map_ok(with_router)
|
||||
.map_err(|e| {
|
||||
warn!(target: "validation" , "Failed to build table router: {:?}", e);
|
||||
})
|
||||
.and_then(|r| r)
|
||||
.map(|_| ());
|
||||
let available_data = AvailableData {
|
||||
pov_block: PoVBlock { block_data: BlockData(Vec::new()) },
|
||||
omitted_validation: Default::default(),
|
||||
};
|
||||
|
||||
let full_output = FullOutput {
|
||||
available_data,
|
||||
commitments: Default::default(),
|
||||
erasure_chunks: Default::default(),
|
||||
n_validators,
|
||||
};
|
||||
|
||||
// spawn onto thread pool.
|
||||
if self.spawner.spawn(router_work).is_err() {
|
||||
error!("Failed to spawn router work task");
|
||||
let sender = self.0;
|
||||
|
||||
async move {
|
||||
sender.unbounded_send(Events::CollationFetch).expect("`CollationFetch` event send");
|
||||
|
||||
Ok((info, full_output))
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MockRuntimeApi {
|
||||
validators: Vec<ValidatorId>,
|
||||
duty_roster: DutyRoster,
|
||||
}
|
||||
|
||||
impl ProvideRuntimeApi<Block> for MockRuntimeApi {
|
||||
type Api = Self;
|
||||
|
||||
fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
|
||||
self.clone().into()
|
||||
}
|
||||
}
|
||||
|
||||
sp_api::mock_impl_runtime_apis! {
|
||||
impl ParachainHost<Block> for MockRuntimeApi {
|
||||
type Error = sp_blockchain::Error;
|
||||
|
||||
fn validators(&self) -> Vec<ValidatorId> { self.validators.clone() }
|
||||
fn duty_roster(&self) -> DutyRoster { self.duty_roster.clone() }
|
||||
fn active_parachains() -> Vec<(ParaId, Option<(CollatorId, Retriable)>)> { vec![(ParaId::from(1), None)] }
|
||||
fn global_validation_schedule() -> GlobalValidationSchedule { Default::default() }
|
||||
fn local_validation_data(_: ParaId) -> Option<LocalValidationData> { None }
|
||||
fn parachain_code(_: ParaId) -> Option<Vec<u8>> { None }
|
||||
fn get_heads(_: Vec<<Block as BlockT>::Extrinsic>) -> Option<Vec<AbridgedCandidateReceipt>> {
|
||||
None
|
||||
}
|
||||
fn signing_context() -> SigningContext {
|
||||
Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn launch_work_is_executed_properly() {
|
||||
let executor = ThreadPool::new().unwrap();
|
||||
let keystore = keystore::Store::new_in_memory();
|
||||
|
||||
// Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator.
|
||||
keystore.write().insert_ephemeral_from_seed::<ValidatorPair>(&Keyring::Bob.to_seed())
|
||||
.expect("Insert key into keystore");
|
||||
|
||||
let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())];
|
||||
let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())];
|
||||
let duty_roster = DutyRoster { validator_duty };
|
||||
|
||||
let (events_sender, events) = mpsc::unbounded();
|
||||
|
||||
let mut parachain_validation = ParachainValidationInstances {
|
||||
client: Arc::new(MockRuntimeApi { validators, duty_roster }),
|
||||
network: MockNetwork(events_sender.clone()),
|
||||
collation_fetch: MockCollationFetch(events_sender.clone()),
|
||||
spawner: executor.clone(),
|
||||
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
|
||||
live_instances: HashMap::new(),
|
||||
};
|
||||
|
||||
parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
|
||||
.expect("Creates new validation round");
|
||||
|
||||
let mut events = executor::block_on_stream(events);
|
||||
|
||||
assert!(matches!(events.next().unwrap(), Events::BuildTableRouter));
|
||||
assert!(matches!(events.next().unwrap(), Events::CollationFetch));
|
||||
assert!(matches!(events.next().unwrap(), Events::LocalCollation));
|
||||
|
||||
drop(events_sender);
|
||||
drop(parachain_validation);
|
||||
assert!(events.next().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn router_is_built_on_relay_chain_validator() {
|
||||
let executor = ThreadPool::new().unwrap();
|
||||
let keystore = keystore::Store::new_in_memory();
|
||||
|
||||
// Make sure `Alice` key is in the keystore, so this mocked node will be a relay-chain validator.
|
||||
keystore.write().insert_ephemeral_from_seed::<ValidatorPair>(&Keyring::Alice.to_seed())
|
||||
.expect("Insert key into keystore");
|
||||
|
||||
let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())];
|
||||
let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())];
|
||||
let duty_roster = DutyRoster { validator_duty };
|
||||
|
||||
let (events_sender, events) = mpsc::unbounded();
|
||||
|
||||
let mut parachain_validation = ParachainValidationInstances {
|
||||
client: Arc::new(MockRuntimeApi { validators, duty_roster }),
|
||||
network: MockNetwork(events_sender.clone()),
|
||||
collation_fetch: MockCollationFetch(events_sender.clone()),
|
||||
spawner: executor.clone(),
|
||||
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
|
||||
live_instances: HashMap::new(),
|
||||
};
|
||||
|
||||
parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
|
||||
.expect("Creates new validation round");
|
||||
|
||||
let mut events = executor::block_on_stream(events);
|
||||
|
||||
assert!(matches!(events.next().unwrap(), Events::BuildTableRouter));
|
||||
|
||||
drop(events_sender);
|
||||
drop(parachain_validation);
|
||||
assert!(events.next().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user