make SelectChain async (#9128)

* make SelectChain async

* make JustificationImport async
This commit is contained in:
André Silva
2021-06-20 12:01:09 +01:00
committed by GitHub
parent 650fc2f9c9
commit a26ec52450
18 changed files with 800 additions and 454 deletions
@@ -220,16 +220,16 @@ impl<B: BlockT> BlockImportWorker<B> {
metrics,
};
// Let's initialize `justification_import`
if let Some(justification_import) = worker.justification_import.as_mut() {
for (hash, number) in justification_import.on_start() {
worker.result_sender.request_justification(&hash, number);
}
}
let delay_between_blocks = Duration::default();
let future = async move {
// Let's initialize `justification_import`
if let Some(justification_import) = worker.justification_import.as_mut() {
for (hash, number) in justification_import.on_start().await {
worker.result_sender.request_justification(&hash, number);
}
}
let block_import_process = block_import_process(
block_import,
verifier,
@@ -254,15 +254,18 @@ impl<B: BlockT> BlockImportWorker<B> {
// Make sure to first process all justifications
while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
match justification {
Some(ImportJustification(who, hash, number, justification)) =>
worker.import_justification(who, hash, number, justification),
Some(ImportJustification(who, hash, number, justification)) => {
worker
.import_justification(who, hash, number, justification)
.await
}
None => {
log::debug!(
target: "block-import",
"Stopping block import because justification channel was closed!",
);
return
},
return;
}
}
}
@@ -278,7 +281,7 @@ impl<B: BlockT> BlockImportWorker<B> {
(future, justification_sender, block_import_sender)
}
fn import_justification(
async fn import_justification(
&mut self,
who: Origin,
hash: B::Hash,
@@ -286,8 +289,11 @@ impl<B: BlockT> BlockImportWorker<B> {
justification: Justification,
) {
let started = wasm_timer::Instant::now();
let success = self.justification_import.as_mut().map(|justification_import| {
justification_import.import_justification(hash, number, justification)
let success = match self.justification_import.as_mut() {
Some(justification_import) => justification_import
.import_justification(hash, number, justification)
.await
.map_err(|e| {
debug!(
target: "sync",
@@ -298,14 +304,19 @@ impl<B: BlockT> BlockImportWorker<B> {
who,
);
e
}).is_ok()
}).unwrap_or(false);
})
.is_ok(),
None => false,
};
if let Some(metrics) = self.metrics.as_ref() {
metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
metrics
.justification_import_time
.observe(started.elapsed().as_secs_f64());
}
self.result_sender.justification_imported(who, &hash, number, success);
self.result_sender
.justification_imported(who, &hash, number, success);
}
}
@@ -472,10 +483,15 @@ mod tests {
}
}
#[async_trait::async_trait]
impl JustificationImport<Block> for () {
type Error = crate::Error;
fn import_justification(
async fn on_start(&mut self) -> Vec<(Hash, BlockNumber)> {
Vec::new()
}
async fn import_justification(
&mut self,
_hash: Hash,
_number: BlockNumber,