Use Substrate Block Proposer (#1156)

* Use Substrate block builder.

* Clean up metrics.

* Lock.

* Lock.

* Switch to newest basic authorship interface

* Update Substrate reference and polkadot spec_version

* Let's improve

Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Tomasz Drwięga
2020-05-30 11:06:08 +02:00
committed by GitHub
parent a3e4893a7c
commit 44c0ec5cf8
5 changed files with 246 additions and 334 deletions
+208 -141
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -88,7 +88,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
spec_name: create_runtime_str!("polkadot"), spec_name: create_runtime_str!("polkadot"),
impl_name: create_runtime_str!("parity-polkadot"), impl_name: create_runtime_str!("parity-polkadot"),
authoring_version: 0, authoring_version: 0,
spec_version: 1, spec_version: 2,
impl_version: 0, impl_version: 0,
apis: RUNTIME_API_VERSIONS, apis: RUNTIME_API_VERSIONS,
transaction_version: 0, transaction_version: 0,
-3
View File
@@ -293,8 +293,6 @@ macro_rules! new_full {
let (builder, mut import_setup, inherent_data_providers, mut rpc_setup) = let (builder, mut import_setup, inherent_data_providers, mut rpc_setup) =
new_full_start!($config, $runtime, $dispatch); new_full_start!($config, $runtime, $dispatch);
let backend = builder.backend().clone();
let service = builder let service = builder
.with_finality_proof_provider(|client, backend| { .with_finality_proof_provider(|client, backend| {
let provider = client as Arc<dyn grandpa::StorageAndProofProvider<_, _>>; let provider = client as Arc<dyn grandpa::StorageAndProofProvider<_, _>>;
@@ -403,7 +401,6 @@ macro_rules! new_full {
service.transaction_pool(), service.transaction_pool(),
validation_service_handle, validation_service_handle,
slot_duration, slot_duration,
backend,
service.prometheus_registry().as_ref(), service.prometheus_registry().as_ref(),
); );
+1 -1
View File
@@ -34,8 +34,8 @@ bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
runtime_babe = { package = "pallet-babe", git = "https://github.com/paritytech/substrate", branch = "master" } runtime_babe = { package = "pallet-babe", git = "https://github.com/paritytech/substrate", branch = "master" }
babe-primitives = { package = "sp-consensus-babe", git = "https://github.com/paritytech/substrate", branch = "master" } babe-primitives = { package = "sp-consensus-babe", git = "https://github.com/paritytech/substrate", branch = "master" }
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" } keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
sc-proposer-metrics = { git = "https://github.com/paritytech/substrate", branch = "master" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/paritytech/substrate", branch = "master" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/paritytech/substrate", branch = "master" }
sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "master" }
[dev-dependencies] [dev-dependencies]
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+36 -188
View File
@@ -26,24 +26,21 @@ use std::{
}; };
use sp_blockchain::HeaderBackend; use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi; use block_builder::{BlockBuilderApi, BlockBuilderProvider};
use codec::Encode;
use consensus::{Proposal, RecordProof}; use consensus::{Proposal, RecordProof};
use polkadot_primitives::{Hash, Block, BlockId, Header}; use polkadot_primitives::{Block, Header};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
ParachainHost, AttestedCandidate, NEW_HEADS_IDENTIFIER, ParachainHost, NEW_HEADS_IDENTIFIER,
}; };
use runtime_primitives::traits::{DigestFor, HashFor}; use runtime_primitives::traits::{DigestFor, HashFor};
use futures_timer::Delay; use futures_timer::Delay;
use txpool_api::{TransactionPool, InPoolTransaction}; use txpool_api::TransactionPool;
use futures::prelude::*; use futures::prelude::*;
use inherents::InherentData; use inherents::InherentData;
use sp_timestamp::TimestampInherentData; use sp_timestamp::TimestampInherentData;
use log::{info, debug, warn, trace};
use sp_api::{ApiExt, ProvideRuntimeApi}; use sp_api::{ApiExt, ProvideRuntimeApi};
use prometheus_endpoint::Registry as PrometheusRegistry; use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_proposer_metrics::MetricsLink as PrometheusMetrics;
use crate::{ use crate::{
Error, Error,
@@ -51,17 +48,11 @@ use crate::{
validation_service::ServiceHandle, validation_service::ServiceHandle,
}; };
// block size limit.
pub(crate) const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
// Polkadot proposer factory. // Polkadot proposer factory.
pub struct ProposerFactory<Client, TxPool, Backend> { pub struct ProposerFactory<Client, TxPool, Backend> {
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
service_handle: ServiceHandle, service_handle: ServiceHandle,
babe_slot_duration: u64, babe_slot_duration: u64,
backend: Arc<Backend>, factory: sc_basic_authorship::ProposerFactory<TxPool, Backend, Client>,
metrics: PrometheusMetrics,
} }
impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> { impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
@@ -71,16 +62,17 @@ impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
transaction_pool: Arc<TxPool>, transaction_pool: Arc<TxPool>,
service_handle: ServiceHandle, service_handle: ServiceHandle,
babe_slot_duration: u64, babe_slot_duration: u64,
backend: Arc<Backend>,
prometheus: Option<&PrometheusRegistry>, prometheus: Option<&PrometheusRegistry>,
) -> Self { ) -> Self {
ProposerFactory { let factory = sc_basic_authorship::ProposerFactory::new(
client, client,
transaction_pool, transaction_pool,
service_handle: service_handle, prometheus,
);
ProposerFactory {
service_handle,
babe_slot_duration, babe_slot_duration,
backend, factory,
metrics: PrometheusMetrics::new(prometheus),
} }
} }
} }
@@ -89,7 +81,7 @@ impl<Client, TxPool, Backend> consensus::Environment<Block>
for ProposerFactory<Client, TxPool, Backend> for ProposerFactory<Client, TxPool, Backend>
where where
TxPool: TransactionPool<Block=Block> + 'static, TxPool: TransactionPool<Block=Block> + 'static,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static, Client: BlockBuilderProvider<Backend, Block, Client> + ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> Client::Api: ParachainHost<Block> + BlockBuilderApi<Block>
+ ApiExt<Block, Error = sp_blockchain::Error>, + ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend< Backend: sc_client_api::Backend<
@@ -110,45 +102,35 @@ where
parent_header: &Header, parent_header: &Header,
) -> Self::CreateProposer { ) -> Self::CreateProposer {
let parent_hash = parent_header.hash(); let parent_hash = parent_header.hash();
let parent_id = BlockId::hash(parent_hash);
let client = self.client.clone();
let transaction_pool = self.transaction_pool.clone();
let backend = self.backend.clone();
let slot_duration = self.babe_slot_duration.clone(); let slot_duration = self.babe_slot_duration.clone();
let metrics = self.metrics.clone(); let proposer = self.factory.init(parent_header).into_inner();
let maybe_proposer = self.service_handle let maybe_proposer = self.service_handle
.clone() .clone()
.get_validation_instance(parent_hash) .get_validation_instance(parent_hash)
.and_then(move |tracker| future::ready(Ok(Proposer { .and_then(move |tracker| future::ready(proposer
client, .map_err(Into::into)
tracker, .map(|proposer| Proposer {
parent_id, tracker,
transaction_pool, slot_duration,
slot_duration, proposer,
backend, })
metrics, ));
})));
Box::pin(maybe_proposer) Box::pin(maybe_proposer)
} }
} }
/// The Polkadot proposer logic. /// The Polkadot proposer logic.
pub struct Proposer<Client, TxPool, Backend> { pub struct Proposer<Client, TxPool: TransactionPool<Block=Block>, Backend> {
client: Arc<Client>,
parent_id: BlockId,
tracker: crate::validation_service::ValidationInstanceHandle, tracker: crate::validation_service::ValidationInstanceHandle,
transaction_pool: Arc<TxPool>,
slot_duration: u64, slot_duration: u64,
backend: Arc<Backend>, proposer: sc_basic_authorship::Proposer<Backend, Block, Client, TxPool>,
metrics: PrometheusMetrics,
} }
impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where
TxPool: TransactionPool<Block=Block> + 'static, TxPool: TransactionPool<Block=Block> + 'static,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static, Client: BlockBuilderProvider<Backend, Block, Client> + ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>, Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static, Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159 // Rust bug: https://github.com/rust-lang/rust/issues/24159
@@ -163,7 +145,8 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
> >
>; >;
fn propose(&mut self, fn propose(
self,
inherent_data: InherentData, inherent_data: InherentData,
inherent_digests: DigestFor<Block>, inherent_digests: DigestFor<Block>,
max_duration: Duration, max_duration: Duration,
@@ -180,15 +163,7 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR), Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
); );
let parent_id = self.parent_id.clone();
let client = self.client.clone();
let transaction_pool = self.transaction_pool.clone();
let table = self.tracker.table().clone();
let backend = self.backend.clone();
let metrics = self.metrics.clone();
async move { async move {
let block_timer = metrics.report(|metrics| metrics.block_constructed.start_timer());
let enough_candidates = dynamic_inclusion.acceptable_in( let enough_candidates = dynamic_inclusion.acceptable_in(
now, now,
initial_included, initial_included,
@@ -200,22 +175,6 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
}; };
let deadline_diff = max_duration - max_duration / 3; let deadline_diff = max_duration - max_duration / 3;
let deadline = match Instant::now().checked_add(deadline_diff) {
None => return Err(Error::DeadlineComputeFailure(deadline_diff)),
Some(d) => d,
};
let data = CreateProposalData {
parent_id,
client,
transaction_pool,
inherent_data: Some(inherent_data),
inherent_digests,
// leave some time for the proposal finalisation
deadline,
record_proof,
backend,
};
// set up delay until next allowed timestamp. // set up delay until next allowed timestamp.
let current_timestamp = current_timestamp(); let current_timestamp = current_timestamp();
@@ -226,18 +185,18 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
Delay::new(enough_candidates).await; Delay::new(enough_candidates).await;
let result = tokio::task::spawn_blocking( let proposed_candidates = self.tracker.table().proposed_set();
move || {
let proposed_candidates = table.proposed_set();
data.propose_with(proposed_candidates)
}
).await?;
drop(block_timer); let mut inherent_data = inherent_data;
let transactions = result.as_ref().map(|proposal| proposal.block.extrinsics.len()).unwrap_or_default(); inherent_data.put_data(NEW_HEADS_IDENTIFIER, &proposed_candidates)
metrics.report(|metrics| metrics.number_of_transactions.set(transactions as u64)); .map_err(Error::InherentError)?;
result self.proposer.propose(
inherent_data,
inherent_digests.clone(),
deadline_diff,
record_proof
).await.map_err(Into::into)
}.boxed() }.boxed()
} }
} }
@@ -247,114 +206,3 @@ fn current_timestamp() -> u64 {
.expect("now always later than unix epoch; qed") .expect("now always later than unix epoch; qed")
.as_millis() as u64 .as_millis() as u64
} }
/// Inner data of the create proposal.
struct CreateProposalData<Client, TxPool, Backend> {
parent_id: BlockId,
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
inherent_data: Option<InherentData>,
inherent_digests: DigestFor<Block>,
deadline: Instant,
record_proof: RecordProof,
backend: Arc<Backend>,
}
impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where
TxPool: TransactionPool<Block=Block>,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
{
fn propose_with(
mut self,
candidates: Vec<AttestedCandidate>,
) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> {
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
const MAX_TRANSACTIONS: usize = 1200;
let mut inherent_data = self.inherent_data
.take()
.expect("CreateProposal is not polled after finishing; qed");
inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates)
.map_err(Error::InherentError)?;
let runtime_api = self.client.runtime_api();
let mut block_builder = block_builder::BlockBuilder::new(
&*self.client,
self.client.expect_block_hash_from_id(&self.parent_id)?,
self.client.expect_block_number_from_id(&self.parent_id)?,
self.record_proof,
self.inherent_digests.clone(),
&*self.backend,
)?;
{
let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
for inherent in inherents {
match block_builder.push(inherent) {
Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
if e.exhausted_resources() => {
warn!("⚠️ Dropping non-mandatory inherent from overweight block.");
}
Err(e) => {
warn!("❗️ Inherent extrinsic returned unexpected error: {}. Dropping.", e);
}
Ok(_) => {}
}
}
let mut unqueue_invalid = Vec::new();
let mut pending_size = 0;
let ready_iter = self.transaction_pool.ready();
for ready in ready_iter.take(MAX_TRANSACTIONS) {
let encoded_size = ready.data().encode().len();
if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE {
break;
}
if Instant::now() > self.deadline {
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
break;
}
match block_builder.push(ready.data().clone()) {
Ok(()) => {
debug!("[{:?}] Pushed to the block.", ready.hash());
pending_size += encoded_size;
}
Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
if e.exhausted_resources() =>
{
debug!("Block is full, proceed with proposing.");
break;
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(ready.hash().clone());
}
}
}
self.transaction_pool.remove_invalid(&unqueue_invalid);
}
let (new_block, storage_changes, proof) = block_builder.build()?.into_inner();
info!("🎁 Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
new_block.header.number,
Hash::from(new_block.header.hash()),
new_block.header.parent_hash,
new_block.extrinsics.iter()
.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
.collect::<Vec<_>>()
.join(", ")
);
Ok(Proposal { block: new_block, storage_changes, proof })
}
}