mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-19 14:41:02 +00:00
Validation service refactoring (#773)
* add some more docs about statement import * instantiate environment async * move attestation service into subfolder * refactor validation service architecture somewhat * remove dependence on validation service in proposer * fix a bunch of warnings * improve docs * introduce a builder for the validation service * extract block production to its own file * integrate new API into service * address review grumbles
This commit is contained in:
committed by
GitHub
parent
fb30862d23
commit
3e17fcfb3d
@@ -404,21 +404,28 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)
|
|||||||
service.client(),
|
service.client(),
|
||||||
WrappedExecutor(service.spawn_task_handle()),
|
WrappedExecutor(service.spawn_task_handle()),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let (validation_service_handle, validation_service) = consensus::ServiceBuilder {
|
||||||
|
client: client.clone(),
|
||||||
|
network: validation_network.clone(),
|
||||||
|
collators: validation_network,
|
||||||
|
task_executor: Arc::new(WrappedExecutor(service.spawn_task_handle())),
|
||||||
|
availability_store: availability_store.clone(),
|
||||||
|
select_chain: select_chain.clone(),
|
||||||
|
keystore: service.keystore(),
|
||||||
|
max_block_data_size,
|
||||||
|
}.build();
|
||||||
|
|
||||||
|
service.spawn_essential_task(Box::pin(validation_service));
|
||||||
|
|
||||||
let proposer = consensus::ProposerFactory::new(
|
let proposer = consensus::ProposerFactory::new(
|
||||||
client.clone(),
|
client.clone(),
|
||||||
select_chain.clone(),
|
|
||||||
validation_network.clone(),
|
|
||||||
validation_network,
|
|
||||||
service.transaction_pool(),
|
service.transaction_pool(),
|
||||||
Arc::new(WrappedExecutor(service.spawn_task_handle())),
|
validation_service_handle,
|
||||||
service.keystore(),
|
|
||||||
availability_store.clone(),
|
|
||||||
slot_duration,
|
slot_duration,
|
||||||
max_block_data_size,
|
|
||||||
backend,
|
backend,
|
||||||
);
|
);
|
||||||
|
|
||||||
let client = service.client();
|
|
||||||
let select_chain = service.select_chain().ok_or(ServiceError::SelectChainRequired)?;
|
let select_chain = service.select_chain().ok_or(ServiceError::SelectChainRequired)?;
|
||||||
let can_author_with =
|
let can_author_with =
|
||||||
consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());
|
consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());
|
||||||
|
|||||||
@@ -354,6 +354,9 @@ impl<C: Context> Table<C> {
|
|||||||
/// Import a signed statement. Signatures should be checked for validity, and the
|
/// Import a signed statement. Signatures should be checked for validity, and the
|
||||||
/// sender should be checked to actually be an authority.
|
/// sender should be checked to actually be an authority.
|
||||||
///
|
///
|
||||||
|
/// Validity and invalidity statements are only valid if the corresponding
|
||||||
|
/// candidate has already been imported.
|
||||||
|
///
|
||||||
/// If this returns `None`, the statement was either duplicate or invalid.
|
/// If this returns `None`, the statement was either duplicate or invalid.
|
||||||
pub fn import_statement(
|
pub fn import_statement(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
|||||||
@@ -1,154 +0,0 @@
|
|||||||
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
|
|
||||||
// This file is part of Polkadot.
|
|
||||||
|
|
||||||
// Polkadot is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// Polkadot is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
//! Attestation service.
|
|
||||||
|
|
||||||
/// Attestation service. A long running service that creates and manages parachain attestation
|
|
||||||
/// instances.
|
|
||||||
///
|
|
||||||
/// This uses a handle to an underlying thread pool to dispatch heavy work
|
|
||||||
/// such as candidate verification while performing event-driven work
|
|
||||||
/// on a local event loop.
|
|
||||||
|
|
||||||
use std::{thread, time::Duration, sync::Arc};
|
|
||||||
|
|
||||||
use sc_client_api::{BlockchainEvents, BlockBody};
|
|
||||||
use sp_blockchain::HeaderBackend;
|
|
||||||
use block_builder::BlockBuilderApi;
|
|
||||||
use consensus::SelectChain;
|
|
||||||
use futures::prelude::*;
|
|
||||||
use futures::{future::{ready, select}, task::{Spawn, SpawnExt}};
|
|
||||||
use polkadot_primitives::Block;
|
|
||||||
use polkadot_primitives::parachain::ParachainHost;
|
|
||||||
use babe_primitives::BabeApi;
|
|
||||||
use keystore::KeyStorePtr;
|
|
||||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
|
||||||
use runtime_primitives::traits::HasherFor;
|
|
||||||
|
|
||||||
use tokio::{runtime::Runtime as LocalRuntime};
|
|
||||||
use log::{warn, error};
|
|
||||||
|
|
||||||
use super::{Network, Collators};
|
|
||||||
|
|
||||||
type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
|
|
||||||
|
|
||||||
/// Parachain candidate attestation service handle.
|
|
||||||
pub(crate) struct ServiceHandle {
|
|
||||||
thread: Option<thread::JoinHandle<()>>,
|
|
||||||
exit_signal: Option<::exit_future::Signal>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create and start a new instance of the attestation service.
|
|
||||||
pub(crate) fn start<C, N, P, SC>(
|
|
||||||
client: Arc<P>,
|
|
||||||
select_chain: SC,
|
|
||||||
parachain_validation: Arc<crate::ParachainValidation<C, N, P>>,
|
|
||||||
thread_pool: TaskExecutor,
|
|
||||||
keystore: KeyStorePtr,
|
|
||||||
max_block_data_size: Option<u64>,
|
|
||||||
) -> ServiceHandle
|
|
||||||
where
|
|
||||||
C: Collators + Send + Sync + Unpin + 'static,
|
|
||||||
C::Collation: Send + Unpin + 'static,
|
|
||||||
P: BlockchainEvents<Block> + BlockBody<Block>,
|
|
||||||
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
|
|
||||||
P::Api: ParachainHost<Block> +
|
|
||||||
BlockBuilderApi<Block> +
|
|
||||||
BabeApi<Block> +
|
|
||||||
ApiExt<Block, Error = sp_blockchain::Error>,
|
|
||||||
N: Network + Send + Sync + 'static,
|
|
||||||
N::TableRouter: Send + 'static,
|
|
||||||
N::BuildTableRouter: Send + Unpin + 'static,
|
|
||||||
SC: SelectChain<Block> + 'static,
|
|
||||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
|
||||||
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
|
|
||||||
{
|
|
||||||
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
|
|
||||||
|
|
||||||
let (signal, exit) = ::exit_future::signal();
|
|
||||||
let thread = thread::spawn(move || {
|
|
||||||
let mut runtime = LocalRuntime::new().expect("Could not create local runtime");
|
|
||||||
let notifications = {
|
|
||||||
let client = client.clone();
|
|
||||||
let validation = parachain_validation.clone();
|
|
||||||
|
|
||||||
let keystore = keystore.clone();
|
|
||||||
|
|
||||||
let notifications = client.import_notification_stream()
|
|
||||||
.for_each(move |notification| {
|
|
||||||
let parent_hash = notification.hash;
|
|
||||||
if notification.is_new_best {
|
|
||||||
let res = validation.get_or_instantiate(
|
|
||||||
parent_hash,
|
|
||||||
&keystore,
|
|
||||||
max_block_data_size,
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Err(e) = res {
|
|
||||||
warn!(
|
|
||||||
"Unable to start parachain validation on top of {:?}: {}",
|
|
||||||
parent_hash, e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ready(())
|
|
||||||
});
|
|
||||||
|
|
||||||
select(notifications, exit.clone())
|
|
||||||
};
|
|
||||||
|
|
||||||
let prune_old_sessions = {
|
|
||||||
let select_chain = select_chain.clone();
|
|
||||||
let interval = crate::interval(TIMER_INTERVAL)
|
|
||||||
.for_each(move |_| match select_chain.leaves() {
|
|
||||||
Ok(leaves) => {
|
|
||||||
parachain_validation.retain(|h| leaves.contains(h));
|
|
||||||
ready(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error fetching leaves from client: {:?}", e);
|
|
||||||
ready(())
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
select(interval, exit.clone()).map(|_| ())
|
|
||||||
};
|
|
||||||
|
|
||||||
runtime.spawn(notifications);
|
|
||||||
if let Err(_) = thread_pool.spawn(prune_old_sessions) {
|
|
||||||
error!("Failed to spawn old sessions pruning task");
|
|
||||||
}
|
|
||||||
|
|
||||||
runtime.block_on(exit);
|
|
||||||
});
|
|
||||||
|
|
||||||
ServiceHandle {
|
|
||||||
thread: Some(thread),
|
|
||||||
exit_signal: Some(signal),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for ServiceHandle {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if let Some(signal) = self.exit_signal.take() {
|
|
||||||
let _ = signal.fire();
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(thread) = self.thread.take() {
|
|
||||||
thread.join().expect("The service thread has panicked");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,354 @@
|
|||||||
|
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Polkadot is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! The block production pipeline of Polkadot.
|
||||||
|
//!
|
||||||
|
//! The `ProposerFactory` exported by this module will be wrapped by some
|
||||||
|
//! consensus engine, and triggered when it is time to create a block.
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
pin::Pin,
|
||||||
|
sync::Arc,
|
||||||
|
time::{self, Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
use sp_blockchain::HeaderBackend;
|
||||||
|
use block_builder::BlockBuilderApi;
|
||||||
|
use codec::Encode;
|
||||||
|
use consensus::{Proposal, RecordProof};
|
||||||
|
use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header};
|
||||||
|
use polkadot_primitives::parachain::{
|
||||||
|
ParachainHost, AttestedCandidate, NEW_HEADS_IDENTIFIER,
|
||||||
|
};
|
||||||
|
use runtime_primitives::traits::{DigestFor, HasherFor};
|
||||||
|
use futures_timer::Delay;
|
||||||
|
use txpool_api::{TransactionPool, InPoolTransaction};
|
||||||
|
|
||||||
|
use futures::prelude::*;
|
||||||
|
use inherents::InherentData;
|
||||||
|
use sp_timestamp::TimestampInherentData;
|
||||||
|
use log::{info, debug, trace};
|
||||||
|
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||||
|
|
||||||
|
use crate::validation_service::ServiceHandle;
|
||||||
|
use crate::dynamic_inclusion::DynamicInclusion;
|
||||||
|
use crate::Error;
|
||||||
|
|
||||||
|
// Polkadot proposer factory.
|
||||||
|
pub struct ProposerFactory<Client, TxPool, Backend> {
|
||||||
|
client: Arc<Client>,
|
||||||
|
transaction_pool: Arc<TxPool>,
|
||||||
|
service_handle: ServiceHandle,
|
||||||
|
babe_slot_duration: u64,
|
||||||
|
backend: Arc<Backend>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
|
||||||
|
/// Create a new proposer factory.
|
||||||
|
pub fn new(
|
||||||
|
client: Arc<Client>,
|
||||||
|
transaction_pool: Arc<TxPool>,
|
||||||
|
service_handle: ServiceHandle,
|
||||||
|
babe_slot_duration: u64,
|
||||||
|
backend: Arc<Backend>,
|
||||||
|
) -> Self {
|
||||||
|
ProposerFactory {
|
||||||
|
client,
|
||||||
|
transaction_pool,
|
||||||
|
service_handle: service_handle,
|
||||||
|
babe_slot_duration,
|
||||||
|
backend,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Client, TxPool, Backend> consensus::Environment<Block>
|
||||||
|
for ProposerFactory<Client, TxPool, Backend>
|
||||||
|
where
|
||||||
|
TxPool: TransactionPool<Block=Block> + 'static,
|
||||||
|
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
|
||||||
|
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block>
|
||||||
|
+ ApiExt<Block, Error = sp_blockchain::Error>,
|
||||||
|
Backend: sc_client_api::Backend<
|
||||||
|
Block,
|
||||||
|
State = sp_api::StateBackendFor<Client, Block>
|
||||||
|
> + 'static,
|
||||||
|
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||||
|
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
|
||||||
|
{
|
||||||
|
type CreateProposer = Pin<Box<
|
||||||
|
dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + 'static
|
||||||
|
>>;
|
||||||
|
type Proposer = Proposer<Client, TxPool, Backend>;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn init(
|
||||||
|
&mut self,
|
||||||
|
parent_header: &Header,
|
||||||
|
) -> Self::CreateProposer {
|
||||||
|
let parent_hash = parent_header.hash();
|
||||||
|
let parent_number = parent_header.number;
|
||||||
|
let parent_id = BlockId::hash(parent_hash);
|
||||||
|
|
||||||
|
let client = self.client.clone();
|
||||||
|
let transaction_pool = self.transaction_pool.clone();
|
||||||
|
let backend = self.backend.clone();
|
||||||
|
let slot_duration = self.babe_slot_duration.clone();
|
||||||
|
|
||||||
|
let maybe_proposer = self.service_handle
|
||||||
|
.clone()
|
||||||
|
.get_validation_instance(parent_hash)
|
||||||
|
.and_then(move |tracker| future::ready(Ok(Proposer {
|
||||||
|
client,
|
||||||
|
tracker,
|
||||||
|
parent_hash,
|
||||||
|
parent_id,
|
||||||
|
parent_number,
|
||||||
|
transaction_pool,
|
||||||
|
slot_duration,
|
||||||
|
backend,
|
||||||
|
})));
|
||||||
|
|
||||||
|
Box::pin(maybe_proposer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The Polkadot proposer logic.
|
||||||
|
pub struct Proposer<Client, TxPool, Backend> {
|
||||||
|
client: Arc<Client>,
|
||||||
|
parent_hash: Hash,
|
||||||
|
parent_id: BlockId,
|
||||||
|
parent_number: BlockNumber,
|
||||||
|
tracker: Arc<crate::validation_service::ValidationInstanceHandle>,
|
||||||
|
transaction_pool: Arc<TxPool>,
|
||||||
|
slot_duration: u64,
|
||||||
|
backend: Arc<Backend>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where
|
||||||
|
TxPool: TransactionPool<Block=Block> + 'static,
|
||||||
|
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
|
||||||
|
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
||||||
|
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
|
||||||
|
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||||
|
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
|
||||||
|
{
|
||||||
|
type Error = Error;
|
||||||
|
type Transaction = sp_api::TransactionFor<Client, Block>;
|
||||||
|
type Proposal = Pin<
|
||||||
|
Box<
|
||||||
|
dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>>
|
||||||
|
+ Send
|
||||||
|
>
|
||||||
|
>;
|
||||||
|
|
||||||
|
fn propose(&mut self,
|
||||||
|
inherent_data: InherentData,
|
||||||
|
inherent_digests: DigestFor<Block>,
|
||||||
|
max_duration: Duration,
|
||||||
|
record_proof: RecordProof,
|
||||||
|
) -> Self::Proposal {
|
||||||
|
const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
|
||||||
|
|
||||||
|
let initial_included = self.tracker.table().includable_count();
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
let dynamic_inclusion = DynamicInclusion::new(
|
||||||
|
self.tracker.table().num_parachains(),
|
||||||
|
self.tracker.started(),
|
||||||
|
Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
|
||||||
|
);
|
||||||
|
|
||||||
|
let parent_hash = self.parent_hash.clone();
|
||||||
|
let parent_number = self.parent_number.clone();
|
||||||
|
let parent_id = self.parent_id.clone();
|
||||||
|
let client = self.client.clone();
|
||||||
|
let transaction_pool = self.transaction_pool.clone();
|
||||||
|
let table = self.tracker.table().clone();
|
||||||
|
let backend = self.backend.clone();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let enough_candidates = dynamic_inclusion.acceptable_in(
|
||||||
|
now,
|
||||||
|
initial_included,
|
||||||
|
).unwrap_or_else(|| Duration::from_millis(1));
|
||||||
|
|
||||||
|
let believed_timestamp = match inherent_data.timestamp_inherent_data() {
|
||||||
|
Ok(timestamp) => timestamp,
|
||||||
|
Err(e) => return Err(Error::InherentError(e)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let deadline_diff = max_duration - max_duration / 3;
|
||||||
|
let deadline = match Instant::now().checked_add(deadline_diff) {
|
||||||
|
None => return Err(Error::DeadlineComputeFailure(deadline_diff)),
|
||||||
|
Some(d) => d,
|
||||||
|
};
|
||||||
|
|
||||||
|
let data = CreateProposalData {
|
||||||
|
parent_hash,
|
||||||
|
parent_number,
|
||||||
|
parent_id,
|
||||||
|
client,
|
||||||
|
transaction_pool,
|
||||||
|
table,
|
||||||
|
believed_minimum_timestamp: believed_timestamp,
|
||||||
|
inherent_data: Some(inherent_data),
|
||||||
|
inherent_digests,
|
||||||
|
// leave some time for the proposal finalisation
|
||||||
|
deadline,
|
||||||
|
record_proof,
|
||||||
|
backend,
|
||||||
|
};
|
||||||
|
|
||||||
|
// set up delay until next allowed timestamp.
|
||||||
|
let current_timestamp = current_timestamp();
|
||||||
|
if current_timestamp < believed_timestamp {
|
||||||
|
Delay::new(Duration::from_millis(current_timestamp - believed_timestamp))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Delay::new(enough_candidates).await;
|
||||||
|
|
||||||
|
tokio_executor::blocking::run(move || {
|
||||||
|
let proposed_candidates = data.table.proposed_set();
|
||||||
|
data.propose_with(proposed_candidates)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}.boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn current_timestamp() -> u64 {
|
||||||
|
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
|
||||||
|
.expect("now always later than unix epoch; qed")
|
||||||
|
.as_millis() as u64
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inner data of the create proposal.
|
||||||
|
struct CreateProposalData<Client, TxPool, Backend> {
|
||||||
|
parent_hash: Hash,
|
||||||
|
parent_number: BlockNumber,
|
||||||
|
parent_id: BlockId,
|
||||||
|
client: Arc<Client>,
|
||||||
|
transaction_pool: Arc<TxPool>,
|
||||||
|
table: Arc<crate::SharedTable>,
|
||||||
|
believed_minimum_timestamp: u64,
|
||||||
|
inherent_data: Option<InherentData>,
|
||||||
|
inherent_digests: DigestFor<Block>,
|
||||||
|
deadline: Instant,
|
||||||
|
record_proof: RecordProof,
|
||||||
|
backend: Arc<Backend>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where
|
||||||
|
TxPool: TransactionPool<Block=Block>,
|
||||||
|
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
|
||||||
|
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
||||||
|
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
|
||||||
|
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||||
|
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
|
||||||
|
{
|
||||||
|
fn propose_with(
|
||||||
|
mut self,
|
||||||
|
candidates: Vec<AttestedCandidate>,
|
||||||
|
) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> {
|
||||||
|
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
|
||||||
|
|
||||||
|
const MAX_TRANSACTIONS: usize = 40;
|
||||||
|
|
||||||
|
let mut inherent_data = self.inherent_data
|
||||||
|
.take()
|
||||||
|
.expect("CreateProposal is not polled after finishing; qed");
|
||||||
|
inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates)
|
||||||
|
.map_err(Error::InherentError)?;
|
||||||
|
|
||||||
|
let runtime_api = self.client.runtime_api();
|
||||||
|
|
||||||
|
let mut block_builder = block_builder::BlockBuilder::new(
|
||||||
|
&*self.client,
|
||||||
|
self.client.expect_block_hash_from_id(&self.parent_id)?,
|
||||||
|
self.client.expect_block_number_from_id(&self.parent_id)?,
|
||||||
|
self.record_proof,
|
||||||
|
self.inherent_digests.clone(),
|
||||||
|
&*self.backend,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
{
|
||||||
|
let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
|
||||||
|
for inherent in inherents {
|
||||||
|
block_builder.push(inherent)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut unqueue_invalid = Vec::new();
|
||||||
|
let mut pending_size = 0;
|
||||||
|
|
||||||
|
let ready_iter = self.transaction_pool.ready();
|
||||||
|
for ready in ready_iter.take(MAX_TRANSACTIONS) {
|
||||||
|
let encoded_size = ready.data().encode().len();
|
||||||
|
if pending_size + encoded_size >= crate::evaluation::MAX_TRANSACTIONS_SIZE {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if Instant::now() > self.deadline {
|
||||||
|
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
match block_builder.push(ready.data().clone()) {
|
||||||
|
Ok(()) => {
|
||||||
|
debug!("[{:?}] Pushed to the block.", ready.hash());
|
||||||
|
pending_size += encoded_size;
|
||||||
|
}
|
||||||
|
Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
|
||||||
|
if e.exhausted_resources() =>
|
||||||
|
{
|
||||||
|
debug!("Block is full, proceed with proposing.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
|
||||||
|
unqueue_invalid.push(ready.hash().clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.transaction_pool.remove_invalid(&unqueue_invalid);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (new_block, storage_changes, proof) = block_builder.build()?.into_inner();
|
||||||
|
|
||||||
|
info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
|
||||||
|
new_block.header.number,
|
||||||
|
Hash::from(new_block.header.hash()),
|
||||||
|
new_block.header.parent_hash,
|
||||||
|
new_block.extrinsics.iter()
|
||||||
|
.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ")
|
||||||
|
);
|
||||||
|
|
||||||
|
// TODO: full re-evaluation (https://github.com/paritytech/polkadot/issues/216)
|
||||||
|
let active_parachains = runtime_api.active_parachains(&self.parent_id)?;
|
||||||
|
assert!(crate::evaluation::evaluate_initial(
|
||||||
|
&new_block,
|
||||||
|
self.believed_minimum_timestamp,
|
||||||
|
&self.parent_hash,
|
||||||
|
self.parent_number,
|
||||||
|
&active_parachains[..],
|
||||||
|
).is_ok());
|
||||||
|
|
||||||
|
Ok(Proposal { block: new_block, storage_changes, proof })
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -46,6 +46,8 @@ pub enum Error {
|
|||||||
Timer(std::io::Error),
|
Timer(std::io::Error),
|
||||||
#[display(fmt = "Failed to compute deadline of now + {:?}", _0)]
|
#[display(fmt = "Failed to compute deadline of now + {:?}", _0)]
|
||||||
DeadlineComputeFailure(std::time::Duration),
|
DeadlineComputeFailure(std::time::Duration),
|
||||||
|
#[display(fmt = "Validation service is down.")]
|
||||||
|
ValidationServiceDown,
|
||||||
Join(tokio::task::JoinError)
|
Join(tokio::task::JoinError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,8 @@
|
|||||||
|
|
||||||
//! Polkadot block evaluation and evaluation errors.
|
//! Polkadot block evaluation and evaluation errors.
|
||||||
|
|
||||||
use super::MAX_TRANSACTIONS_SIZE;
|
// block size limit.
|
||||||
|
pub(crate) const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
|
||||||
|
|
||||||
use codec::Encode;
|
use codec::Encode;
|
||||||
use polkadot_primitives::{Block, Hash, BlockNumber};
|
use polkadot_primitives::{Block, Hash, BlockNumber};
|
||||||
|
|||||||
+14
-625
@@ -31,50 +31,22 @@
|
|||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
pin::Pin,
|
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{self, Duration, Instant},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use babe_primitives::BabeApi;
|
|
||||||
use sc_client_api::{Backend, BlockchainEvents, BlockBody};
|
|
||||||
use sp_blockchain::HeaderBackend;
|
|
||||||
use block_builder::BlockBuilderApi;
|
|
||||||
use codec::Encode;
|
use codec::Encode;
|
||||||
use consensus::{SelectChain, Proposal, RecordProof};
|
use polkadot_primitives::Hash;
|
||||||
use availability_store::Store as AvailabilityStore;
|
|
||||||
use parking_lot::Mutex;
|
|
||||||
use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header};
|
|
||||||
use polkadot_primitives::parachain::{
|
use polkadot_primitives::parachain::{
|
||||||
Id as ParaId, Chain, DutyRoster, CandidateReceipt,
|
Id as ParaId, Chain, DutyRoster, CandidateReceipt,
|
||||||
ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, OutgoingMessages,
|
Statement as PrimitiveStatement, Message, OutgoingMessages,
|
||||||
Collation, PoVBlock, ErasureChunk, ValidatorSignature, ValidatorIndex,
|
Collation, PoVBlock, ErasureChunk, ValidatorSignature, ValidatorIndex,
|
||||||
ValidatorPair, ValidatorId, NEW_HEADS_IDENTIFIER,
|
ValidatorPair, ValidatorId,
|
||||||
};
|
};
|
||||||
use primitives::Pair;
|
use primitives::Pair;
|
||||||
use runtime_primitives::traits::{DigestFor, HasherFor};
|
|
||||||
use futures_timer::Delay;
|
|
||||||
use txpool_api::{TransactionPool, InPoolTransaction};
|
|
||||||
|
|
||||||
use attestation_service::ServiceHandle;
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::{future::{select, ready}, stream::unfold, task::{Spawn, SpawnExt}};
|
|
||||||
use collation::collation_fetch;
|
|
||||||
use dynamic_inclusion::DynamicInclusion;
|
|
||||||
use inherents::InherentData;
|
|
||||||
use sp_timestamp::TimestampInherentData;
|
|
||||||
use log::{info, debug, warn, trace, error};
|
|
||||||
use keystore::KeyStorePtr;
|
|
||||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
|
||||||
|
|
||||||
type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
|
|
||||||
|
|
||||||
fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin {
|
|
||||||
unfold((), move |_| {
|
|
||||||
futures_timer::Delay::new(duration).map(|_| Some(((), ())))
|
|
||||||
}).map(drop)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
pub use self::block_production::ProposerFactory;
|
||||||
pub use self::collation::{
|
pub use self::collation::{
|
||||||
validate_collation, validate_incoming, message_queue_root, egress_roots, Collators,
|
validate_collation, validate_incoming, message_queue_root, egress_roots, Collators,
|
||||||
produce_receipt_and_chunks,
|
produce_receipt_and_chunks,
|
||||||
@@ -84,20 +56,19 @@ pub use self::shared_table::{
|
|||||||
SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement,
|
SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement,
|
||||||
GenericStatement,
|
GenericStatement,
|
||||||
};
|
};
|
||||||
|
pub use self::validation_service::{ServiceHandle, ServiceBuilder};
|
||||||
|
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
pub use parachain::wasm_executor::{run_worker as run_validation_worker};
|
pub use parachain::wasm_executor::{run_worker as run_validation_worker};
|
||||||
|
|
||||||
mod attestation_service;
|
|
||||||
mod dynamic_inclusion;
|
mod dynamic_inclusion;
|
||||||
mod evaluation;
|
mod evaluation;
|
||||||
mod error;
|
mod error;
|
||||||
mod shared_table;
|
mod shared_table;
|
||||||
|
|
||||||
pub mod collation;
|
pub mod collation;
|
||||||
|
pub mod validation_service;
|
||||||
// block size limit.
|
pub mod block_production;
|
||||||
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
|
|
||||||
|
|
||||||
/// Incoming messages; a series of sorted (ParaId, Message) pairs.
|
/// Incoming messages; a series of sorted (ParaId, Message) pairs.
|
||||||
pub type Incoming = Vec<(ParaId, Vec<Message>)>;
|
pub type Incoming = Vec<(ParaId, Vec<Message>)>;
|
||||||
@@ -148,6 +119,13 @@ pub trait Network {
|
|||||||
) -> Self::BuildTableRouter;
|
) -> Self::BuildTableRouter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The local duty of a validator.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct LocalDuty {
|
||||||
|
validation: Chain,
|
||||||
|
index: ValidatorIndex,
|
||||||
|
}
|
||||||
|
|
||||||
/// Information about a specific group.
|
/// Information about a specific group.
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct GroupInfo {
|
pub struct GroupInfo {
|
||||||
@@ -246,595 +224,6 @@ pub fn outgoing_queues(outgoing_targeted: &'_ OutgoingMessages)
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// finds the first key we are capable of signing with out of the given set of validators,
|
|
||||||
// if any.
|
|
||||||
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
|
|
||||||
let keystore = keystore.read();
|
|
||||||
validators.iter()
|
|
||||||
.find_map(|v| {
|
|
||||||
keystore.key_pair::<ValidatorPair>(&v).ok()
|
|
||||||
})
|
|
||||||
.map(|pair| Arc::new(pair))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Constructs parachain-agreement instances.
|
|
||||||
struct ParachainValidation<C, N, P> {
|
|
||||||
/// The client instance.
|
|
||||||
client: Arc<P>,
|
|
||||||
/// The backing network handle.
|
|
||||||
network: N,
|
|
||||||
/// Parachain collators.
|
|
||||||
collators: C,
|
|
||||||
/// handle to remote task executor
|
|
||||||
handle: TaskExecutor,
|
|
||||||
/// Store for extrinsic data.
|
|
||||||
availability_store: AvailabilityStore,
|
|
||||||
/// Live agreements. Maps relay chain parent hashes to attestation
|
|
||||||
/// instances.
|
|
||||||
live_instances: Mutex<HashMap<Hash, Arc<AttestationTracker>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, N, P> ParachainValidation<C, N, P> where
|
|
||||||
C: Collators + Send + Unpin + 'static + Sync,
|
|
||||||
N: Network,
|
|
||||||
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
|
|
||||||
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
|
||||||
C::Collation: Send + Unpin + 'static,
|
|
||||||
N::TableRouter: Send + 'static,
|
|
||||||
N::BuildTableRouter: Unpin + Send + 'static,
|
|
||||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
|
||||||
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
|
|
||||||
{
|
|
||||||
/// Get an attestation table for given parent hash.
|
|
||||||
///
|
|
||||||
/// This starts a parachain agreement process on top of the parent hash if
|
|
||||||
/// one has not already started.
|
|
||||||
///
|
|
||||||
/// Additionally, this will trigger broadcast of data to the new block's duty
|
|
||||||
/// roster.
|
|
||||||
fn get_or_instantiate(
|
|
||||||
&self,
|
|
||||||
parent_hash: Hash,
|
|
||||||
keystore: &KeyStorePtr,
|
|
||||||
max_block_data_size: Option<u64>,
|
|
||||||
)
|
|
||||||
-> Result<Arc<AttestationTracker>, Error>
|
|
||||||
{
|
|
||||||
let mut live_instances = self.live_instances.lock();
|
|
||||||
if let Some(tracker) = live_instances.get(&parent_hash) {
|
|
||||||
return Ok(tracker.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
let id = BlockId::hash(parent_hash);
|
|
||||||
|
|
||||||
let validators = self.client.runtime_api().validators(&id)?;
|
|
||||||
let sign_with = signing_key(&validators[..], keystore);
|
|
||||||
|
|
||||||
let duty_roster = self.client.runtime_api().duty_roster(&id)?;
|
|
||||||
|
|
||||||
let (group_info, local_duty) = make_group_info(
|
|
||||||
duty_roster,
|
|
||||||
&validators,
|
|
||||||
sign_with.as_ref().map(|k| k.public()),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
|
|
||||||
parent_hash,
|
|
||||||
local_duty,
|
|
||||||
);
|
|
||||||
|
|
||||||
let active_parachains = self.client.runtime_api().active_parachains(&id)?;
|
|
||||||
|
|
||||||
debug!(target: "validation", "Active parachains: {:?}", active_parachains);
|
|
||||||
|
|
||||||
// If we are a validator, we need to store our index in this round in availability store.
|
|
||||||
// This will tell which erasure chunk we should store.
|
|
||||||
if let Some(ref local_duty) = local_duty {
|
|
||||||
if let Err(e) = self.availability_store.add_validator_index_and_n_validators(
|
|
||||||
&parent_hash,
|
|
||||||
local_duty.index,
|
|
||||||
validators.len() as u32,
|
|
||||||
) {
|
|
||||||
warn!(
|
|
||||||
target: "validation",
|
|
||||||
"Failed to add validator index and n_validators to the availability-store: {:?}", e
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let table = Arc::new(SharedTable::new(
|
|
||||||
validators.clone(),
|
|
||||||
group_info,
|
|
||||||
sign_with,
|
|
||||||
parent_hash,
|
|
||||||
self.availability_store.clone(),
|
|
||||||
max_block_data_size,
|
|
||||||
));
|
|
||||||
|
|
||||||
let (_drop_signal, exit) = exit_future::signal();
|
|
||||||
|
|
||||||
let router = self.network.communication_for(
|
|
||||||
table.clone(),
|
|
||||||
&validators,
|
|
||||||
exit.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) {
|
|
||||||
self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index, exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
let tracker = Arc::new(AttestationTracker {
|
|
||||||
table,
|
|
||||||
started: Instant::now(),
|
|
||||||
_drop_signal,
|
|
||||||
});
|
|
||||||
|
|
||||||
live_instances.insert(parent_hash, tracker.clone());
|
|
||||||
|
|
||||||
Ok(tracker)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Retain validation sessions matching predicate.
|
|
||||||
fn retain<F: FnMut(&Hash) -> bool>(&self, mut pred: F) {
|
|
||||||
self.live_instances.lock().retain(|k, _| pred(k))
|
|
||||||
}
|
|
||||||
|
|
||||||
// launch parachain work asynchronously.
|
|
||||||
fn launch_work(
|
|
||||||
&self,
|
|
||||||
relay_parent: Hash,
|
|
||||||
validation_para: ParaId,
|
|
||||||
build_router: N::BuildTableRouter,
|
|
||||||
max_block_data_size: Option<u64>,
|
|
||||||
authorities_num: usize,
|
|
||||||
local_id: ValidatorIndex,
|
|
||||||
exit: exit_future::Exit,
|
|
||||||
) {
|
|
||||||
let (collators, client) = (self.collators.clone(), self.client.clone());
|
|
||||||
let availability_store = self.availability_store.clone();
|
|
||||||
|
|
||||||
let with_router = move |router: N::TableRouter| {
|
|
||||||
// fetch a local collation from connected collators.
|
|
||||||
let collation_work = collation_fetch(
|
|
||||||
validation_para,
|
|
||||||
relay_parent,
|
|
||||||
collators,
|
|
||||||
client.clone(),
|
|
||||||
max_block_data_size,
|
|
||||||
);
|
|
||||||
|
|
||||||
collation_work.map(move |result| match result {
|
|
||||||
Ok((collation, outgoing_targeted, fees_charged)) => {
|
|
||||||
match produce_receipt_and_chunks(
|
|
||||||
authorities_num,
|
|
||||||
&collation.pov,
|
|
||||||
&outgoing_targeted,
|
|
||||||
fees_charged,
|
|
||||||
&collation.info,
|
|
||||||
) {
|
|
||||||
Ok((receipt, chunks)) => {
|
|
||||||
// Apparently the `async move` block is the only way to convince
|
|
||||||
// the compiler that we are not moving values out of borrowed context.
|
|
||||||
let av_clone = availability_store.clone();
|
|
||||||
let chunks_clone = chunks.clone();
|
|
||||||
let receipt_clone = receipt.clone();
|
|
||||||
|
|
||||||
let res = async move {
|
|
||||||
if let Err(e) = av_clone.clone().add_erasure_chunks(
|
|
||||||
relay_parent.clone(),
|
|
||||||
receipt_clone,
|
|
||||||
chunks_clone,
|
|
||||||
).await {
|
|
||||||
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.unit_error()
|
|
||||||
.boxed()
|
|
||||||
.then(move |_| {
|
|
||||||
router.local_collation(collation, receipt, outgoing_targeted, (local_id, &chunks));
|
|
||||||
ready(())
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Some(res))
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
let router = build_router
|
|
||||||
.map_ok(with_router)
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!(target: "validation" , "Failed to build table router: {:?}", e);
|
|
||||||
})
|
|
||||||
.and_then(|f| f)
|
|
||||||
.and_then(|f| match f {
|
|
||||||
Some(f) => f.map(Ok).boxed(),
|
|
||||||
None => ready(Ok(())).boxed(),
|
|
||||||
}).boxed();
|
|
||||||
|
|
||||||
let cancellable_work = select(exit, router).map(drop);
|
|
||||||
|
|
||||||
// spawn onto thread pool.
|
|
||||||
if self.handle.spawn(cancellable_work).is_err() {
|
|
||||||
error!("Failed to spawn cancellable work task");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Parachain validation for a single block.
|
|
||||||
struct AttestationTracker {
|
|
||||||
_drop_signal: exit_future::Signal,
|
|
||||||
table: Arc<SharedTable>,
|
|
||||||
started: Instant,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Polkadot proposer factory.
|
|
||||||
pub struct ProposerFactory<C, N, P, SC, TxPool, B> {
|
|
||||||
parachain_validation: Arc<ParachainValidation<C, N, P>>,
|
|
||||||
transaction_pool: Arc<TxPool>,
|
|
||||||
keystore: KeyStorePtr,
|
|
||||||
_service_handle: ServiceHandle,
|
|
||||||
babe_slot_duration: u64,
|
|
||||||
_select_chain: SC,
|
|
||||||
max_block_data_size: Option<u64>,
|
|
||||||
backend: Arc<B>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, N, P, SC, TxPool, B> ProposerFactory<C, N, P, SC, TxPool, B> where
|
|
||||||
C: Collators + Send + Sync + Unpin + 'static,
|
|
||||||
C::Collation: Send + Unpin + 'static,
|
|
||||||
P: BlockchainEvents<Block> + BlockBody<Block>,
|
|
||||||
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
|
|
||||||
P::Api: ParachainHost<Block> +
|
|
||||||
BlockBuilderApi<Block> +
|
|
||||||
BabeApi<Block> +
|
|
||||||
ApiExt<Block, Error = sp_blockchain::Error>,
|
|
||||||
N: Network + Send + Sync + 'static,
|
|
||||||
N::TableRouter: Send + 'static,
|
|
||||||
N::BuildTableRouter: Send + Unpin + 'static,
|
|
||||||
TxPool: TransactionPool,
|
|
||||||
SC: SelectChain<Block> + 'static,
|
|
||||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
|
||||||
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
|
|
||||||
{
|
|
||||||
/// Create a new proposer factory.
|
|
||||||
pub fn new(
|
|
||||||
client: Arc<P>,
|
|
||||||
_select_chain: SC,
|
|
||||||
network: N,
|
|
||||||
collators: C,
|
|
||||||
transaction_pool: Arc<TxPool>,
|
|
||||||
thread_pool: TaskExecutor,
|
|
||||||
keystore: KeyStorePtr,
|
|
||||||
availability_store: AvailabilityStore,
|
|
||||||
babe_slot_duration: u64,
|
|
||||||
max_block_data_size: Option<u64>,
|
|
||||||
backend: Arc<B>,
|
|
||||||
) -> Self {
|
|
||||||
let parachain_validation = Arc::new(ParachainValidation {
|
|
||||||
client: client.clone(),
|
|
||||||
network,
|
|
||||||
collators,
|
|
||||||
handle: thread_pool.clone(),
|
|
||||||
availability_store: availability_store.clone(),
|
|
||||||
live_instances: Mutex::new(HashMap::new()),
|
|
||||||
});
|
|
||||||
|
|
||||||
let service_handle = crate::attestation_service::start(
|
|
||||||
client,
|
|
||||||
_select_chain.clone(),
|
|
||||||
parachain_validation.clone(),
|
|
||||||
thread_pool,
|
|
||||||
keystore.clone(),
|
|
||||||
max_block_data_size,
|
|
||||||
);
|
|
||||||
|
|
||||||
ProposerFactory {
|
|
||||||
parachain_validation,
|
|
||||||
transaction_pool,
|
|
||||||
keystore,
|
|
||||||
_service_handle: service_handle,
|
|
||||||
babe_slot_duration,
|
|
||||||
_select_chain,
|
|
||||||
max_block_data_size,
|
|
||||||
backend,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, N, P, SC, TxPool, B> consensus::Environment<Block> for ProposerFactory<C, N, P, SC, TxPool, B> where
|
|
||||||
C: Collators + Send + Unpin + 'static + Sync,
|
|
||||||
N: Network,
|
|
||||||
TxPool: TransactionPool<Block=Block> + 'static,
|
|
||||||
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
|
|
||||||
P::Api: ParachainHost<Block> +
|
|
||||||
BlockBuilderApi<Block> +
|
|
||||||
BabeApi<Block> +
|
|
||||||
ApiExt<Block, Error = sp_blockchain::Error>,
|
|
||||||
C::Collation: Send + Unpin + 'static,
|
|
||||||
N::TableRouter: Send + 'static,
|
|
||||||
N::BuildTableRouter: Send + Unpin + 'static,
|
|
||||||
SC: SelectChain<Block>,
|
|
||||||
B: Backend<Block, State = sp_api::StateBackendFor<P, Block>> + 'static,
|
|
||||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
|
||||||
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
|
|
||||||
{
|
|
||||||
type CreateProposer = Pin<Box<
|
|
||||||
dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + Unpin + 'static
|
|
||||||
>>;
|
|
||||||
type Proposer = Proposer<P, TxPool, B>;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn init(
|
|
||||||
&mut self,
|
|
||||||
parent_header: &Header,
|
|
||||||
) -> Self::CreateProposer {
|
|
||||||
let parent_hash = parent_header.hash();
|
|
||||||
let parent_id = BlockId::hash(parent_hash);
|
|
||||||
|
|
||||||
let maybe_proposer = self.parachain_validation.get_or_instantiate(
|
|
||||||
parent_hash,
|
|
||||||
&self.keystore,
|
|
||||||
self.max_block_data_size,
|
|
||||||
).map(|tracker| Proposer {
|
|
||||||
client: self.parachain_validation.client.clone(),
|
|
||||||
tracker,
|
|
||||||
parent_hash,
|
|
||||||
parent_id,
|
|
||||||
parent_number: parent_header.number,
|
|
||||||
transaction_pool: self.transaction_pool.clone(),
|
|
||||||
slot_duration: self.babe_slot_duration,
|
|
||||||
backend: self.backend.clone(),
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::pin(future::ready(maybe_proposer))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The local duty of a validator.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct LocalDuty {
|
|
||||||
validation: Chain,
|
|
||||||
index: ValidatorIndex,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The Polkadot proposer logic.
|
|
||||||
pub struct Proposer<Client, TxPool, Backend> {
|
|
||||||
client: Arc<Client>,
|
|
||||||
parent_hash: Hash,
|
|
||||||
parent_id: BlockId,
|
|
||||||
parent_number: BlockNumber,
|
|
||||||
tracker: Arc<AttestationTracker>,
|
|
||||||
transaction_pool: Arc<TxPool>,
|
|
||||||
slot_duration: u64,
|
|
||||||
backend: Arc<Backend>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where
|
|
||||||
TxPool: TransactionPool<Block=Block> + 'static,
|
|
||||||
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
|
|
||||||
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
|
||||||
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
|
|
||||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
|
||||||
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
|
|
||||||
{
|
|
||||||
type Error = Error;
|
|
||||||
type Transaction = sp_api::TransactionFor<Client, Block>;
|
|
||||||
type Proposal = Pin<
|
|
||||||
Box<
|
|
||||||
dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>>
|
|
||||||
+ Send
|
|
||||||
>
|
|
||||||
>;
|
|
||||||
|
|
||||||
fn propose(&mut self,
|
|
||||||
inherent_data: InherentData,
|
|
||||||
inherent_digests: DigestFor<Block>,
|
|
||||||
max_duration: Duration,
|
|
||||||
record_proof: RecordProof,
|
|
||||||
) -> Self::Proposal {
|
|
||||||
const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
|
|
||||||
|
|
||||||
let initial_included = self.tracker.table.includable_count();
|
|
||||||
let now = Instant::now();
|
|
||||||
|
|
||||||
let dynamic_inclusion = DynamicInclusion::new(
|
|
||||||
self.tracker.table.num_parachains(),
|
|
||||||
self.tracker.started,
|
|
||||||
Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
|
|
||||||
);
|
|
||||||
|
|
||||||
let parent_hash = self.parent_hash.clone();
|
|
||||||
let parent_number = self.parent_number.clone();
|
|
||||||
let parent_id = self.parent_id.clone();
|
|
||||||
let client = self.client.clone();
|
|
||||||
let transaction_pool = self.transaction_pool.clone();
|
|
||||||
let table = self.tracker.table.clone();
|
|
||||||
let backend = self.backend.clone();
|
|
||||||
|
|
||||||
async move {
|
|
||||||
let enough_candidates = dynamic_inclusion.acceptable_in(
|
|
||||||
now,
|
|
||||||
initial_included,
|
|
||||||
).unwrap_or_else(|| Duration::from_millis(1));
|
|
||||||
|
|
||||||
let believed_timestamp = match inherent_data.timestamp_inherent_data() {
|
|
||||||
Ok(timestamp) => timestamp,
|
|
||||||
Err(e) => return Err(Error::InherentError(e)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let deadline_diff = max_duration - max_duration / 3;
|
|
||||||
let deadline = match Instant::now().checked_add(deadline_diff) {
|
|
||||||
None => return Err(Error::DeadlineComputeFailure(deadline_diff)),
|
|
||||||
Some(d) => d,
|
|
||||||
};
|
|
||||||
|
|
||||||
let data = CreateProposalData {
|
|
||||||
parent_hash,
|
|
||||||
parent_number,
|
|
||||||
parent_id,
|
|
||||||
client,
|
|
||||||
transaction_pool,
|
|
||||||
table,
|
|
||||||
believed_minimum_timestamp: believed_timestamp,
|
|
||||||
inherent_data: Some(inherent_data),
|
|
||||||
inherent_digests,
|
|
||||||
// leave some time for the proposal finalisation
|
|
||||||
deadline,
|
|
||||||
record_proof,
|
|
||||||
backend,
|
|
||||||
};
|
|
||||||
|
|
||||||
// set up delay until next allowed timestamp.
|
|
||||||
let current_timestamp = current_timestamp();
|
|
||||||
if current_timestamp < believed_timestamp {
|
|
||||||
Delay::new(Duration::from_millis(current_timestamp - believed_timestamp))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
Delay::new(enough_candidates).await;
|
|
||||||
|
|
||||||
tokio_executor::blocking::run(move || {
|
|
||||||
let proposed_candidates = data.table.proposed_set();
|
|
||||||
data.propose_with(proposed_candidates)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
}.boxed()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn current_timestamp() -> u64 {
|
|
||||||
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
|
|
||||||
.expect("now always later than unix epoch; qed")
|
|
||||||
.as_millis() as u64
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Inner data of the create proposal.
|
|
||||||
struct CreateProposalData<Client, TxPool, Backend> {
|
|
||||||
parent_hash: Hash,
|
|
||||||
parent_number: BlockNumber,
|
|
||||||
parent_id: BlockId,
|
|
||||||
client: Arc<Client>,
|
|
||||||
transaction_pool: Arc<TxPool>,
|
|
||||||
table: Arc<SharedTable>,
|
|
||||||
believed_minimum_timestamp: u64,
|
|
||||||
inherent_data: Option<InherentData>,
|
|
||||||
inherent_digests: DigestFor<Block>,
|
|
||||||
deadline: Instant,
|
|
||||||
record_proof: RecordProof,
|
|
||||||
backend: Arc<Backend>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where
|
|
||||||
TxPool: TransactionPool<Block=Block>,
|
|
||||||
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
|
|
||||||
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
|
||||||
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
|
|
||||||
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
|
||||||
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
|
|
||||||
{
|
|
||||||
fn propose_with(
|
|
||||||
mut self,
|
|
||||||
candidates: Vec<AttestedCandidate>,
|
|
||||||
) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> {
|
|
||||||
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
|
|
||||||
|
|
||||||
const MAX_TRANSACTIONS: usize = 40;
|
|
||||||
|
|
||||||
let mut inherent_data = self.inherent_data
|
|
||||||
.take()
|
|
||||||
.expect("CreateProposal is not polled after finishing; qed");
|
|
||||||
inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates)
|
|
||||||
.map_err(Error::InherentError)?;
|
|
||||||
|
|
||||||
let runtime_api = self.client.runtime_api();
|
|
||||||
|
|
||||||
let mut block_builder = block_builder::BlockBuilder::new(
|
|
||||||
&*self.client,
|
|
||||||
self.client.expect_block_hash_from_id(&self.parent_id)?,
|
|
||||||
self.client.expect_block_number_from_id(&self.parent_id)?,
|
|
||||||
self.record_proof,
|
|
||||||
self.inherent_digests.clone(),
|
|
||||||
&*self.backend,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
{
|
|
||||||
let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
|
|
||||||
for inherent in inherents {
|
|
||||||
block_builder.push(inherent)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut unqueue_invalid = Vec::new();
|
|
||||||
let mut pending_size = 0;
|
|
||||||
|
|
||||||
let ready_iter = self.transaction_pool.ready();
|
|
||||||
for ready in ready_iter.take(MAX_TRANSACTIONS) {
|
|
||||||
let encoded_size = ready.data().encode().len();
|
|
||||||
if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if Instant::now() > self.deadline {
|
|
||||||
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
match block_builder.push(ready.data().clone()) {
|
|
||||||
Ok(()) => {
|
|
||||||
debug!("[{:?}] Pushed to the block.", ready.hash());
|
|
||||||
pending_size += encoded_size;
|
|
||||||
}
|
|
||||||
Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
|
|
||||||
if e.exhausted_resources() =>
|
|
||||||
{
|
|
||||||
debug!("Block is full, proceed with proposing.");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
|
|
||||||
unqueue_invalid.push(ready.hash().clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.transaction_pool.remove_invalid(&unqueue_invalid);
|
|
||||||
}
|
|
||||||
|
|
||||||
let (new_block, storage_changes, proof) = block_builder.build()?.into_inner();
|
|
||||||
|
|
||||||
info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
|
|
||||||
new_block.header.number,
|
|
||||||
Hash::from(new_block.header.hash()),
|
|
||||||
new_block.header.parent_hash,
|
|
||||||
new_block.extrinsics.iter()
|
|
||||||
.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.join(", ")
|
|
||||||
);
|
|
||||||
|
|
||||||
// TODO: full re-evaluation (https://github.com/paritytech/polkadot/issues/216)
|
|
||||||
let active_parachains = runtime_api.active_parachains(&self.parent_id)?;
|
|
||||||
assert!(evaluation::evaluate_initial(
|
|
||||||
&new_block,
|
|
||||||
self.believed_minimum_timestamp,
|
|
||||||
&self.parent_hash,
|
|
||||||
self.parent_number,
|
|
||||||
&active_parachains[..],
|
|
||||||
).is_ok());
|
|
||||||
|
|
||||||
Ok(Proposal { block: new_block, storage_changes, proof })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -431,7 +431,10 @@ impl SharedTable {
|
|||||||
|
|
||||||
/// Import a single statement with remote source, whose signature has already been checked.
|
/// Import a single statement with remote source, whose signature has already been checked.
|
||||||
///
|
///
|
||||||
/// The statement producer, if any, will produce only statements concerning the same candidate
|
/// Validity and invalidity statements are only valid if the corresponding
|
||||||
|
/// candidate has already been imported.
|
||||||
|
///
|
||||||
|
/// The ParachainWork, if any, will produce only statements concerning the same candidate
|
||||||
/// as the one just imported
|
/// as the one just imported
|
||||||
pub fn import_remote_statement<R: TableRouter>(
|
pub fn import_remote_statement<R: TableRouter>(
|
||||||
&self,
|
&self,
|
||||||
@@ -446,8 +449,10 @@ impl SharedTable {
|
|||||||
/// Import many statements at once.
|
/// Import many statements at once.
|
||||||
///
|
///
|
||||||
/// Provide an iterator yielding remote, pre-checked statements.
|
/// Provide an iterator yielding remote, pre-checked statements.
|
||||||
|
/// Validity and invalidity statements are only valid if the corresponding
|
||||||
|
/// candidate has already been imported.
|
||||||
///
|
///
|
||||||
/// The statement producer, if any, will produce only statements concerning the same candidate
|
/// The ParachainWork, if any, will produce only statements concerning the same candidate
|
||||||
/// as the one just imported
|
/// as the one just imported
|
||||||
pub fn import_remote_statements<R, I, U>(&self, router: &R, iterable: I) -> U
|
pub fn import_remote_statements<R, I, U>(&self, router: &R, iterable: I) -> U
|
||||||
where
|
where
|
||||||
|
|||||||
@@ -0,0 +1,463 @@
|
|||||||
|
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Polkadot is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! The validation service is a long-running future that creates and manages parachain attestation
|
||||||
|
//! instances.
|
||||||
|
//!
|
||||||
|
//! As soon as we import a new chain head, we start a parachain attestation session on top of it.
|
||||||
|
//! The block authorship service may want access to the attestation session, and for that reason
|
||||||
|
//! we expose a `ServiceHandle` which can be used to request a copy of it.
|
||||||
|
//!
|
||||||
|
//! In fact, the import notification and request from the block production pipeline may race to be
|
||||||
|
//! the first one to create the instant, but the import notification will usually win.
|
||||||
|
//!
|
||||||
|
//! These attestation sessions are kept live until they are periodically garbage-collected.
|
||||||
|
|
||||||
|
use std::{time::{Duration, Instant}, sync::Arc};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use sc_client_api::{BlockchainEvents, BlockBody};
|
||||||
|
use sp_blockchain::HeaderBackend;
|
||||||
|
use block_builder::BlockBuilderApi;
|
||||||
|
use consensus::SelectChain;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use futures::{future::{ready, select}, task::{Spawn, SpawnExt}};
|
||||||
|
use polkadot_primitives::{Block, Hash, BlockId};
|
||||||
|
use polkadot_primitives::parachain::{
|
||||||
|
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
|
||||||
|
};
|
||||||
|
use babe_primitives::BabeApi;
|
||||||
|
use keystore::KeyStorePtr;
|
||||||
|
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||||
|
use runtime_primitives::traits::HasherFor;
|
||||||
|
use availability_store::Store as AvailabilityStore;
|
||||||
|
|
||||||
|
use log::{warn, error, info, debug};
|
||||||
|
|
||||||
|
use super::{Network, Collators, SharedTable, TableRouter};
|
||||||
|
use crate::Error;
|
||||||
|
|
||||||
|
/// A handle to spawn background tasks onto.
|
||||||
|
pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
|
||||||
|
|
||||||
|
// Remote processes may request for a validation instance to be cloned or instantiated.
|
||||||
|
// They send a oneshot channel.
|
||||||
|
type ValidationInstanceRequest = (
|
||||||
|
Hash,
|
||||||
|
futures::channel::oneshot::Sender<Result<Arc<ValidationInstanceHandle>, Error>>,
|
||||||
|
);
|
||||||
|
|
||||||
|
/// A handle to a single instance of parachain validation, which is pinned to
|
||||||
|
/// a specific relay-chain block. This is the instance that should be used when
|
||||||
|
/// constructing any
|
||||||
|
pub(crate) struct ValidationInstanceHandle {
|
||||||
|
_drop_signal: exit_future::Signal,
|
||||||
|
table: Arc<SharedTable>,
|
||||||
|
started: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ValidationInstanceHandle {
|
||||||
|
/// Access the underlying table of attestations on parachain candidates.
|
||||||
|
pub(crate) fn table(&self) -> &Arc<SharedTable> {
|
||||||
|
&self.table
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The moment we started this validation instance.
|
||||||
|
pub(crate) fn started(&self) -> Instant {
|
||||||
|
self.started.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A handle to the service. This can be used to create a block-production environment.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ServiceHandle {
|
||||||
|
sender: futures::channel::mpsc::Sender<ValidationInstanceRequest>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServiceHandle {
|
||||||
|
/// Requests instantiation or cloning of a validation instance from the service.
|
||||||
|
///
|
||||||
|
/// This can fail if the service task has shut down for some reason.
|
||||||
|
pub(crate) async fn get_validation_instance(self, relay_parent: Hash)
|
||||||
|
-> Result<Arc<ValidationInstanceHandle>, Error>
|
||||||
|
{
|
||||||
|
let mut sender = self.sender;
|
||||||
|
let instance_rx = loop {
|
||||||
|
let (instance_tx, instance_rx) = futures::channel::oneshot::channel();
|
||||||
|
match sender.send((relay_parent, instance_tx)).await {
|
||||||
|
Ok(()) => break instance_rx,
|
||||||
|
Err(e) => if !e.is_full() {
|
||||||
|
// Sink::send should be doing `poll_ready` before start-send,
|
||||||
|
// so this should only happen when there is a race.
|
||||||
|
return Err(Error::ValidationServiceDown)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
instance_rx.map_err(|_| Error::ValidationServiceDown).await.and_then(|x| x)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin {
|
||||||
|
stream::unfold((), move |_| {
|
||||||
|
futures_timer::Delay::new(duration).map(|_| Some(((), ())))
|
||||||
|
}).map(drop)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A builder for the validation service.
|
||||||
|
pub struct ServiceBuilder<C, N, P, SC> {
|
||||||
|
/// The underlying blockchain client.
|
||||||
|
pub client: Arc<P>,
|
||||||
|
/// A handle to the network object used to communicate.
|
||||||
|
pub network: N,
|
||||||
|
/// A handle to the collator pool we are using.
|
||||||
|
pub collators: C,
|
||||||
|
/// A handle to a background executor.
|
||||||
|
pub task_executor: TaskExecutor,
|
||||||
|
/// A handle to the availability store.
|
||||||
|
pub availability_store: AvailabilityStore,
|
||||||
|
/// A chain selector for determining active leaves in the block-DAG.
|
||||||
|
pub select_chain: SC,
|
||||||
|
/// The keystore which holds the signing keys.
|
||||||
|
pub keystore: KeyStorePtr,
|
||||||
|
/// The maximum block-data size in bytes.
|
||||||
|
pub max_block_data_size: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C, N, P, SC> ServiceBuilder<C, N, P, SC> where
|
||||||
|
C: Collators + Send + Sync + Unpin + 'static,
|
||||||
|
C::Collation: Send + Unpin + 'static,
|
||||||
|
P: BlockchainEvents<Block> + BlockBody<Block>,
|
||||||
|
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
|
||||||
|
P::Api: ParachainHost<Block> +
|
||||||
|
BlockBuilderApi<Block> +
|
||||||
|
BabeApi<Block> +
|
||||||
|
ApiExt<Block, Error = sp_blockchain::Error>,
|
||||||
|
N: Network + Send + Sync + 'static,
|
||||||
|
N::TableRouter: Send + 'static,
|
||||||
|
N::BuildTableRouter: Send + Unpin + 'static,
|
||||||
|
SC: SelectChain<Block> + 'static,
|
||||||
|
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||||
|
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
|
||||||
|
{
|
||||||
|
/// Build the service - this consists of a handle to it, as well as a background
|
||||||
|
/// future to be run to completion.
|
||||||
|
pub fn build(self) -> (ServiceHandle, impl Future<Output = ()> + Send + 'static) {
|
||||||
|
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
|
const CHAN_BUFFER: usize = 10;
|
||||||
|
|
||||||
|
enum Message {
|
||||||
|
CollectGarbage,
|
||||||
|
// relay-parent, receiver for instance.
|
||||||
|
RequestInstance(ValidationInstanceRequest),
|
||||||
|
// new chain heads - import notification.
|
||||||
|
NotifyImport(sc_client_api::BlockImportNotification<Block>),
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut parachain_validation = ParachainValidationInstances {
|
||||||
|
client: self.client.clone(),
|
||||||
|
network: self.network,
|
||||||
|
collators: self.collators,
|
||||||
|
handle: self.task_executor,
|
||||||
|
availability_store: self.availability_store,
|
||||||
|
live_instances: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let client = self.client;
|
||||||
|
let select_chain = self.select_chain;
|
||||||
|
let keystore = self.keystore;
|
||||||
|
let max_block_data_size = self.max_block_data_size;
|
||||||
|
|
||||||
|
let (tx, rx) = futures::channel::mpsc::channel(CHAN_BUFFER);
|
||||||
|
let interval = interval(TIMER_INTERVAL).map(|_| Message::CollectGarbage);
|
||||||
|
let import_notifications = client.import_notification_stream().map(Message::NotifyImport);
|
||||||
|
let instance_requests = rx.map(Message::RequestInstance);
|
||||||
|
let service = ServiceHandle { sender: tx };
|
||||||
|
|
||||||
|
let background_work = async move {
|
||||||
|
let message_stream = futures::stream::select(interval, instance_requests);
|
||||||
|
let mut message_stream = futures::stream::select(import_notifications, message_stream);
|
||||||
|
while let Some(message) = message_stream.next().await {
|
||||||
|
match message {
|
||||||
|
Message::CollectGarbage => {
|
||||||
|
match select_chain.leaves() {
|
||||||
|
Ok(leaves) => {
|
||||||
|
parachain_validation.retain(|h| leaves.contains(h));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error fetching leaves from client: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Message::RequestInstance((relay_parent, sender)) => {
|
||||||
|
// Upstream will handle the failure case.
|
||||||
|
let _ = sender.send(parachain_validation.get_or_instantiate(
|
||||||
|
relay_parent,
|
||||||
|
&keystore,
|
||||||
|
max_block_data_size,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Message::NotifyImport(notification) => {
|
||||||
|
let relay_parent = notification.hash;
|
||||||
|
if notification.is_new_best {
|
||||||
|
let res = parachain_validation.get_or_instantiate(
|
||||||
|
relay_parent,
|
||||||
|
&keystore,
|
||||||
|
max_block_data_size,
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Err(e) = res {
|
||||||
|
warn!(
|
||||||
|
"Unable to start parachain validation on top of {:?}: {}",
|
||||||
|
relay_parent, e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
(service, background_work)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// finds the first key we are capable of signing with out of the given set of validators,
|
||||||
|
// if any.
|
||||||
|
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
|
||||||
|
let keystore = keystore.read();
|
||||||
|
validators.iter()
|
||||||
|
.find_map(|v| {
|
||||||
|
keystore.key_pair::<ValidatorPair>(&v).ok()
|
||||||
|
})
|
||||||
|
.map(|pair| Arc::new(pair))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Constructs parachain-agreement instances.
|
||||||
|
pub(crate) struct ParachainValidationInstances<C, N, P> {
|
||||||
|
/// The client instance.
|
||||||
|
client: Arc<P>,
|
||||||
|
/// The backing network handle.
|
||||||
|
network: N,
|
||||||
|
/// Parachain collators.
|
||||||
|
collators: C,
|
||||||
|
/// handle to remote task executor
|
||||||
|
handle: TaskExecutor,
|
||||||
|
/// Store for extrinsic data.
|
||||||
|
availability_store: AvailabilityStore,
|
||||||
|
/// Live agreements. Maps relay chain parent hashes to attestation
|
||||||
|
/// instances.
|
||||||
|
live_instances: HashMap<Hash, Arc<ValidationInstanceHandle>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C, N, P> ParachainValidationInstances<C, N, P> where
|
||||||
|
C: Collators + Send + Unpin + 'static,
|
||||||
|
N: Network,
|
||||||
|
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
|
||||||
|
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
|
||||||
|
C::Collation: Send + Unpin + 'static,
|
||||||
|
N::TableRouter: Send + 'static,
|
||||||
|
N::BuildTableRouter: Unpin + Send + 'static,
|
||||||
|
// Rust bug: https://github.com/rust-lang/rust/issues/24159
|
||||||
|
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
|
||||||
|
{
|
||||||
|
/// Get an attestation table for given parent hash.
|
||||||
|
///
|
||||||
|
/// This starts a parachain agreement process on top of the parent hash if
|
||||||
|
/// one has not already started.
|
||||||
|
///
|
||||||
|
/// Additionally, this will trigger broadcast of data to the new block's duty
|
||||||
|
/// roster.
|
||||||
|
fn get_or_instantiate(
|
||||||
|
&mut self,
|
||||||
|
parent_hash: Hash,
|
||||||
|
keystore: &KeyStorePtr,
|
||||||
|
max_block_data_size: Option<u64>,
|
||||||
|
)
|
||||||
|
-> Result<Arc<ValidationInstanceHandle>, Error>
|
||||||
|
{
|
||||||
|
use primitives::Pair;
|
||||||
|
|
||||||
|
if let Some(tracker) = self.live_instances.get(&parent_hash) {
|
||||||
|
return Ok(tracker.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let id = BlockId::hash(parent_hash);
|
||||||
|
|
||||||
|
let validators = self.client.runtime_api().validators(&id)?;
|
||||||
|
let sign_with = signing_key(&validators[..], keystore);
|
||||||
|
|
||||||
|
let duty_roster = self.client.runtime_api().duty_roster(&id)?;
|
||||||
|
|
||||||
|
let (group_info, local_duty) = crate::make_group_info(
|
||||||
|
duty_roster,
|
||||||
|
&validators,
|
||||||
|
sign_with.as_ref().map(|k| k.public()),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
|
||||||
|
parent_hash,
|
||||||
|
local_duty,
|
||||||
|
);
|
||||||
|
|
||||||
|
let active_parachains = self.client.runtime_api().active_parachains(&id)?;
|
||||||
|
|
||||||
|
debug!(target: "validation", "Active parachains: {:?}", active_parachains);
|
||||||
|
|
||||||
|
// If we are a validator, we need to store our index in this round in availability store.
|
||||||
|
// This will tell which erasure chunk we should store.
|
||||||
|
if let Some(ref local_duty) = local_duty {
|
||||||
|
if let Err(e) = self.availability_store.add_validator_index_and_n_validators(
|
||||||
|
&parent_hash,
|
||||||
|
local_duty.index,
|
||||||
|
validators.len() as u32,
|
||||||
|
) {
|
||||||
|
warn!(
|
||||||
|
target: "validation",
|
||||||
|
"Failed to add validator index and n_validators to the availability-store: {:?}", e
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let table = Arc::new(SharedTable::new(
|
||||||
|
validators.clone(),
|
||||||
|
group_info,
|
||||||
|
sign_with,
|
||||||
|
parent_hash,
|
||||||
|
self.availability_store.clone(),
|
||||||
|
max_block_data_size,
|
||||||
|
));
|
||||||
|
|
||||||
|
let (_drop_signal, exit) = exit_future::signal();
|
||||||
|
|
||||||
|
let router = self.network.communication_for(
|
||||||
|
table.clone(),
|
||||||
|
&validators,
|
||||||
|
exit.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) {
|
||||||
|
self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index, exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
let tracker = Arc::new(ValidationInstanceHandle {
|
||||||
|
table,
|
||||||
|
started: Instant::now(),
|
||||||
|
_drop_signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
self.live_instances.insert(parent_hash, tracker.clone());
|
||||||
|
|
||||||
|
Ok(tracker)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retain validation sessions matching predicate.
|
||||||
|
fn retain<F: FnMut(&Hash) -> bool>(&mut self, mut pred: F) {
|
||||||
|
self.live_instances.retain(|k, _| pred(k))
|
||||||
|
}
|
||||||
|
|
||||||
|
// launch parachain work asynchronously.
|
||||||
|
fn launch_work(
|
||||||
|
&self,
|
||||||
|
relay_parent: Hash,
|
||||||
|
validation_para: ParaId,
|
||||||
|
build_router: N::BuildTableRouter,
|
||||||
|
max_block_data_size: Option<u64>,
|
||||||
|
authorities_num: usize,
|
||||||
|
local_id: ValidatorIndex,
|
||||||
|
exit: exit_future::Exit,
|
||||||
|
) {
|
||||||
|
let (collators, client) = (self.collators.clone(), self.client.clone());
|
||||||
|
let availability_store = self.availability_store.clone();
|
||||||
|
|
||||||
|
let with_router = move |router: N::TableRouter| {
|
||||||
|
// fetch a local collation from connected collators.
|
||||||
|
let collation_work = crate::collation::collation_fetch(
|
||||||
|
validation_para,
|
||||||
|
relay_parent,
|
||||||
|
collators,
|
||||||
|
client.clone(),
|
||||||
|
max_block_data_size,
|
||||||
|
);
|
||||||
|
|
||||||
|
collation_work.map(move |result| match result {
|
||||||
|
Ok((collation, outgoing_targeted, fees_charged)) => {
|
||||||
|
match crate::collation::produce_receipt_and_chunks(
|
||||||
|
authorities_num,
|
||||||
|
&collation.pov,
|
||||||
|
&outgoing_targeted,
|
||||||
|
fees_charged,
|
||||||
|
&collation.info,
|
||||||
|
) {
|
||||||
|
Ok((receipt, chunks)) => {
|
||||||
|
// Apparently the `async move` block is the only way to convince
|
||||||
|
// the compiler that we are not moving values out of borrowed context.
|
||||||
|
let av_clone = availability_store.clone();
|
||||||
|
let chunks_clone = chunks.clone();
|
||||||
|
let receipt_clone = receipt.clone();
|
||||||
|
|
||||||
|
let res = async move {
|
||||||
|
if let Err(e) = av_clone.clone().add_erasure_chunks(
|
||||||
|
relay_parent.clone(),
|
||||||
|
receipt_clone,
|
||||||
|
chunks_clone,
|
||||||
|
).await {
|
||||||
|
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.unit_error()
|
||||||
|
.boxed()
|
||||||
|
.then(move |_| {
|
||||||
|
router.local_collation(
|
||||||
|
collation,
|
||||||
|
receipt,
|
||||||
|
outgoing_targeted,
|
||||||
|
(local_id, &chunks),
|
||||||
|
);
|
||||||
|
ready(())
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
Some(res)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let router = build_router
|
||||||
|
.map_ok(with_router)
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!(target: "validation" , "Failed to build table router: {:?}", e);
|
||||||
|
});
|
||||||
|
|
||||||
|
let cancellable_work = select(exit, router).map(drop);
|
||||||
|
|
||||||
|
// spawn onto thread pool.
|
||||||
|
if self.handle.spawn(cancellable_work).is_err() {
|
||||||
|
error!("Failed to spawn cancellable work task");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user