diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 1010698737..0fe100e342 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -3818,6 +3818,7 @@ dependencies = [ "substrate-transaction-pool 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "substrate-trie 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -6031,6 +6032,8 @@ version = "0.2.0-alpha.6" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/polkadot/validation/Cargo.toml b/polkadot/validation/Cargo.toml index f82ac57bac..b22e37eb95 100644 --- a/polkadot/validation/Cargo.toml +++ b/polkadot/validation/Cargo.toml @@ -14,6 +14,7 @@ tokio = "0.1.22" derive_more = "0.14.1" log = "0.4.8" exit-future = "0.1.4" +tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] } codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] } availability_store = { package = "polkadot-availability-store", path = "../availability-store" } parachain = { package = "polkadot-parachain", path = "../parachain" } diff --git a/polkadot/validation/src/lib.rs b/polkadot/validation/src/lib.rs index e114a294bb..f77aa79f3b 100644 --- a/polkadot/validation/src/lib.rs +++ b/polkadot/validation/src/lib.rs @@ -34,7 +34,8 @@ use std::{ pin::Pin, sync::Arc, time::{self, Duration, Instant}, - task::{Poll, Context} + task::{Poll, Context}, + mem, }; use babe_primitives::BabeApi; @@ -498,7 +499,7 @@ impl ProposerFactory where impl consensus::Environment for ProposerFactory where C: Collators + Send + 'static, N: Network, - TxApi: PoolChainApi, + TxApi: PoolChainApi + 'static, P: ProvideRuntimeApi + HeaderBackend + BlockBody + Send + Sync + 'static, P::Api: ParachainHost + BlockBuilderApi + @@ -557,8 +558,8 @@ pub struct Proposer where } impl consensus::Proposer for Proposer where - TxApi: PoolChainApi, - C: ProvideRuntimeApi + HeaderBackend + Send + Sync, + TxApi: PoolChainApi + 'static, + C: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, C::Api: ParachainHost + BlockBuilderApi + ApiExt, { type Error = Error; @@ -616,18 +617,20 @@ impl consensus::Proposer for Proposer where }; Either::Left(CreateProposal { - parent_hash: self.parent_hash.clone(), - parent_number: self.parent_number.clone(), - parent_id: self.parent_id.clone(), - client: self.client.clone(), - transaction_pool: self.transaction_pool.clone(), - table: self.tracker.table.clone(), - believed_minimum_timestamp: believed_timestamp, - timing, - inherent_data: Some(inherent_data), - inherent_digests, - // leave some time for the proposal finalisation - deadline, + state: CreateProposalState::Pending(CreateProposalData { + parent_hash: self.parent_hash.clone(), + parent_number: self.parent_number.clone(), + parent_id: self.parent_id.clone(), + client: self.client.clone(), + transaction_pool: self.transaction_pool.clone(), + table: self.tracker.table.clone(), + believed_minimum_timestamp: believed_timestamp, + timing, + inherent_data: Some(inherent_data), + inherent_digests, + // leave some time for the proposal finalisation + deadline, + }) }) } } @@ -686,6 +689,21 @@ impl ProposalTiming { /// Future which resolves upon the creation of a proposal. pub struct CreateProposal { + state: CreateProposalState, +} + +/// Current status of the proposal future. +enum CreateProposalState { + /// Pending inclusion, with given proposal data. + Pending(CreateProposalData), + /// Represents the state when we switch from pending to fired. + Switching, + /// Block proposing has fired. + Fired(tokio_executor::blocking::Blocking>), +} + +/// Inner data of the create proposal. +struct CreateProposalData { parent_hash: Hash, parent_number: BlockNumber, parent_id: BlockId, @@ -699,12 +717,12 @@ pub struct CreateProposal { deadline: Instant, } -impl CreateProposal where +impl CreateProposalData where TxApi: PoolChainApi, C: ProvideRuntimeApi + HeaderBackend + Send + Sync, C::Api: ParachainHost + BlockBuilderApi + ApiExt, { - fn propose_with(&mut self, candidates: Vec) -> Result { + fn propose_with(mut self, candidates: Vec) -> Result { use block_builder::BlockBuilder; use runtime_primitives::traits::{Hash as HashT, BlakeTwo256}; @@ -792,22 +810,51 @@ impl CreateProposal where } impl futures03::Future for CreateProposal where - TxApi: PoolChainApi, - C: ProvideRuntimeApi + HeaderBackend + Send + Sync, + TxApi: PoolChainApi + 'static, + C: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, C::Api: ParachainHost + BlockBuilderApi + ApiExt, { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut state = CreateProposalState::Switching; + mem::swap(&mut state, &mut self.state); + // 1. try to propose if we have enough includable candidates and other // delays have concluded. - let included = self.table.includable_count(); - futures03::ready!(self.timing.poll(cx, included)); + let data = match state { + CreateProposalState::Pending(mut data) => { + let included = data.table.includable_count(); + match data.timing.poll(cx, included) { + Poll::Pending => { + self.state = CreateProposalState::Pending(data); + return Poll::Pending + }, + Poll::Ready(()) => (), + } + + data + }, + CreateProposalState::Switching => return Poll::Pending, + CreateProposalState::Fired(mut future) => { + let ret = Pin::new(&mut future).poll(cx); + self.state = CreateProposalState::Fired(future); + return ret + }, + }; // 2. propose - let proposed_candidates = self.table.proposed_set(); + let future = tokio_executor::blocking::run(move || { + let proposed_candidates = data.table.proposed_set(); + data.propose_with(proposed_candidates) + }); + self.state = CreateProposalState::Fired(future); - Poll::Ready(self.propose_with(proposed_candidates)) + match &mut self.state { + CreateProposalState::Fired(future) => Pin::new(future).poll(cx), + CreateProposalState::Switching | CreateProposalState::Pending(_) => + Poll::Pending, + } } }