Asyncify launch_work a bit more (#777)

* Asyncify launch_work a bit more

* An error message misword

* A bit more async in collator
This commit is contained in:
Fedor Sakharov
2020-01-20 19:13:48 +03:00
committed by GitHub
parent 3a49649a51
commit 9a9860c8bd
2 changed files with 34 additions and 41 deletions
+14 -13
View File
@@ -363,13 +363,15 @@ fn run_collator_node<S, E, P, Extrinsic>(
}; };
let inner_exit = exit.clone(); let inner_exit = exit.clone();
let work = client.import_notification_stream() let work = async move {
.for_each(move |notification| { let mut notification_stream = client.import_notification_stream();
while let Some(notification) = notification_stream.next().await {
macro_rules! try_fr { macro_rules! try_fr {
($e:expr) => { ($e:expr) => {
match $e { match $e {
Ok(x) => x, Ok(x) => x,
Err(e) => return future::Either::Left(future::err(Error::Polkadot( Err(e) => return (future::err(Error::Polkadot(
format!("{:?}", e) format!("{:?}", e)
))), ))),
} }
@@ -386,11 +388,11 @@ fn run_collator_node<S, E, P, Extrinsic>(
let validation_network = validation_network.clone(); let validation_network = validation_network.clone();
let inner_exit_2 = inner_exit.clone(); let inner_exit_2 = inner_exit.clone();
let work = future::lazy(move |_| { let work = future::lazy(move |_| async move {
let api = client.runtime_api(); let api = client.runtime_api();
let status = match try_fr!(api.parachain_status(&id, para_id)) { let status = match try_fr!(api.parachain_status(&id, para_id)) {
Some(status) => status, Some(status) => status,
None => return future::Either::Left(future::ok(())), None => return future::ok(()),
}; };
let validators = try_fr!(api.validators(&id)); let validators = try_fr!(api.validators(&id));
@@ -407,14 +409,14 @@ fn run_collator_node<S, E, P, Extrinsic>(
validators, validators,
}; };
let collation_work = collate( if let Ok((collation, outgoing)) = collate(
relay_parent, relay_parent,
para_id, para_id,
status, status,
context, context,
parachain_context, parachain_context,
key, key,
).map_ok(move |(collation, outgoing)| { ).await {
network.with_spec(move |spec, ctx| { network.with_spec(move |spec, ctx| {
let res = spec.add_local_collation( let res = spec.add_local_collation(
ctx, ctx,
@@ -426,10 +428,9 @@ fn run_collator_node<S, E, P, Extrinsic>(
let exit = inner_exit_2.clone(); let exit = inner_exit_2.clone();
tokio::spawn(future::select(res.boxed(), exit).map(drop).map(|_| Ok(())).compat()); tokio::spawn(future::select(res.boxed(), exit).map(drop).map(|_| Ok(())).compat());
}) });
}); }
future::ok(())
future::Either::Right(collation_work)
}); });
let deadlined = future::select( let deadlined = future::select(
@@ -450,8 +451,8 @@ fn run_collator_node<S, E, P, Extrinsic>(
).map(drop); ).map(drop);
tokio::spawn(future.map(|_| Ok(())).compat()); tokio::spawn(future.map(|_| Ok(())).compat());
future::ready(()) }
}); }.boxed();
service.spawn_essential_task(work); service.spawn_essential_task(work);
@@ -34,7 +34,7 @@ use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi; use block_builder::BlockBuilderApi;
use consensus::SelectChain; use consensus::SelectChain;
use futures::prelude::*; use futures::prelude::*;
use futures::{future::{ready, select}, task::{Spawn, SpawnExt}}; use futures::{future::select, task::{Spawn, SpawnExt}};
use polkadot_primitives::{Block, Hash, BlockId}; use polkadot_primitives::{Block, Hash, BlockId};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
@@ -384,18 +384,18 @@ impl<C, N, P> ParachainValidationInstances<C, N, P> where
let (collators, client) = (self.collators.clone(), self.client.clone()); let (collators, client) = (self.collators.clone(), self.client.clone());
let availability_store = self.availability_store.clone(); let availability_store = self.availability_store.clone();
let with_router = move |router: N::TableRouter| { let with_router = move |router: N::TableRouter| async move {
// fetch a local collation from connected collators. // fetch a local collation from connected collators.
let collation_work = crate::collation::collation_fetch( match crate::collation::collation_fetch(
validation_para, validation_para,
relay_parent, relay_parent,
collators, collators,
client.clone(), client.clone(),
max_block_data_size, max_block_data_size,
); ).await {
Ok(collation_work) => {
let (collation, outgoing_targeted, fees_charged) = collation_work;
collation_work.map(move |result| match result {
Ok((collation, outgoing_targeted, fees_charged)) => {
match crate::collation::produce_receipt_and_chunks( match crate::collation::produce_receipt_and_chunks(
authorities_num, authorities_num,
&collation.pov, &collation.pov,
@@ -410,41 +410,33 @@ impl<C, N, P> ParachainValidationInstances<C, N, P> where
let chunks_clone = chunks.clone(); let chunks_clone = chunks.clone();
let receipt_clone = receipt.clone(); let receipt_clone = receipt.clone();
let res = async move { if let Err(e) = av_clone.clone().add_erasure_chunks(
if let Err(e) = av_clone.clone().add_erasure_chunks( relay_parent.clone(),
relay_parent.clone(), receipt_clone,
receipt_clone, chunks_clone,
chunks_clone, ).await {
).await { warn!(
warn!(target: "validation", "Failed to add erasure chunks: {}", e); target: "validation",
} "Failed to add erasure chunks: {}", e
} );
.unit_error() } else {
.boxed()
.then(move |_| {
router.local_collation( router.local_collation(
collation, collation,
receipt, receipt,
outgoing_targeted, outgoing_targeted,
(local_id, &chunks), (local_id, &chunks),
); );
ready(()) }
});
Some(res)
} }
Err(e) => { Err(e) => {
warn!(target: "validation", "Failed to produce a receipt: {:?}", e); warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
None
} }
} };
} }
Err(e) => { Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e); warn!(target: "validation", "Failed to fetch a candidate: {:?}", e);
None
} }
}) }
}; };
let router = build_router let router = build_router