mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-01 15:57:55 +00:00
Move propose_with into a dedicated blocking threadpool (#614)
This commit is contained in:
Generated
+3
@@ -3818,6 +3818,7 @@ dependencies = [
|
||||
"substrate-transaction-pool 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
|
||||
"substrate-trie 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
|
||||
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-executor 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6031,6 +6032,8 @@ version = "0.2.0-alpha.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -14,6 +14,7 @@ tokio = "0.1.22"
|
||||
derive_more = "0.14.1"
|
||||
log = "0.4.8"
|
||||
exit-future = "0.1.4"
|
||||
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
|
||||
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
|
||||
availability_store = { package = "polkadot-availability-store", path = "../availability-store" }
|
||||
parachain = { package = "polkadot-parachain", path = "../parachain" }
|
||||
|
||||
@@ -34,7 +34,8 @@ use std::{
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
time::{self, Duration, Instant},
|
||||
task::{Poll, Context}
|
||||
task::{Poll, Context},
|
||||
mem,
|
||||
};
|
||||
|
||||
use babe_primitives::BabeApi;
|
||||
@@ -498,7 +499,7 @@ impl<C, N, P, SC, TxApi> ProposerFactory<C, N, P, SC, TxApi> where
|
||||
impl<C, N, P, SC, TxApi> consensus::Environment<Block> for ProposerFactory<C, N, P, SC, TxApi> where
|
||||
C: Collators + Send + 'static,
|
||||
N: Network,
|
||||
TxApi: PoolChainApi<Block=Block>,
|
||||
TxApi: PoolChainApi<Block=Block> + 'static,
|
||||
P: ProvideRuntimeApi + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
|
||||
P::Api: ParachainHost<Block> +
|
||||
BlockBuilderApi<Block> +
|
||||
@@ -557,8 +558,8 @@ pub struct Proposer<C: Send + Sync, TxApi: PoolChainApi> where
|
||||
}
|
||||
|
||||
impl<C, TxApi> consensus::Proposer<Block> for Proposer<C, TxApi> where
|
||||
TxApi: PoolChainApi<Block=Block>,
|
||||
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
|
||||
TxApi: PoolChainApi<Block=Block> + 'static,
|
||||
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
|
||||
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
|
||||
{
|
||||
type Error = Error;
|
||||
@@ -616,18 +617,20 @@ impl<C, TxApi> consensus::Proposer<Block> for Proposer<C, TxApi> where
|
||||
};
|
||||
|
||||
Either::Left(CreateProposal {
|
||||
parent_hash: self.parent_hash.clone(),
|
||||
parent_number: self.parent_number.clone(),
|
||||
parent_id: self.parent_id.clone(),
|
||||
client: self.client.clone(),
|
||||
transaction_pool: self.transaction_pool.clone(),
|
||||
table: self.tracker.table.clone(),
|
||||
believed_minimum_timestamp: believed_timestamp,
|
||||
timing,
|
||||
inherent_data: Some(inherent_data),
|
||||
inherent_digests,
|
||||
// leave some time for the proposal finalisation
|
||||
deadline,
|
||||
state: CreateProposalState::Pending(CreateProposalData {
|
||||
parent_hash: self.parent_hash.clone(),
|
||||
parent_number: self.parent_number.clone(),
|
||||
parent_id: self.parent_id.clone(),
|
||||
client: self.client.clone(),
|
||||
transaction_pool: self.transaction_pool.clone(),
|
||||
table: self.tracker.table.clone(),
|
||||
believed_minimum_timestamp: believed_timestamp,
|
||||
timing,
|
||||
inherent_data: Some(inherent_data),
|
||||
inherent_digests,
|
||||
// leave some time for the proposal finalisation
|
||||
deadline,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -686,6 +689,21 @@ impl ProposalTiming {
|
||||
|
||||
/// Future which resolves upon the creation of a proposal.
|
||||
pub struct CreateProposal<C: Send + Sync, TxApi: PoolChainApi> {
|
||||
state: CreateProposalState<C, TxApi>,
|
||||
}
|
||||
|
||||
/// Current status of the proposal future.
|
||||
enum CreateProposalState<C: Send + Sync, TxApi: PoolChainApi> {
|
||||
/// Pending inclusion, with given proposal data.
|
||||
Pending(CreateProposalData<C, TxApi>),
|
||||
/// Represents the state when we switch from pending to fired.
|
||||
Switching,
|
||||
/// Block proposing has fired.
|
||||
Fired(tokio_executor::blocking::Blocking<Result<Block, Error>>),
|
||||
}
|
||||
|
||||
/// Inner data of the create proposal.
|
||||
struct CreateProposalData<C: Send + Sync, TxApi: PoolChainApi> {
|
||||
parent_hash: Hash,
|
||||
parent_number: BlockNumber,
|
||||
parent_id: BlockId,
|
||||
@@ -699,12 +717,12 @@ pub struct CreateProposal<C: Send + Sync, TxApi: PoolChainApi> {
|
||||
deadline: Instant,
|
||||
}
|
||||
|
||||
impl<C, TxApi> CreateProposal<C, TxApi> where
|
||||
impl<C, TxApi> CreateProposalData<C, TxApi> where
|
||||
TxApi: PoolChainApi<Block=Block>,
|
||||
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
|
||||
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
|
||||
{
|
||||
fn propose_with(&mut self, candidates: Vec<AttestedCandidate>) -> Result<Block, Error> {
|
||||
fn propose_with(mut self, candidates: Vec<AttestedCandidate>) -> Result<Block, Error> {
|
||||
use block_builder::BlockBuilder;
|
||||
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
|
||||
|
||||
@@ -792,22 +810,51 @@ impl<C, TxApi> CreateProposal<C, TxApi> where
|
||||
}
|
||||
|
||||
impl<C, TxApi> futures03::Future for CreateProposal<C, TxApi> where
|
||||
TxApi: PoolChainApi<Block=Block>,
|
||||
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
|
||||
TxApi: PoolChainApi<Block=Block> + 'static,
|
||||
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
|
||||
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
|
||||
{
|
||||
type Output = Result<Block, Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let mut state = CreateProposalState::Switching;
|
||||
mem::swap(&mut state, &mut self.state);
|
||||
|
||||
// 1. try to propose if we have enough includable candidates and other
|
||||
// delays have concluded.
|
||||
let included = self.table.includable_count();
|
||||
futures03::ready!(self.timing.poll(cx, included));
|
||||
let data = match state {
|
||||
CreateProposalState::Pending(mut data) => {
|
||||
let included = data.table.includable_count();
|
||||
match data.timing.poll(cx, included) {
|
||||
Poll::Pending => {
|
||||
self.state = CreateProposalState::Pending(data);
|
||||
return Poll::Pending
|
||||
},
|
||||
Poll::Ready(()) => (),
|
||||
}
|
||||
|
||||
data
|
||||
},
|
||||
CreateProposalState::Switching => return Poll::Pending,
|
||||
CreateProposalState::Fired(mut future) => {
|
||||
let ret = Pin::new(&mut future).poll(cx);
|
||||
self.state = CreateProposalState::Fired(future);
|
||||
return ret
|
||||
},
|
||||
};
|
||||
|
||||
// 2. propose
|
||||
let proposed_candidates = self.table.proposed_set();
|
||||
let future = tokio_executor::blocking::run(move || {
|
||||
let proposed_candidates = data.table.proposed_set();
|
||||
data.propose_with(proposed_candidates)
|
||||
});
|
||||
self.state = CreateProposalState::Fired(future);
|
||||
|
||||
Poll::Ready(self.propose_with(proposed_candidates))
|
||||
match &mut self.state {
|
||||
CreateProposalState::Fired(future) => Pin::new(future).poll(cx),
|
||||
CreateProposalState::Switching | CreateProposalState::Pending(_) =>
|
||||
Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user