From c021f854a2044edbd760f3916a2c1faab7290893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 3 Apr 2020 22:33:52 +0200 Subject: [PATCH] Ensure that table router is always built (#952) * Ensure that table router is always built This pr ensures that the table router is always built, aka the future is resolved. This is important, as the table router internally spawns tasks to handle gossip messages. Handling gossip messages is not only required on parachain validators, but also on relay chain validators to receive collations. Tests are added to ensure that the assumptions hold. * Fix compilation * Switch to closures * Remove empty line * Revert "Remove empty line" This reverts commit 0d4aaba1780aec1c8d61e1d5dcf7768918af02d9. * Revert "Switch to closures" This reverts commit d128c4ecc02c911552a3bfd2142b5a4f7b1338ba. * Hybrid approach * Rename test * Make trait crate local --- polkadot/primitives/src/parachain.rs | 2 +- polkadot/runtime/common/src/parachains.rs | 2 +- polkadot/runtime/common/src/registrar.rs | 2 +- polkadot/runtime/test-runtime/src/lib.rs | 2 +- polkadot/validation/src/lib.rs | 3 +- .../validation/src/validation_service/mod.rs | 516 ++++++++++++++---- 6 files changed, 405 insertions(+), 122 deletions(-) diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index de05063387..db5562bb72 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -28,7 +28,7 @@ use serde::{Serialize, Deserialize}; #[cfg(feature = "std")] use primitives::bytes; use primitives::RuntimeDebug; -use runtime_primitives::traits::{Block as BlockT}; +use runtime_primitives::traits::Block as BlockT; use inherents::InherentIdentifier; use application_crypto::KeyTypeId; diff --git a/polkadot/runtime/common/src/parachains.rs b/polkadot/runtime/common/src/parachains.rs index d9fd0ecb10..11f1ac5e60 100644 --- a/polkadot/runtime/common/src/parachains.rs +++ 
b/polkadot/runtime/common/src/parachains.rs @@ -337,7 +337,7 @@ decl_storage! { pub RelayDispatchQueue: map hasher(twox_64_concat) ParaId => Vec; /// Size of the dispatch queues. Separated from actual data in order to avoid costly /// decoding when checking receipt validity. First item in tuple is the count of messages - /// second if the total length (in bytes) of the message payloads. + /// second if the total length (in bytes) of the message payloads. pub RelayDispatchQueueSize: map hasher(twox_64_concat) ParaId => (u32, u32); /// The ordered list of ParaIds that have a `RelayDispatchQueue` entry. NeedsDispatch: Vec; diff --git a/polkadot/runtime/common/src/registrar.rs b/polkadot/runtime/common/src/registrar.rs index 5722d9155d..c60930f466 100644 --- a/polkadot/runtime/common/src/registrar.rs +++ b/polkadot/runtime/common/src/registrar.rs @@ -498,7 +498,7 @@ decl_event!{ } impl Module { - /// Ensures that the given `ParaId` corresponds to a registered parathread, and returns a descriptor if so. + /// Ensures that the given `ParaId` corresponds to a registered parathread, and returns a descriptor if so. 
pub fn ensure_thread_id(id: ParaId) -> Option { Paras::get(id).and_then(|info| if let Scheduling::Dynamic = info.scheduling { Some(info) diff --git a/polkadot/runtime/test-runtime/src/lib.rs b/polkadot/runtime/test-runtime/src/lib.rs index 8230c4ad65..fa22284e76 100644 --- a/polkadot/runtime/test-runtime/src/lib.rs +++ b/polkadot/runtime/test-runtime/src/lib.rs @@ -53,7 +53,7 @@ use frame_support::{ weights::DispatchInfo, }; use pallet_transaction_payment_rpc_runtime_api::RuntimeDispatchInfo; -use session::{historical as session_historical}; +use session::historical as session_historical; #[cfg(feature = "std")] pub use staking::StakerStatus; diff --git a/polkadot/validation/src/lib.rs b/polkadot/validation/src/lib.rs index 65fb6c1d46..438e95ae66 100644 --- a/polkadot/validation/src/lib.rs +++ b/polkadot/validation/src/lib.rs @@ -54,7 +54,7 @@ pub use self::shared_table::{ pub use self::validation_service::{ServiceHandle, ServiceBuilder}; #[cfg(not(target_os = "unknown"))] -pub use parachain::wasm_executor::{run_worker as run_validation_worker}; +pub use parachain::wasm_executor::run_worker as run_validation_worker; mod dynamic_inclusion; mod error; @@ -108,6 +108,7 @@ pub trait Network { /// Instantiate a table router using the given shared table. /// Also pass through any outgoing messages to be broadcast to peers. + #[must_use] fn build_table_router( &self, table: Arc, diff --git a/polkadot/validation/src/validation_service/mod.rs b/polkadot/validation/src/validation_service/mod.rs index e4208d695a..987f0e063e 100644 --- a/polkadot/validation/src/validation_service/mod.rs +++ b/polkadot/validation/src/validation_service/mod.rs @@ -26,21 +26,20 @@ //! //! These attestation sessions are kept live until they are periodically garbage-collected. 
-use std::{time::{Duration, Instant}, sync::Arc}; +use std::{time::{Duration, Instant}, sync::Arc, pin::Pin}; use std::collections::HashMap; +use crate::pipeline::FullOutput; use sc_client_api::{BlockchainEvents, BlockBackend}; -use sp_blockchain::HeaderBackend; -use block_builder::BlockBuilderApi; use consensus::SelectChain; -use futures::{future::ready, prelude::*, task::{Spawn, SpawnExt}}; +use futures::{prelude::*, task::{Spawn, SpawnExt}}; use polkadot_primitives::{Block, Hash, BlockId}; use polkadot_primitives::parachain::{ - Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, SigningContext, + Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, + CollationInfo, SigningContext, }; -use babe_primitives::BabeApi; use keystore::KeyStorePtr; -use sp_api::{ApiExt, ProvideRuntimeApi}; +use sp_api::{ProvideRuntimeApi, ApiExt}; use runtime_primitives::traits::HashFor; use availability_store::Store as AvailabilityStore; @@ -140,13 +139,10 @@ impl ServiceBuilder where C: Collators + Send + Sync + Unpin + 'static, C::Collation: Send + Unpin + 'static, P: BlockchainEvents + BlockBackend, - P: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, - P::Api: ParachainHost + - BlockBuilderApi + - BabeApi + - ApiExt, + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, N: Network + Send + Sync + 'static, - N::TableRouter: Send + 'static, + N::TableRouter: Send + 'static + Sync, N::BuildTableRouter: Send + Unpin + 'static, ::SendLocalCollation: Send, SC: SelectChain + 'static, @@ -171,10 +167,10 @@ impl ServiceBuilder where let mut parachain_validation = ParachainValidationInstances { client: self.client.clone(), network: self.network, - collators: self.collators, spawner: self.spawner, availability_store: self.availability_store, live_instances: HashMap::new(), + collation_fetch: DefaultCollationFetch(self.collators), }; let client = self.client; @@ -236,6 +232,57 @@ impl ServiceBuilder where } } 
+/// Abstraction over `collation_fetch`. +pub(crate) trait CollationFetch { + /// Error type used by `collation_fetch`. + type Error: std::fmt::Debug; + + /// Fetch a collation for the given `parachain`. + fn collation_fetch

( + self, + parachain: ParaId, + relay_parent: Hash, + client: Arc

, + max_block_data_size: Option, + n_validators: usize, + ) -> Pin> + Send>> + where + P::Api: ParachainHost, + P: ProvideRuntimeApi + Send + Sync + 'static; +} + +#[derive(Clone)] +struct DefaultCollationFetch(C); +impl CollationFetch for DefaultCollationFetch + where + C: Collators + Send + Sync + Unpin + 'static, + C::Collation: Send + Unpin + 'static, +{ + type Error = C::Error; + + fn collation_fetch

( + self, + parachain: ParaId, + relay_parent: Hash, + client: Arc

, + max_block_data_size: Option, + n_validators: usize, + ) -> Pin> + Send>> + where + P::Api: ParachainHost, + P: ProvideRuntimeApi + Send + Sync + 'static, + { + crate::collation::collation_fetch( + parachain, + relay_parent, + self.0, + client, + max_block_data_size, + n_validators, + ).boxed() + } +} + // finds the first key we are capable of signing with out of the given set of validators, // if any. fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option> { @@ -248,13 +295,11 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option { +pub(crate) struct ParachainValidationInstances { /// The client instance. client: Arc

, /// The backing network handle. network: N, - /// Parachain collators. - collators: C, /// handle to spawner spawner: SP, /// Store for extrinsic data. @@ -262,18 +307,20 @@ pub(crate) struct ParachainValidationInstances { /// Live agreements. Maps relay chain parent hashes to attestation /// instances. live_instances: HashMap, + /// Used to fetch a collation. + collation_fetch: CF, } -impl ParachainValidationInstances where - C: Collators + Send + Unpin + 'static + Sync, +impl ParachainValidationInstances where N: Network, - P: ProvideRuntimeApi + HeaderBackend + BlockBackend + Send + Sync + 'static, - P::Api: ParachainHost + BlockBuilderApi + ApiExt, - C::Collation: Send + Unpin + 'static, - N::TableRouter: Send + 'static, + N::Error: 'static, + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, + N::TableRouter: Send + 'static + Sync, ::SendLocalCollation: Send, N::BuildTableRouter: Unpin + Send + 'static, SP: Spawn + Send + 'static, + CF: CollationFetch + Clone + Send + Sync + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sp_api::StateBackendFor: sp_api::StateBackend>, { @@ -361,13 +408,41 @@ impl ParachainValidationInstances where max_block_data_size, )); - let router = self.network.build_table_router( + let build_router = self.network.build_table_router( table.clone(), &validators, ); - if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) { - self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index); + let availability_store = self.availability_store.clone(); + let client = self.client.clone(); + let collation_fetch = self.collation_fetch.clone(); + + let res = self.spawner.spawn(async move { + // It is important that we build the router as it launches tasks internally + // that are required to receive gossip messages. 
+ let router = match build_router.await { + Ok(res) => res, + Err(e) => { + warn!(target: "validation", "Failed to build router: {:?}", e); + return + } + }; + + if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) { + let n_validators = validators.len(); + + launch_work( + move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators), + availability_store, + router, + n_validators, + index, + ).await; + } + }); + + if let Err(e) = res { + error!(target: "validation", "Failed to create router and launch work: {:?}", e); } let tracker = ValidationInstanceHandle { @@ -384,98 +459,305 @@ impl ParachainValidationInstances where fn retain bool>(&mut self, mut pred: F) { self.live_instances.retain(|k, _| pred(k)) } +} - // launch parachain work asynchronously. - fn launch_work( - &self, - relay_parent: Hash, - validation_para: ParaId, - build_router: N::BuildTableRouter, - max_block_data_size: Option, - n_validators: usize, - local_id: ValidatorIndex, - ) { - let (collators, client) = (self.collators.clone(), self.client.clone()); - let availability_store = self.availability_store.clone(); - - let with_router = move |router: N::TableRouter| { - // fetch a local collation from connected collators. - let collation_work = crate::collation::collation_fetch( - validation_para, - relay_parent, - collators, - client.clone(), - max_block_data_size, - n_validators, - ); - - collation_work.then(move |result| match result { - Ok((collation_info, full_output)) => { - let crate::pipeline::FullOutput { - commitments, - erasure_chunks, - available_data, - .. - } = full_output; - - let receipt = collation_info.into_receipt(commitments); - - // Apparently the `async move` block is the only way to convince - // the compiler that we are not moving values out of borrowed context. 
- let av_clone = availability_store.clone(); - let receipt_clone = receipt.clone(); - let erasure_chunks_clone = erasure_chunks.clone(); - let pov_block = available_data.pov_block.clone(); - - let res = async move { - if let Err(e) = av_clone.make_available( - receipt_clone.hash(), - available_data, - ).await { - warn!( - target: "validation", - "Failed to make parachain block data available: {}", - e, - ); - } - if let Err(e) = av_clone.clone().add_erasure_chunks( - receipt_clone, - n_validators as _, - erasure_chunks_clone, - ).await { - warn!(target: "validation", "Failed to add erasure chunks: {}", e); - } - } - .unit_error() - .then(move |_| { - router.local_collation( - receipt, - pov_block, - (local_id, &erasure_chunks), - ).map_err(|e| warn!(target: "validation", "Failed to send local collation: {:?}", e)) - }); - - res.boxed() - } - Err(e) => { - warn!(target: "validation", "Failed to collate candidate: {:?}", e); - Box::pin(ready(Ok(()))) - } - }) - }; - - let router_work = build_router - .map_ok(with_router) - .map_err(|e| { - warn!(target: "validation" , "Failed to build table router: {:?}", e); - }) - .and_then(|r| r) - .map(|_| ()); - - - // spawn onto thread pool. - if self.spawner.spawn(router_work).is_err() { - error!("Failed to spawn router work task"); +// launch parachain work asynchronously. +async fn launch_work( + collation_fetch: impl FnOnce() -> CFF, + availability_store: AvailabilityStore, + router: impl TableRouter, + n_validators: usize, + local_id: ValidatorIndex, +) where + E: std::fmt::Debug, + CFF: Future> + Send, +{ + // fetch a local collation from connected collators. + let (collation_info, full_output) = match collation_fetch().await { + Ok(res) => res, + Err(e) => { + warn!(target: "validation", "Failed to collate candidate: {:?}", e); + return } + }; + + let crate::pipeline::FullOutput { + commitments, + erasure_chunks, + available_data, + .. 
+ } = full_output; + + let receipt = collation_info.into_receipt(commitments); + let pov_block = available_data.pov_block.clone(); + + if let Err(e) = availability_store.make_available( + receipt.hash(), + available_data, + ).await { + warn!( + target: "validation", + "Failed to make parachain block data available: {}", + e, + ); + } + + if let Err(e) = availability_store.clone().add_erasure_chunks( + receipt.clone(), + n_validators as _, + erasure_chunks.clone(), + ).await { + warn!(target: "validation", "Failed to add erasure chunks: {}", e); + } + + if let Err(e) = router.local_collation( + receipt, + pov_block, + (local_id, &erasure_chunks), + ).await { + warn!(target: "validation", "Failed to send local collation: {:?}", e); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::{executor::{ThreadPool, self}, future::ready, channel::mpsc}; + use availability_store::ErasureNetworking; + use polkadot_primitives::parachain::{ + PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex, + CollationInfo, DutyRoster, GlobalValidationSchedule, LocalValidationData, + Retriable, CollatorId, BlockData, Chain, AvailableData, SigningContext, + }; + use runtime_primitives::traits::Block as BlockT; + use std::pin::Pin; + use sp_keyring::sr25519::Keyring; + + /// Events fired while running mock implementations to follow execution. 
+ enum Events { + BuildTableRouter, + CollationFetch, + LocalCollation, + } + + #[derive(Clone)] + struct MockNetwork(mpsc::UnboundedSender); + + impl Network for MockNetwork { + type Error = String; + type TableRouter = MockTableRouter; + type BuildTableRouter = Pin> + Send>>; + + fn build_table_router( + &self, + _: Arc, + _: &[ValidatorId], + ) -> Self::BuildTableRouter { + let event_sender = self.0.clone(); + async move { + event_sender.unbounded_send(Events::BuildTableRouter).expect("Send `BuildTableRouter`"); + + Ok(MockTableRouter(event_sender)) + }.boxed() + } + } + + #[derive(Clone)] + struct MockTableRouter(mpsc::UnboundedSender); + + impl TableRouter for MockTableRouter { + type Error = String; + type SendLocalCollation = Pin> + Send>>; + type FetchValidationProof = Box> + Unpin>; + + fn local_collation( + &self, + _: AbridgedCandidateReceipt, + _: PoVBlock, + _: (ValidatorIndex, &[ErasureChunk]), + ) -> Self::SendLocalCollation { + let sender = self.0.clone(); + + async move { + sender.unbounded_send(Events::LocalCollation).expect("Send `LocalCollation`"); + + Ok(()) + }.boxed() + } + + fn fetch_pov_block(&self, _: &AbridgedCandidateReceipt) -> Self::FetchValidationProof { + unimplemented!("Not required in tests") + } + } + + #[derive(Clone)] + struct MockErasureNetworking; + + impl ErasureNetworking for MockErasureNetworking { + type Error = String; + + fn fetch_erasure_chunk( + &self, + _: &Hash, + _: u32, + ) -> Pin> + Send>> { + ready(Err("Not required in tests".to_string())).boxed() + } + + fn distribute_erasure_chunk(&self, _: Hash, _: ErasureChunk) { + unimplemented!("Not required in tests") + } + } + + #[derive(Clone)] + struct MockCollationFetch(mpsc::UnboundedSender); + + impl CollationFetch for MockCollationFetch { + type Error = (); + + fn collation_fetch

( + self, + parachain: ParaId, + relay_parent: Hash, + _: Arc

, + _: Option, + n_validators: usize, + ) -> Pin> + Send>> { + let info = CollationInfo { + parachain_index: parachain, + relay_parent, + collator: Default::default(), + signature: Default::default(), + head_data: Default::default(), + pov_block_hash: Default::default(), + }; + + let available_data = AvailableData { + pov_block: PoVBlock { block_data: BlockData(Vec::new()) }, + omitted_validation: Default::default(), + }; + + let full_output = FullOutput { + available_data, + commitments: Default::default(), + erasure_chunks: Default::default(), + n_validators, + }; + + let sender = self.0; + + async move { + sender.unbounded_send(Events::CollationFetch).expect("`CollationFetch` event send"); + + Ok((info, full_output)) + }.boxed() + } + } + + #[derive(Clone)] + struct MockRuntimeApi { + validators: Vec, + duty_roster: DutyRoster, + } + + impl ProvideRuntimeApi for MockRuntimeApi { + type Api = Self; + + fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> { + self.clone().into() + } + } + + sp_api::mock_impl_runtime_apis! { + impl ParachainHost for MockRuntimeApi { + type Error = sp_blockchain::Error; + + fn validators(&self) -> Vec { self.validators.clone() } + fn duty_roster(&self) -> DutyRoster { self.duty_roster.clone() } + fn active_parachains() -> Vec<(ParaId, Option<(CollatorId, Retriable)>)> { vec![(ParaId::from(1), None)] } + fn global_validation_schedule() -> GlobalValidationSchedule { Default::default() } + fn local_validation_data(_: ParaId) -> Option { None } + fn parachain_code(_: ParaId) -> Option> { None } + fn get_heads(_: Vec<::Extrinsic>) -> Option> { + None + } + fn signing_context() -> SigningContext { + Default::default() + } + } + } + + #[test] + fn launch_work_is_executed_properly() { + let executor = ThreadPool::new().unwrap(); + let keystore = keystore::Store::new_in_memory(); + + // Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator. 
+ keystore.write().insert_ephemeral_from_seed::(&Keyring::Bob.to_seed()) + .expect("Insert key into keystore"); + + let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())]; + let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())]; + let duty_roster = DutyRoster { validator_duty }; + + let (events_sender, events) = mpsc::unbounded(); + + let mut parachain_validation = ParachainValidationInstances { + client: Arc::new(MockRuntimeApi { validators, duty_roster }), + network: MockNetwork(events_sender.clone()), + collation_fetch: MockCollationFetch(events_sender.clone()), + spawner: executor.clone(), + availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), + live_instances: HashMap::new(), + }; + + parachain_validation.get_or_instantiate(Default::default(), &keystore, None) + .expect("Creates new validation round"); + + let mut events = executor::block_on_stream(events); + + assert!(matches!(events.next().unwrap(), Events::BuildTableRouter)); + assert!(matches!(events.next().unwrap(), Events::CollationFetch)); + assert!(matches!(events.next().unwrap(), Events::LocalCollation)); + + drop(events_sender); + drop(parachain_validation); + assert!(events.next().is_none()); + } + + #[test] + fn router_is_built_on_relay_chain_validator() { + let executor = ThreadPool::new().unwrap(); + let keystore = keystore::Store::new_in_memory(); + + // Make sure `Alice` key is in the keystore, so this mocked node will be a relay-chain validator. 
+ keystore.write().insert_ephemeral_from_seed::(&Keyring::Alice.to_seed()) + .expect("Insert key into keystore"); + + let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())]; + let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())]; + let duty_roster = DutyRoster { validator_duty }; + + let (events_sender, events) = mpsc::unbounded(); + + let mut parachain_validation = ParachainValidationInstances { + client: Arc::new(MockRuntimeApi { validators, duty_roster }), + network: MockNetwork(events_sender.clone()), + collation_fetch: MockCollationFetch(events_sender.clone()), + spawner: executor.clone(), + availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), + live_instances: HashMap::new(), + }; + + parachain_validation.get_or_instantiate(Default::default(), &keystore, None) + .expect("Creates new validation round"); + + let mut events = executor::block_on_stream(events); + + assert!(matches!(events.next().unwrap(), Events::BuildTableRouter)); + + drop(events_sender); + drop(parachain_validation); + assert!(events.next().is_none()); } }