diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index c4e886a2c4..28ef198a7e 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -363,13 +363,15 @@ fn run_collator_node( }; let inner_exit = exit.clone(); - let work = client.import_notification_stream() - .for_each(move |notification| { + let work = async move { + let mut notification_stream = client.import_notification_stream(); + + while let Some(notification) = notification_stream.next().await { macro_rules! try_fr { ($e:expr) => { match $e { Ok(x) => x, - Err(e) => return future::Either::Left(future::err(Error::Polkadot( + Err(e) => return (future::err(Error::Polkadot( format!("{:?}", e) ))), } @@ -386,11 +388,11 @@ fn run_collator_node( let validation_network = validation_network.clone(); let inner_exit_2 = inner_exit.clone(); - let work = future::lazy(move |_| { + let work = future::lazy(move |_| async move { let api = client.runtime_api(); let status = match try_fr!(api.parachain_status(&id, para_id)) { Some(status) => status, - None => return future::Either::Left(future::ok(())), + None => return future::ok(()), }; let validators = try_fr!(api.validators(&id)); @@ -407,14 +409,14 @@ fn run_collator_node( validators, }; - let collation_work = collate( + if let Ok((collation, outgoing)) = collate( relay_parent, para_id, status, context, parachain_context, key, - ).map_ok(move |(collation, outgoing)| { + ).await { network.with_spec(move |spec, ctx| { let res = spec.add_local_collation( ctx, @@ -426,10 +428,9 @@ fn run_collator_node( let exit = inner_exit_2.clone(); tokio::spawn(future::select(res.boxed(), exit).map(drop).map(|_| Ok(())).compat()); - }) - }); - - future::Either::Right(collation_work) + }); + } + future::ok(()) }); let deadlined = future::select( @@ -450,8 +451,8 @@ fn run_collator_node( ).map(drop); tokio::spawn(future.map(|_| Ok(())).compat()); - future::ready(()) - }); + } + }.boxed(); service.spawn_essential_task(work); diff --git a/polkadot/validation/src/validation_service/mod.rs b/polkadot/validation/src/validation_service/mod.rs index 14dd85c72e..a0aa371fb5 100644 --- a/polkadot/validation/src/validation_service/mod.rs +++ b/polkadot/validation/src/validation_service/mod.rs @@ -34,7 +34,7 @@ use sp_blockchain::HeaderBackend; use block_builder::BlockBuilderApi; use consensus::SelectChain; use futures::prelude::*; -use futures::{future::{ready, select}, task::{Spawn, SpawnExt}}; +use futures::{future::select, task::{Spawn, SpawnExt}}; use polkadot_primitives::{Block, Hash, BlockId}; use polkadot_primitives::parachain::{ Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, @@ -384,18 +384,18 @@ impl ParachainValidationInstances where let (collators, client) = (self.collators.clone(), self.client.clone()); let availability_store = self.availability_store.clone(); - let with_router = move |router: N::TableRouter| { + let with_router = move |router: N::TableRouter| async move { // fetch a local collation from connected collators. - let collation_work = crate::collation::collation_fetch( + match crate::collation::collation_fetch( validation_para, relay_parent, collators, client.clone(), max_block_data_size, - ); + ).await { + Ok(collation_work) => { + let (collation, outgoing_targeted, fees_charged) = collation_work; - collation_work.map(move |result| match result { - Ok((collation, outgoing_targeted, fees_charged)) => { match crate::collation::produce_receipt_and_chunks( authorities_num, &collation.pov, @@ -410,41 +410,33 @@ impl ParachainValidationInstances where let chunks_clone = chunks.clone(); let receipt_clone = receipt.clone(); - let res = async move { - if let Err(e) = av_clone.clone().add_erasure_chunks( - relay_parent.clone(), - receipt_clone, - chunks_clone, - ).await { - warn!(target: "validation", "Failed to add erasure chunks: {}", e); - } - } - .unit_error() - .boxed() - .then(move |_| { + if let Err(e) = av_clone.clone().add_erasure_chunks( + relay_parent.clone(), + receipt_clone, + chunks_clone, + ).await { + warn!( + target: "validation", + "Failed to add erasure chunks: {}", e + ); + } else { router.local_collation( collation, receipt, outgoing_targeted, (local_id, &chunks), ); - ready(()) - }); - - - Some(res) + } } Err(e) => { warn!(target: "validation", "Failed to produce a receipt: {:?}", e); - None } - } + }; } Err(e) => { - warn!(target: "validation", "Failed to collate candidate: {:?}", e); - None + warn!(target: "validation", "Failed to fetch a candidate: {:?}", e); } - }) + } }; let router = build_router