mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-29 10:17:57 +00:00
Revert async await to fix collation (#839)
* Revert 9a9860c8bd
* Make it work
This commit is contained in:
@@ -364,7 +364,7 @@ fn run_collator_node<S, P, Extrinsic>(
|
||||
($e:expr) => {
|
||||
match $e {
|
||||
Ok(x) => x,
|
||||
Err(e) => return (future::err(Error::Polkadot(
|
||||
Err(e) => return future::Either::Left(future::err(Error::Polkadot(
|
||||
format!("{:?}", e)
|
||||
))),
|
||||
}
|
||||
@@ -380,11 +380,11 @@ fn run_collator_node<S, P, Extrinsic>(
|
||||
let parachain_context = parachain_context.clone();
|
||||
let validation_network = validation_network.clone();
|
||||
|
||||
let work = future::lazy(move |_| async move {
|
||||
let work = future::lazy(move |_| {
|
||||
let api = client.runtime_api();
|
||||
let status = match try_fr!(api.parachain_status(&id, para_id)) {
|
||||
Some(status) => status,
|
||||
None => return future::ok(()),
|
||||
None => return future::Either::Left(future::ok(())),
|
||||
};
|
||||
|
||||
let validators = try_fr!(api.validators(&id));
|
||||
@@ -401,14 +401,14 @@ fn run_collator_node<S, P, Extrinsic>(
|
||||
validators,
|
||||
};
|
||||
|
||||
if let Ok((collation, outgoing)) = collate(
|
||||
let collation_work = collate(
|
||||
relay_parent,
|
||||
para_id,
|
||||
status,
|
||||
context,
|
||||
parachain_context,
|
||||
key,
|
||||
).await {
|
||||
).map_ok(move |(collation, outgoing)| {
|
||||
network.with_spec(move |spec, ctx| {
|
||||
let res = spec.add_local_collation(
|
||||
ctx,
|
||||
@@ -419,9 +419,10 @@ fn run_collator_node<S, P, Extrinsic>(
|
||||
);
|
||||
|
||||
tokio::spawn(res.boxed());
|
||||
});
|
||||
}
|
||||
future::ok(())
|
||||
})
|
||||
});
|
||||
|
||||
future::Either::Right(collation_work)
|
||||
});
|
||||
|
||||
let deadlined = future::select(
|
||||
@@ -436,7 +437,7 @@ fn run_collator_node<S, P, Extrinsic>(
|
||||
}
|
||||
});
|
||||
|
||||
let future = silenced.map(drop);
|
||||
let future = silenced.map(drop);
|
||||
|
||||
tokio::spawn(future);
|
||||
}
|
||||
|
||||
@@ -33,8 +33,7 @@ use sc_client_api::{BlockchainEvents, BlockBody};
|
||||
use sp_blockchain::HeaderBackend;
|
||||
use block_builder::BlockBuilderApi;
|
||||
use consensus::SelectChain;
|
||||
use futures::prelude::*;
|
||||
use futures::task::{Spawn, SpawnExt};
|
||||
use futures::{future::ready, prelude::*, task::{Spawn, SpawnExt}};
|
||||
use polkadot_primitives::{Block, Hash, BlockId};
|
||||
use polkadot_primitives::parachain::{
|
||||
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
|
||||
@@ -149,6 +148,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
|
||||
N: Network + Send + Sync + 'static,
|
||||
N::TableRouter: Send + 'static,
|
||||
N::BuildTableRouter: Send + Unpin + 'static,
|
||||
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
|
||||
SC: SelectChain<Block> + 'static,
|
||||
SP: Spawn + Send + 'static,
|
||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||
@@ -265,12 +265,13 @@ pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
|
||||
}
|
||||
|
||||
impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
C: Collators + Send + Unpin + 'static,
|
||||
C: Collators + Send + Unpin + 'static + Sync,
|
||||
N: Network,
|
||||
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
|
||||
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
||||
C::Collation: Send + Unpin + 'static,
|
||||
N::TableRouter: Send + 'static,
|
||||
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
|
||||
N::BuildTableRouter: Unpin + Send + 'static,
|
||||
SP: Spawn + Send + 'static,
|
||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||
@@ -288,9 +289,7 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
parent_hash: Hash,
|
||||
keystore: &KeyStorePtr,
|
||||
max_block_data_size: Option<u64>,
|
||||
)
|
||||
-> Result<ValidationInstanceHandle, Error>
|
||||
{
|
||||
) -> Result<ValidationInstanceHandle, Error> {
|
||||
use primitives::Pair;
|
||||
|
||||
if let Some(tracker) = self.live_instances.get(&parent_hash) {
|
||||
@@ -381,18 +380,18 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
let (collators, client) = (self.collators.clone(), self.client.clone());
|
||||
let availability_store = self.availability_store.clone();
|
||||
|
||||
let with_router = move |router: N::TableRouter| async move {
|
||||
let with_router = move |router: N::TableRouter| {
|
||||
// fetch a local collation from connected collators.
|
||||
match crate::collation::collation_fetch(
|
||||
let collation_work = crate::collation::collation_fetch(
|
||||
validation_para,
|
||||
relay_parent,
|
||||
collators,
|
||||
client.clone(),
|
||||
max_block_data_size,
|
||||
).await {
|
||||
Ok(collation_work) => {
|
||||
let (collation, outgoing_targeted, parent_head, fees_charged) = collation_work;
|
||||
);
|
||||
|
||||
collation_work.then(move |result| match result {
|
||||
Ok((collation, outgoing_targeted, parent_head, fees_charged)) => {
|
||||
match crate::collation::produce_receipt_and_chunks(
|
||||
authorities_num,
|
||||
parent_head,
|
||||
@@ -408,40 +407,38 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
let chunks_clone = chunks.clone();
|
||||
let receipt_clone = receipt.clone();
|
||||
|
||||
if let Err(e) = av_clone.clone().add_erasure_chunks(
|
||||
relay_parent.clone(),
|
||||
receipt_clone,
|
||||
chunks_clone,
|
||||
).await {
|
||||
warn!(
|
||||
target: "validation",
|
||||
"Failed to add erasure chunks: {}", e
|
||||
);
|
||||
} else {
|
||||
let res = router.local_collation(
|
||||
let res = async move {
|
||||
if let Err(e) = av_clone.clone().add_erasure_chunks(
|
||||
relay_parent.clone(),
|
||||
receipt_clone,
|
||||
chunks_clone,
|
||||
).await {
|
||||
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
|
||||
}
|
||||
}
|
||||
.unit_error()
|
||||
.then(move |_| {
|
||||
router.local_collation(
|
||||
collation,
|
||||
receipt,
|
||||
outgoing_targeted,
|
||||
(local_id, &chunks),
|
||||
).await;
|
||||
).map_err(|e| warn!(target: "validation", "Failed to send local collation: {:?}", e))
|
||||
});
|
||||
|
||||
if let Err(e) = res {
|
||||
warn!(
|
||||
target: "validation",
|
||||
"Failed to notify network of local collation: {:?}", e
|
||||
);
|
||||
}
|
||||
};
|
||||
res.boxed()
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
|
||||
Box::pin(ready(Ok(())))
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(target: "validation", "Failed to fetch a candidate: {:?}", e);
|
||||
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
|
||||
Box::pin(ready(Ok(())))
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let router_work = build_router
|
||||
@@ -449,6 +446,7 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
|
||||
.map_err(|e| {
|
||||
warn!(target: "validation" , "Failed to build table router: {:?}", e);
|
||||
})
|
||||
.and_then(|r| r)
|
||||
.map(|_| ());
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user