asyncify the big collator future (#40)

* Rewrite collator future

* Add future to tests
This commit is contained in:
Ashley
2020-01-16 17:28:58 +01:00
committed by Bastian Köcher
parent 73bcef09ab
commit b827f3355e
+76 -97
View File
@@ -41,9 +41,9 @@ use codec::{Decode, Encode};
use log::{error, trace}; use log::{error, trace};
use futures::{future, task::Spawn, Future, FutureExt, TryFutureExt}; use futures::{task::Spawn, Future};
use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration}; use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration, pin::Pin};
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -105,34 +105,33 @@ where
+ Sync + Sync
+ 'static, + 'static,
{ {
type ProduceCandidate = Box< type ProduceCandidate = Pin<Box<
dyn Future<Output = Result<(BlockData, parachain::HeadData, OutgoingMessages), InvalidHead>> dyn Future<Output=Result<(BlockData, parachain::HeadData, OutgoingMessages), InvalidHead>>
+ Send + Send,
+ Unpin, >>;
>;
fn produce_candidate<I: IntoIterator<Item = (ParaId, Message)>>( fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&mut self, &mut self,
_relay_chain_parent: PHash, _relay_chain_parent: PHash,
status: ParachainStatus, status: ParachainStatus,
_: I, _: I,
) -> Self::ProduceCandidate { ) -> Self::ProduceCandidate {
trace!(target: "cumulus-collator", "Producing candidate");
let factory = self.proposer_factory.clone(); let factory = self.proposer_factory.clone();
let inherent_providers = self.inherent_data_providers.clone(); let inherent_providers = self.inherent_data_providers.clone();
let block_import = self.block_import.clone(); let block_import = self.block_import.clone();
let res = future::ready( Box::pin(async move {
HeadData::<Block>::decode(&mut &status.head_data.0[..]).map_err(|e| { trace!(target: "cumulus-collator", "Producing candidate");
let last_head = HeadData::<Block>::decode(&mut &status.head_data.0[..]).map_err(|e| {
error!(target: "cumulus-collator", "Could not decode the head data: {:?}", e); error!(target: "cumulus-collator", "Could not decode the head data: {:?}", e);
InvalidHead InvalidHead
}), })?;
)
.and_then(move |last_head| {
let parent_state_root = *last_head.header.state_root(); let parent_state_root = *last_head.header.state_root();
let proposer_inherent_data = factory let mut proposer = factory
.lock() .lock()
.init(&last_head.header) .init(&last_head.header)
.map_err(|e| { .map_err(|e| {
@@ -142,73 +141,60 @@ where
e, e,
); );
InvalidHead InvalidHead
}) })?;
.and_then(|proposer| {
inherent_providers
.create_inherent_data()
.map_err(|e| {
error!(
target: "cumulus-collator",
"Failed to create inherent data: {:?}",
e,
);
InvalidHead
})
.map(move |inherent_data| (proposer, inherent_data))
});
future::ready(proposer_inherent_data).and_then(move |(mut proposer, inherent_data)| { let inherent_data = inherent_providers
proposer .create_inherent_data()
.propose( .map_err(|e| {
inherent_data, error!(
Default::default(), target: "cumulus-collator",
//TODO: Fix this. "Failed to create inherent data: {:?}",
Duration::from_secs(6), e,
RecordProof::Yes, );
) InvalidHead
.map_err(|e| { })?;
error!(
target: "cumulus-collator",
"Proposing failed: {:?}",
e,
);
InvalidHead
})
.and_then(move |proposal| {
let Proposal {
block,
storage_changes,
proof,
} = proposal;
let res = match proof {
Some(proof) => {
let (header, extrinsics) = block.deconstruct();
// Create the parachain block data for the validators. let Proposal {
Ok(( block,
ParachainBlockData::<Block>::new( storage_changes,
header, proof,
extrinsics, } = proposer
proof.iter_nodes().collect(), .propose(
parent_state_root, inherent_data,
), Default::default(),
storage_changes, //TODO: Fix this.
)) Duration::from_secs(6),
} RecordProof::Yes,
None => { )
error!( .await
target: "cumulus-collator", .map_err(|e| {
"Proposer did not return the requested proof.", error!(
); target: "cumulus-collator",
Err(InvalidHead) "Proposing failed: {:?}",
} e,
}; );
InvalidHead
})?;
let proof = proof
.ok_or_else(|| {
error!(
target: "cumulus-collator",
"Proposer did not return the requested proof.",
);
InvalidHead
})?;
let (header, extrinsics) = block.deconstruct();
// Create the parachain block data for the validators.
let b = ParachainBlockData::<Block>::new(
header,
extrinsics,
proof.iter_nodes().collect(),
parent_state_root,
);
future::ready(res)
})
})
})
.and_then(move |(b, storage_changes)| {
let block_import_params = BlockImportParams { let block_import_params = BlockImportParams {
origin: BlockOrigin::Own, origin: BlockOrigin::Own,
header: b.header().clone(), header: b.header().clone(),
@@ -226,7 +212,7 @@ where
storage_changes: Some(storage_changes), storage_changes: Some(storage_changes),
}; };
let res = if let Err(err) = block_import if let Err(err) = block_import
.lock() .lock()
.import_block(block_import_params, Default::default()) .import_block(block_import_params, Default::default())
{ {
@@ -236,14 +222,9 @@ where
b.header().parent_hash(), b.header().parent_hash(),
err, err,
); );
Err(InvalidHead) return Err(InvalidHead);
} else { }
Ok(b)
};
future::ready(res)
})
.map_ok(|b| {
let block_data = BlockData(b.encode()); let block_data = BlockData(b.encode());
let head_data = HeadData::<Block> { let head_data = HeadData::<Block> {
header: b.into_header(), header: b.into_header(),
@@ -252,18 +233,16 @@ where
outgoing_messages: Vec::new(), outgoing_messages: Vec::new(),
}; };
( let candidate = (
block_data, block_data,
parachain::HeadData(head_data.encode()), parachain::HeadData(head_data.encode()),
messages, messages,
) );
})
.then(|r| {
trace!(target: "cumulus-collator", "Produced candidate: {:?}", r);
future::ready(r)
});
Box::new(res) trace!(target: "cumulus-collator", "Produced candidate: {:?}", candidate);
Ok(candidate)
})
} }
} }
@@ -398,7 +377,7 @@ mod tests {
}; };
use test_runtime::{Block, Header}; use test_runtime::{Block, Header};
use futures::Stream; use futures::{Stream, future};
#[derive(Debug)] #[derive(Debug)]
struct Error; struct Error;