From b827f3355e43ff09b0223db054e423d1c7f79f0d Mon Sep 17 00:00:00 2001 From: Ashley Date: Thu, 16 Jan 2020 17:28:58 +0100 Subject: [PATCH] `async`ify the big collator future (#40) * Rewrite collator future * Add future to tests --- cumulus/collator/src/lib.rs | 173 ++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 97 deletions(-) diff --git a/cumulus/collator/src/lib.rs b/cumulus/collator/src/lib.rs index 7f70b9bb44..7769c76fd8 100644 --- a/cumulus/collator/src/lib.rs +++ b/cumulus/collator/src/lib.rs @@ -41,9 +41,9 @@ use codec::{Decode, Encode}; use log::{error, trace}; -use futures::{future, task::Spawn, Future, FutureExt, TryFutureExt}; +use futures::{task::Spawn, Future}; -use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration}; +use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration, pin::Pin}; use parking_lot::Mutex; @@ -105,34 +105,33 @@ where + Sync + 'static, { - type ProduceCandidate = Box< - dyn Future> - + Send - + Unpin, - >; + type ProduceCandidate = Pin> + + Send, + >>; - fn produce_candidate>( + fn produce_candidate>( &mut self, _relay_chain_parent: PHash, status: ParachainStatus, _: I, ) -> Self::ProduceCandidate { - trace!(target: "cumulus-collator", "Producing candidate"); - let factory = self.proposer_factory.clone(); let inherent_providers = self.inherent_data_providers.clone(); let block_import = self.block_import.clone(); - let res = future::ready( - HeadData::::decode(&mut &status.head_data.0[..]).map_err(|e| { + Box::pin(async move { + trace!(target: "cumulus-collator", "Producing candidate"); + + let last_head = HeadData::::decode(&mut &status.head_data.0[..]).map_err(|e| { error!(target: "cumulus-collator", "Could not decode the head data: {:?}", e); InvalidHead - }), - ) - .and_then(move |last_head| { + })?; + + let parent_state_root = *last_head.header.state_root(); - let proposer_inherent_data = factory + let mut proposer = factory .lock() .init(&last_head.header) .map_err(|e| { @@ -142,73 +141,60 @@ where e, ); InvalidHead - }) - .and_then(|proposer| { - inherent_providers - .create_inherent_data() - .map_err(|e| { - error!( - target: "cumulus-collator", - "Failed to create inherent data: {:?}", - e, - ); - InvalidHead - }) - .map(move |inherent_data| (proposer, inherent_data)) - }); + })?; - future::ready(proposer_inherent_data).and_then(move |(mut proposer, inherent_data)| { - proposer - .propose( - inherent_data, - Default::default(), - //TODO: Fix this. - Duration::from_secs(6), - RecordProof::Yes, - ) - .map_err(|e| { - error!( - target: "cumulus-collator", - "Proposing failed: {:?}", - e, - ); - InvalidHead - }) - .and_then(move |proposal| { - let Proposal { - block, - storage_changes, - proof, - } = proposal; - let res = match proof { - Some(proof) => { - let (header, extrinsics) = block.deconstruct(); + let inherent_data = inherent_providers + .create_inherent_data() + .map_err(|e| { + error!( + target: "cumulus-collator", + "Failed to create inherent data: {:?}", + e, + ); + InvalidHead + })?; - // Create the parachain block data for the validators. - Ok(( - ParachainBlockData::::new( - header, - extrinsics, - proof.iter_nodes().collect(), - parent_state_root, - ), - storage_changes, - )) - } - None => { - error!( - target: "cumulus-collator", - "Proposer did not return the requested proof.", - ); - Err(InvalidHead) - } - }; + let Proposal { + block, + storage_changes, + proof, + } = proposer + .propose( + inherent_data, + Default::default(), + //TODO: Fix this. + Duration::from_secs(6), + RecordProof::Yes, + ) + .await + .map_err(|e| { + error!( + target: "cumulus-collator", + "Proposing failed: {:?}", + e, + ); + InvalidHead + })?; + + let proof = proof + .ok_or_else(|| { + error!( + target: "cumulus-collator", + "Proposer did not return the requested proof.", + ); + InvalidHead + })?; + + let (header, extrinsics) = block.deconstruct(); + + // Create the parachain block data for the validators. + let b = ParachainBlockData::::new( + header, + extrinsics, + proof.iter_nodes().collect(), + parent_state_root, + ); - future::ready(res) - }) - }) - }) - .and_then(move |(b, storage_changes)| { let block_import_params = BlockImportParams { origin: BlockOrigin::Own, header: b.header().clone(), @@ -226,7 +212,7 @@ where storage_changes: Some(storage_changes), }; - let res = if let Err(err) = block_import + if let Err(err) = block_import .lock() .import_block(block_import_params, Default::default()) { @@ -236,14 +222,9 @@ where b.header().parent_hash(), err, ); - Err(InvalidHead) - } else { - Ok(b) - }; + return Err(InvalidHead); + } - future::ready(res) - }) - .map_ok(|b| { let block_data = BlockData(b.encode()); let head_data = HeadData:: { header: b.into_header(), @@ -252,18 +233,16 @@ where outgoing_messages: Vec::new(), }; - ( + let candidate = ( block_data, parachain::HeadData(head_data.encode()), messages, - ) - }) - .then(|r| { - trace!(target: "cumulus-collator", "Produced candidate: {:?}", r); - future::ready(r) - }); + ); - Box::new(res) + trace!(target: "cumulus-collator", "Produced candidate: {:?}", candidate); + + Ok(candidate) + }) } } @@ -398,7 +377,7 @@ mod tests { }; use test_runtime::{Block, Header}; - use futures::Stream; + use futures::{Stream, future}; #[derive(Debug)] struct Error;