diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 11b6d614fe..9ad8d9a5da 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -364,7 +364,7 @@ fn run_collator_node( ($e:expr) => { match $e { Ok(x) => x, - Err(e) => return (future::err(Error::Polkadot( + Err(e) => return future::Either::Left(future::err(Error::Polkadot( format!("{:?}", e) ))), } @@ -380,11 +380,11 @@ fn run_collator_node( let parachain_context = parachain_context.clone(); let validation_network = validation_network.clone(); - let work = future::lazy(move |_| async move { + let work = future::lazy(move |_| { let api = client.runtime_api(); let status = match try_fr!(api.parachain_status(&id, para_id)) { Some(status) => status, - None => return future::ok(()), + None => return future::Either::Left(future::ok(())), }; let validators = try_fr!(api.validators(&id)); @@ -401,14 +401,14 @@ fn run_collator_node( validators, }; - if let Ok((collation, outgoing)) = collate( + let collation_work = collate( relay_parent, para_id, status, context, parachain_context, key, - ).await { + ).map_ok(move |(collation, outgoing)| { network.with_spec(move |spec, ctx| { let res = spec.add_local_collation( ctx, @@ -419,9 +419,10 @@ fn run_collator_node( ); tokio::spawn(res.boxed()); - }); - } - future::ok(()) + }) + }); + + future::Either::Right(collation_work) }); let deadlined = future::select( @@ -436,7 +437,7 @@ fn run_collator_node( } }); - let future = silenced.map(drop); + let future = silenced.map(drop); tokio::spawn(future); } diff --git a/polkadot/validation/src/validation_service/mod.rs b/polkadot/validation/src/validation_service/mod.rs index bfb25401b4..51c39069d6 100644 --- a/polkadot/validation/src/validation_service/mod.rs +++ b/polkadot/validation/src/validation_service/mod.rs @@ -33,8 +33,7 @@ use sc_client_api::{BlockchainEvents, BlockBody}; use sp_blockchain::HeaderBackend; use block_builder::BlockBuilderApi; use consensus::SelectChain; -use futures::prelude::*; -use futures::task::{Spawn, SpawnExt}; +use futures::{future::ready, prelude::*, task::{Spawn, SpawnExt}}; use polkadot_primitives::{Block, Hash, BlockId}; use polkadot_primitives::parachain::{ Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, @@ -149,6 +148,7 @@ impl ServiceBuilder where N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, N::BuildTableRouter: Send + Unpin + 'static, + ::SendLocalCollation: Send, SC: SelectChain + 'static, SP: Spawn + Send + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 @@ -265,12 +265,13 @@ pub(crate) struct ParachainValidationInstances { } impl ParachainValidationInstances where - C: Collators + Send + Unpin + 'static, + C: Collators + Send + Unpin + 'static + Sync, N: Network, P: ProvideRuntimeApi + HeaderBackend + BlockBody + Send + Sync + 'static, P::Api: ParachainHost + BlockBuilderApi + ApiExt, C::Collation: Send + Unpin + 'static, N::TableRouter: Send + 'static, + ::SendLocalCollation: Send, N::BuildTableRouter: Unpin + Send + 'static, SP: Spawn + Send + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 @@ -288,9 +289,7 @@ impl ParachainValidationInstances where parent_hash: Hash, keystore: &KeyStorePtr, max_block_data_size: Option, - ) - -> Result - { + ) -> Result { use primitives::Pair; if let Some(tracker) = self.live_instances.get(&parent_hash) { @@ -381,18 +380,18 @@ impl ParachainValidationInstances where let (collators, client) = (self.collators.clone(), self.client.clone()); let availability_store = self.availability_store.clone(); - let with_router = move |router: N::TableRouter| async move { + let with_router = move |router: N::TableRouter| { // fetch a local collation from connected collators. - match crate::collation::collation_fetch( + let collation_work = crate::collation::collation_fetch( validation_para, relay_parent, collators, client.clone(), max_block_data_size, - ).await { - Ok(collation_work) => { - let (collation, outgoing_targeted, parent_head, fees_charged) = collation_work; + ); + collation_work.then(move |result| match result { + Ok((collation, outgoing_targeted, parent_head, fees_charged)) => { match crate::collation::produce_receipt_and_chunks( authorities_num, parent_head, @@ -408,40 +407,38 @@ impl ParachainValidationInstances where let chunks_clone = chunks.clone(); let receipt_clone = receipt.clone(); - if let Err(e) = av_clone.clone().add_erasure_chunks( - relay_parent.clone(), - receipt_clone, - chunks_clone, - ).await { - warn!( - target: "validation", - "Failed to add erasure chunks: {}", e - ); - } else { - let res = router.local_collation( + let res = async move { + if let Err(e) = av_clone.clone().add_erasure_chunks( + relay_parent.clone(), + receipt_clone, + chunks_clone, + ).await { + warn!(target: "validation", "Failed to add erasure chunks: {}", e); + } + } + .unit_error() + .then(move |_| { + router.local_collation( collation, receipt, outgoing_targeted, (local_id, &chunks), - ).await; + ).map_err(|e| warn!(target: "validation", "Failed to send local collation: {:?}", e)) + }); - if let Err(e) = res { - warn!( - target: "validation", - "Failed to notify network of local collation: {:?}", e - ); - } - }; + res.boxed() } Err(e) => { warn!(target: "validation", "Failed to produce a receipt: {:?}", e); + Box::pin(ready(Ok(()))) } - }; + } } Err(e) => { - warn!(target: "validation", "Failed to fetch a candidate: {:?}", e); + warn!(target: "validation", "Failed to collate candidate: {:?}", e); + Box::pin(ready(Ok(()))) } - } + }) }; let router_work = build_router @@ -449,6 +446,7 @@ impl ParachainValidationInstances where .map_err(|e| { warn!(target: "validation" , "Failed to build table router: {:?}", e); }) + .and_then(|r| r) .map(|_| ());