Remove v0 node-side parachains code (#1609)

* clean out v0 consensus crates

* remove service dependencies on old consensus code

* fix cli

* kill adder-collator

* bump Cargo.lock
This commit is contained in:
Robert Habermeier
2020-08-24 13:43:01 +02:00
committed by GitHub
parent 591e9b7454
commit 430cf6e6f2
40 changed files with 40 additions and 11469 deletions
+17 -82
View File
@@ -22,36 +22,25 @@
use std::{
pin::Pin,
sync::Arc,
time::{self, Duration, Instant},
time::Duration,
};
use sp_blockchain::HeaderBackend;
use block_builder::{BlockBuilderApi, BlockBuilderProvider};
use consensus::{Proposal, RecordProof};
use polkadot_primitives::v0::{Block, Header};
use polkadot_primitives::v0::{
ParachainHost, NEW_HEADS_IDENTIFIER,
};
use polkadot_primitives::v0::{NEW_HEADS_IDENTIFIER, Block, Header, AttestedCandidate};
use runtime_primitives::traits::{DigestFor, HashFor};
use futures_timer::Delay;
use txpool_api::TransactionPool;
use futures::prelude::*;
use inherents::InherentData;
use sp_timestamp::TimestampInherentData;
use sp_api::{ApiExt, ProvideRuntimeApi};
use prometheus_endpoint::Registry as PrometheusRegistry;
use crate::{
Error,
dynamic_inclusion::DynamicInclusion,
validation_service::ServiceHandle,
};
use crate::Error;
// Polkadot proposer factory.
pub struct ProposerFactory<Client, TxPool, Backend> {
service_handle: ServiceHandle,
babe_slot_duration: u64,
factory: sc_basic_authorship::ProposerFactory<TxPool, Backend, Client>,
}
@@ -60,8 +49,6 @@ impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
pub fn new(
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
service_handle: ServiceHandle,
babe_slot_duration: u64,
prometheus: Option<&PrometheusRegistry>,
) -> Self {
let factory = sc_basic_authorship::ProposerFactory::new(
@@ -70,8 +57,6 @@ impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
prometheus,
);
ProposerFactory {
service_handle,
babe_slot_duration,
factory,
}
}
@@ -82,7 +67,7 @@ impl<Client, TxPool, Backend> consensus::Environment<Block>
where
TxPool: TransactionPool<Block=Block> + 'static,
Client: BlockBuilderProvider<Backend, Block, Client> + ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block>
Client::Api: BlockBuilderApi<Block>
+ ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<
Block,
@@ -101,37 +86,24 @@ where
&mut self,
parent_header: &Header,
) -> Self::CreateProposer {
let parent_hash = parent_header.hash();
let slot_duration = self.babe_slot_duration.clone();
let proposer = self.factory.init(parent_header).into_inner();
let proposer = self.factory.init(parent_header)
.into_inner()
.map_err(Into::into)
.map(|proposer| Proposer { proposer });
let maybe_proposer = self.service_handle
.clone()
.get_validation_instance(parent_hash)
.and_then(move |tracker| future::ready(proposer
.map_err(Into::into)
.map(|proposer| Proposer {
tracker,
slot_duration,
proposer,
})
));
Box::pin(maybe_proposer)
Box::pin(future::ready(proposer))
}
}
/// The Polkadot proposer logic.
pub struct Proposer<Client, TxPool: TransactionPool<Block=Block>, Backend> {
tracker: crate::validation_service::ValidationInstanceHandle,
slot_duration: u64,
proposer: sc_basic_authorship::Proposer<Backend, Block, Client, TxPool>,
}
impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where
TxPool: TransactionPool<Block=Block> + 'static,
Client: BlockBuilderProvider<Backend, Block, Client> + ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Client::Api: BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
@@ -140,8 +112,11 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
type Transaction = sp_api::TransactionFor<Client, Block>;
type Proposal = Pin<
Box<
dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>>
+ Send
dyn Future<Output = Result<
Proposal<Block, sp_api::TransactionFor<Client, Block>>,
Self::Error,
>>
+ Send
>
>;
@@ -152,57 +127,17 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
max_duration: Duration,
record_proof: RecordProof,
) -> Self::Proposal {
const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
let initial_included = self.tracker.table().includable_count();
let now = Instant::now();
let dynamic_inclusion = DynamicInclusion::new(
self.tracker.table().num_parachains(),
self.tracker.started(),
Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
);
async move {
let enough_candidates = dynamic_inclusion.acceptable_in(
now,
initial_included,
).unwrap_or_else(|| Duration::from_millis(1));
let believed_timestamp = match inherent_data.timestamp_inherent_data() {
Ok(timestamp) => timestamp,
Err(e) => return Err(Error::InherentError(e)),
};
let deadline_diff = max_duration - max_duration / 3;
// set up delay until next allowed timestamp.
let current_timestamp = current_timestamp();
if current_timestamp < believed_timestamp {
Delay::new(Duration::from_millis(current_timestamp - believed_timestamp))
.await;
}
Delay::new(enough_candidates).await;
let proposed_candidates = self.tracker.table().proposed_set();
let mut inherent_data = inherent_data;
inherent_data.put_data(NEW_HEADS_IDENTIFIER, &proposed_candidates)
inherent_data.put_data(NEW_HEADS_IDENTIFIER, &Vec::<AttestedCandidate>::new())
.map_err(Error::InherentError)?;
self.proposer.propose(
inherent_data,
inherent_digests.clone(),
deadline_diff,
max_duration,
record_proof
).await.map_err(Into::into)
}.boxed()
}
}
fn current_timestamp() -> u64 {
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
.expect("now always later than unix epoch; qed")
.as_millis() as u64
}
-117
View File
@@ -1,117 +0,0 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Validator-side view of collation.
//!
//! This module contains type definitions, a trait for a batch of collators, and a trait for
//! attempting to fetch a collation repeatedly until a valid one is obtained.
use std::sync::Arc;
use polkadot_primitives::v0::{
BlakeTwo256, Block, Hash, HashT,
CollatorId, ParachainHost, Id as ParaId, Collation, ErasureChunk, CollationInfo,
};
use polkadot_erasure_coding as erasure;
use sp_api::ProvideRuntimeApi;
use futures::prelude::*;
use log::debug;
use primitives::traits::SpawnNamed;
/// Encapsulates connections to collators and allows collation on any parachain.
///
/// This is expected to be a lightweight, shared type like an `Arc`.
pub trait Collators: Clone {
/// Errors when producing collations.
type Error: std::fmt::Debug;
/// A full collation.
type Collation: Future<Output=Result<Collation, Self::Error>>;
/// Collate on a specific parachain, building on a given relay chain parent hash.
///
/// The returned collation should be checked for basic validity in the signature
/// and will be checked for state-transition validity by the consumer of this trait.
///
/// This does not have to guarantee local availability, as a valid collation
/// will be passed to the `TableRouter` instance.
///
/// The returned future may be prematurely concluded if the `relay_parent` goes
/// out of date.
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation;
/// Note a bad collator. TODO: take proof (https://github.com/paritytech/polkadot/issues/217)
fn note_bad_collator(&self, collator: CollatorId);
}
/// A future which resolves when a collation is available.
pub async fn collation_fetch<C: Collators, P>(
validation_pool: Option<crate::pipeline::ValidationPool>,
parachain: ParaId,
relay_parent: Hash,
collators: C,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
spawner: impl SpawnNamed + Clone + 'static,
) -> Result<(CollationInfo, crate::pipeline::FullOutput), C::Error>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
C: Collators + Unpin,
P: ProvideRuntimeApi<Block>,
<C as Collators>::Collation: Unpin,
{
loop {
let collation = collators.collate(parachain, relay_parent).await?;
let Collation { info, pov } = collation;
let res = crate::pipeline::full_output_validation_with_api(
validation_pool.as_ref(),
&*client,
&info,
&pov,
&relay_parent,
max_block_data_size,
n_validators,
spawner.clone(),
);
match res {
Ok(full_output) => {
return Ok((info, full_output))
}
Err(e) => {
debug!("Failed to validate parachain due to API error: {}", e);
// just continue if we got a bad collation or failed to validate
collators.note_bad_collator(info.collator)
}
}
}
}
/// Validate an erasure chunk against an expected root.
pub fn validate_chunk(
root: &Hash,
chunk: &ErasureChunk,
) -> Result<(), ()> {
let expected = erasure::branch_hash(root, &chunk.proof, chunk.index as usize).map_err(|_| ())?;
let got = BlakeTwo256::hash(&chunk.chunk);
if expected != got {
return Err(())
}
Ok(())
}
@@ -1,130 +0,0 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Dynamic inclusion threshold over time.
use std::time::{Duration, Instant};
fn duration_to_micros(duration: &Duration) -> u64 {
duration.as_secs() * 1_000_000 + (duration.subsec_nanos() / 1000) as u64
}
/// Dynamic inclusion threshold over time.
///
/// The acceptable proportion of parachains which must have parachain candidates
/// reduces over time (eventually going to zero).
#[derive(Debug, Clone)]
pub struct DynamicInclusion {
start: Instant,
y: u64,
m: u64,
}
impl DynamicInclusion {
/// Constructs a new dynamic inclusion threshold calculator based on the time now,
/// how many parachain candidates are required at the beginning, and when an empty
/// block will be allowed.
pub fn new(initial: usize, start: Instant, allow_empty: Duration) -> Self {
// linear function f(n_candidates) -> valid after microseconds
// f(0) = allow_empty
// f(initial) = 0
// m is actually the negative slope to avoid using signed arithmetic.
let (y, m) = if initial != 0 {
let y = duration_to_micros(&allow_empty);
(y, y / initial as u64)
} else {
(0, 0)
};
DynamicInclusion {
start,
y,
m,
}
}
/// Returns the duration from `now` after which the amount of included parachain candidates
/// would be enough, or `None` if it is sufficient now.
///
/// Panics if `now` is earlier than the `start`.
pub fn acceptable_in(&self, now: Instant, included: usize) -> Option<Duration> {
let elapsed = now.duration_since(self.start);
let elapsed = duration_to_micros(&elapsed);
let valid_after = self.y.saturating_sub(self.m * included as u64);
if elapsed >= valid_after {
None
} else {
Some(Duration::from_millis((valid_after - elapsed) as u64 / 1000))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn full_immediately_allowed() {
let now = Instant::now();
let dynamic = DynamicInclusion::new(
10,
now,
Duration::from_millis(4000),
);
assert!(dynamic.acceptable_in(now, 10).is_none());
assert!(dynamic.acceptable_in(now, 11).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(2000), 10).is_none());
}
#[test]
fn half_allowed_halfway() {
let now = Instant::now();
let dynamic = DynamicInclusion::new(
10,
now,
Duration::from_millis(4000),
);
assert_eq!(dynamic.acceptable_in(now, 5), Some(Duration::from_millis(2000)));
assert!(dynamic.acceptable_in(now + Duration::from_millis(2000), 5).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(3000), 5).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(4000), 5).is_none());
}
#[test]
fn zero_initial_is_flat() {
let now = Instant::now();
let dynamic = DynamicInclusion::new(
0,
now,
Duration::from_secs(10_000),
);
for i in 0..10_001 {
let now = now + Duration::from_secs(i);
assert!(dynamic.acceptable_in(now, 0).is_none());
assert!(dynamic.acceptable_in(now, 1).is_none());
assert!(dynamic.acceptable_in(now, 10).is_none());
}
}
}
-62
View File
@@ -16,8 +16,6 @@
//! Errors that can occur during the validation process.
use polkadot_primitives::v0::{ValidatorId, Hash};
/// Error type for validation
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
@@ -25,66 +23,9 @@ pub enum Error {
Client(sp_blockchain::Error),
/// Consensus error
Consensus(consensus::error::Error),
/// A wasm-validation error.
WasmValidation(parachain::wasm_executor::ValidationError),
/// An I/O error.
Io(std::io::Error),
/// An error in the availability erasure-coding.
ErasureCoding(polkadot_erasure_coding::Error),
#[display(fmt = "Invalid duty roster length: expected {}, got {}", expected, got)]
InvalidDutyRosterLength {
/// Expected roster length
expected: usize,
/// Actual roster length
got: usize,
},
/// Local account not a validator at this block
#[display(fmt = "Local account ID ({:?}) not a validator at this block.", _0)]
NotValidator(ValidatorId),
/// Unexpected error checking inherents
#[display(fmt = "Unexpected error while checking inherents: {}", _0)]
InherentError(inherents::Error),
/// Proposer destroyed before finishing proposing or evaluating
#[display(fmt = "Proposer destroyed before finishing proposing or evaluating")]
PrematureDestruction,
/// Failed to build the table router.
#[display(fmt = "Failed to build the table router: {}", _0)]
CouldNotBuildTableRouter(String),
/// Timer failed
#[display(fmt = "Timer failed: {}", _0)]
Timer(std::io::Error),
#[display(fmt = "Failed to compute deadline of now + {:?}", _0)]
DeadlineComputeFailure(std::time::Duration),
#[display(fmt = "Validation service is down.")]
ValidationServiceDown,
/// PoV-block in collation doesn't match provided.
#[display(fmt = "PoV hash mismatch. Expected {:?}, got {:?}", _0, _1)]
PoVHashMismatch(Hash, Hash),
/// Collator signature is invalid.
#[display(fmt = "Invalid collator signature on collation")]
InvalidCollatorSignature,
/// Head-data too large.
#[display(fmt = "Head data size of {} exceeded maximum of {}", _0, _1)]
HeadDataTooLarge(usize, usize),
/// Head-data mismatch after validation.
#[display(fmt = "Validation produced a different parachain header")]
HeadDataMismatch,
/// Relay parent of candidate not allowed.
#[display(fmt = "Relay parent {} of candidate not allowed in this context.", _0)]
DisallowedRelayParent(Hash),
/// Commitments in candidate match commitments produced by validation.
#[display(fmt = "Commitments in candidate receipt do not match those produced by validation")]
CommitmentsMismatch,
/// The parachain for which validation work is being done is not active.
#[display(fmt = "Parachain {:?} is not active", _0)]
InactiveParachain(polkadot_primitives::v0::Id),
/// Block data is too big
#[display(fmt = "Block data is too big (maximum allowed size: {}, actual size: {})", size, max_size)]
BlockDataTooBig { size: u64, max_size: u64 },
Join(tokio::task::JoinError),
/// Could not cover fee for an operation e.g. for sending `UpwardMessage`.
#[display(fmt = "Parachain could not cover fee for an operation e.g. for sending an `UpwardMessage`.")]
CouldNotCoverFee,
}
impl std::error::Error for Error {
@@ -92,9 +33,6 @@ impl std::error::Error for Error {
match self {
Error::Client(ref err) => Some(err),
Error::Consensus(ref err) => Some(err),
Error::WasmValidation(ref err) => Some(err),
Error::ErasureCoding(ref err) => Some(err),
Error::Io(ref err) => Some(err),
_ => None,
}
}
-197
View File
@@ -29,207 +29,10 @@
//!
//! Groups themselves may be compromised by malicious authorities.
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use codec::Encode;
use polkadot_primitives::v0::{
Id as ParaId, Chain, DutyRoster, AbridgedCandidateReceipt,
CompactStatement as PrimitiveStatement,
PoVBlock, ErasureChunk, ValidatorSignature, ValidatorIndex,
ValidatorPair, ValidatorId, SigningContext,
};
use primitives::Pair;
use futures::prelude::*;
pub use self::block_production::ProposerFactory;
pub use self::collation::Collators;
pub use self::error::Error;
pub use self::shared_table::{
SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement,
GenericStatement,
};
pub use self::validation_service::{ServiceHandle, ServiceBuilder};
pub use parachain::wasm_executor::run_worker as run_validation_worker;
mod dynamic_inclusion;
mod error;
mod shared_table;
pub mod block_production;
pub mod collation;
pub mod pipeline;
pub mod validation_service;
/// A handle to a statement table router.
///
/// This is expected to be a lightweight, shared type like an `Arc`.
/// Once all instances are dropped, consensus networking for this router
/// should be cleaned up.
pub trait TableRouter: Clone {
/// Errors when fetching data from the network.
type Error: std::fmt::Debug;
/// Future that drives sending of the local collation to the network.
type SendLocalCollation: Future<Output=Result<(), Self::Error>>;
/// Future that resolves when candidate data is fetched.
type FetchValidationProof: Future<Output=Result<PoVBlock, Self::Error>>;
/// Call with local candidate data. This will sign, import, and broadcast a statement about the candidate.
fn local_collation(
&self,
receipt: AbridgedCandidateReceipt,
pov_block: PoVBlock,
chunks: (ValidatorIndex, &[ErasureChunk]),
) -> Self::SendLocalCollation;
/// Fetch validation proof for a specific candidate.
///
/// This future must conclude once all `Clone`s of this `TableRouter` have
/// been cleaned up.
fn fetch_pov_block(&self, candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof;
}
/// A long-lived network which can create parachain statement and BFT message routing processes on demand.
pub trait Network {
/// The error type of asynchronously building the table router.
type Error: std::fmt::Debug;
/// The table router type. This should handle importing of any statements,
/// routing statements to peers, and driving completion of any `StatementProducers`.
type TableRouter: TableRouter;
/// The future used for asynchronously building the table router.
/// This should not fail.
type BuildTableRouter: Future<Output=Result<Self::TableRouter,Self::Error>>;
/// Instantiate a table router using the given shared table.
/// Also pass through any outgoing messages to be broadcast to peers.
#[must_use]
fn build_table_router(
&self,
table: Arc<SharedTable>,
authorities: &[ValidatorId],
) -> Self::BuildTableRouter;
}
/// The local duty of a validator.
#[derive(Debug)]
pub struct LocalDuty {
validation: Chain,
index: ValidatorIndex,
}
/// Information about a specific group.
#[derive(Debug, Clone, Default)]
pub struct GroupInfo {
/// Authorities meant to check validity of candidates.
validity_guarantors: HashSet<ValidatorId>,
/// Number of votes needed for validity.
needed_validity: usize,
}
/// Sign a table statement against a parent hash.
/// The actual message signed is the encoded statement concatenated with the
/// parent hash.
pub fn sign_table_statement(
statement: &Statement,
key: &ValidatorPair,
signing_context: &SigningContext,
) -> ValidatorSignature {
let mut encoded = PrimitiveStatement::from(statement).encode();
encoded.extend(signing_context.encode());
key.sign(&encoded)
}
/// Check signature on table statement.
pub fn check_statement(
statement: &Statement,
signature: &ValidatorSignature,
signer: ValidatorId,
signing_context: &SigningContext,
) -> bool {
use runtime_primitives::traits::AppVerify;
let mut encoded = PrimitiveStatement::from(statement).encode();
encoded.extend(signing_context.encode());
signature.verify(&encoded[..], &signer)
}
/// Compute group info out of a duty roster and a local authority set.
pub fn make_group_info(
roster: DutyRoster,
authorities: &[ValidatorId],
local_id: Option<ValidatorId>,
) -> Result<(HashMap<ParaId, GroupInfo>, Option<LocalDuty>), Error> {
if roster.validator_duty.len() != authorities.len() {
return Err(Error::InvalidDutyRosterLength {
expected: authorities.len(),
got: roster.validator_duty.len()
});
}
let mut local_validation = None;
let mut local_index = 0;
let mut map = HashMap::new();
let duty_iter = authorities.iter().zip(&roster.validator_duty);
for (i, (authority, v_duty)) in duty_iter.enumerate() {
if Some(authority) == local_id.as_ref() {
local_validation = Some(v_duty.clone());
local_index = i;
}
match *v_duty {
Chain::Relay => {}, // does nothing for now.
Chain::Parachain(ref id) => {
map.entry(id.clone()).or_insert_with(GroupInfo::default)
.validity_guarantors
.insert(authority.clone());
}
}
}
for live_group in map.values_mut() {
let validity_len = live_group.validity_guarantors.len();
live_group.needed_validity = validity_len / 2 + validity_len % 2;
}
let local_duty = local_validation.map(|v| LocalDuty {
validation: v,
index: local_index as u32,
});
Ok((map, local_duty))
}
#[cfg(test)]
mod tests {
use super::*;
use sp_keyring::Sr25519Keyring;
#[test]
fn sign_and_check_statement() {
let statement: Statement = GenericStatement::Valid([1; 32].into());
let parent_hash = [2; 32].into();
let signing_context = SigningContext {
session_index: Default::default(),
parent_hash,
};
let sig = sign_table_statement(&statement, &Sr25519Keyring::Alice.pair().into(), &signing_context);
let wrong_signing_context = SigningContext {
session_index: Default::default(),
parent_hash: [0xff; 32].into(),
};
assert!(check_statement(&statement, &sig, Sr25519Keyring::Alice.public().into(), &signing_context));
assert!(!check_statement(&statement, &sig, Sr25519Keyring::Alice.public().into(), &wrong_signing_context));
assert!(!check_statement(&statement, &sig, Sr25519Keyring::Bob.public().into(), &signing_context));
}
}
-362
View File
@@ -1,362 +0,0 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The pipeline of validation functions a parachain block must pass through before
//! it can be voted for.
use codec::Encode;
use polkadot_erasure_coding as erasure;
use polkadot_primitives::v0::{
CollationInfo, PoVBlock, LocalValidationData, GlobalValidationData, OmittedValidationData,
AvailableData, FeeSchedule, CandidateCommitments, ErasureChunk, ParachainHost,
Id as ParaId, AbridgedCandidateReceipt, ValidationCode,
};
use polkadot_primitives::v0::{Block, BlockId, Balance, Hash};
use parachain::{
wasm_executor::{self, ExecutionMode},
primitives::{UpwardMessage, ValidationParams},
};
use runtime_primitives::traits::{BlakeTwo256, Hash as HashT};
use sp_api::ProvideRuntimeApi;
use crate::Error;
use primitives::traits::SpawnNamed;
pub use parachain::wasm_executor::ValidationPool;
/// Does basic checks of a collation. Provide the encoded PoV-block.
pub fn basic_checks(
collation: &CollationInfo,
expected_relay_parent: &Hash,
max_block_data_size: Option<u64>,
encoded_pov: &[u8],
) -> Result<(), Error> {
if &collation.relay_parent != expected_relay_parent {
return Err(Error::DisallowedRelayParent(collation.relay_parent));
}
if let Some(max_size) = max_block_data_size {
if encoded_pov.len() as u64 > max_size {
return Err(Error::BlockDataTooBig { size: encoded_pov.len() as _, max_size });
}
}
let hash = BlakeTwo256::hash(encoded_pov);
if hash != collation.pov_block_hash {
return Err(Error::PoVHashMismatch(collation.pov_block_hash, hash));
}
if let Err(()) = collation.check_signature() {
return Err(Error::InvalidCollatorSignature);
}
Ok(())
}
/// Data from a fully-outputted validation of a parachain candidate. This contains
/// all outputs and commitments of the validation as well as all additional data to make available.
pub struct FullOutput {
/// Data about the candidate to keep available in the network.
pub available_data: AvailableData,
/// Commitments issued alongside the candidate to be placed on-chain.
pub commitments: CandidateCommitments,
/// All erasure-chunks associated with the available data. Each validator
/// should keep their chunk (by index). Other chunks do not need to be
/// kept available long-term, but should be distributed to other validators.
pub erasure_chunks: Vec<ErasureChunk>,
/// The number of validators that were present at this validation.
pub n_validators: usize,
}
impl FullOutput {
/// Check consistency of the outputs produced by the validation pipeline against
/// data contained within a candidate receipt.
pub fn check_consistency(&self, receipt: &AbridgedCandidateReceipt) -> Result<(), Error> {
if self.commitments != receipt.commitments {
Err(Error::CommitmentsMismatch)
} else {
Ok(())
}
}
}
/// The successful result of validating a collation. If the full commitments of the
/// validation are needed, call `full_output`. Otherwise, safely drop this value.
pub struct ValidatedCandidate<'a> {
pov_block: &'a PoVBlock,
global_validation: &'a GlobalValidationData,
local_validation: &'a LocalValidationData,
upward_messages: Vec<UpwardMessage>,
fees: Balance,
processed_downward_messages: u32,
}
impl<'a> ValidatedCandidate<'a> {
/// Fully-compute the commitments and outputs of the candidate. Provide the number
/// of validators. This computes the erasure-coding.
pub fn full_output(self, n_validators: usize) -> Result<FullOutput, Error> {
let ValidatedCandidate {
pov_block,
global_validation,
local_validation,
upward_messages,
fees,
processed_downward_messages,
} = self;
let omitted_validation = OmittedValidationData {
global_validation: global_validation.clone(),
local_validation: local_validation.clone(),
};
let available_data = AvailableData {
pov_block: pov_block.clone(),
omitted_validation,
};
let erasure_chunks = erasure::obtain_chunks_v0(
n_validators,
&available_data,
)?;
let branches = erasure::branches(erasure_chunks.as_ref());
let erasure_root = branches.root();
let chunks: Vec<_> = erasure_chunks
.iter()
.zip(branches.map(|(proof, _)| proof))
.enumerate()
.map(|(index, (chunk, proof))| ErasureChunk {
// branches borrows the original chunks, but this clone could probably be dodged.
chunk: chunk.clone(),
index: index as u32,
proof,
})
.collect();
let commitments = CandidateCommitments {
upward_messages,
fees,
erasure_root,
new_validation_code: None,
processed_downward_messages,
};
Ok(FullOutput {
available_data,
commitments,
erasure_chunks: chunks,
n_validators,
})
}
}
/// Validate that the given `UpwardMessage`s are covered by the given `free_balance`.
///
/// Will return an error if the `free_balance` does not cover the required fees to the
/// given `msgs`. On success it returns the fees that need to be charged for the `msgs`.
fn validate_upward_messages(
msgs: &[UpwardMessage],
fee_schedule: FeeSchedule,
free_balance: Balance,
) -> Result<Balance, Error> {
msgs.iter().try_fold(Balance::from(0u128), |fees_charged, msg| {
let fees = fee_schedule.compute_message_fee(msg.data.len());
let fees_charged = fees_charged.saturating_add(fees);
if fees_charged > free_balance {
Err(Error::CouldNotCoverFee)
} else {
Ok(fees_charged)
}
})
}
/// Does full checks of a collation, with provided PoV-block and contextual data.
pub fn validate<'a>(
validation_pool: Option<&'_ ValidationPool>,
collation: &'a CollationInfo,
pov_block: &'a PoVBlock,
local_validation: &'a LocalValidationData,
global_validation: &'a GlobalValidationData,
validation_code: &ValidationCode,
spawner: impl SpawnNamed + 'static,
) -> Result<ValidatedCandidate<'a>, Error> {
if collation.head_data.0.len() > global_validation.max_head_data_size as _ {
return Err(Error::HeadDataTooLarge(
collation.head_data.0.len(),
global_validation.max_head_data_size as _,
));
}
let params = ValidationParams {
parent_head: local_validation.parent_head.clone(),
block_data: pov_block.block_data.clone(),
relay_chain_height: global_validation.block_number,
hrmp_mqc_heads: Vec::new(),
};
// TODO: remove when ext does not do this.
let fee_schedule = FeeSchedule {
base: 0,
per_byte: 0,
};
let execution_mode = validation_pool
.map(ExecutionMode::Remote)
.unwrap_or(ExecutionMode::Local);
match wasm_executor::validate_candidate(
&validation_code.0,
params,
execution_mode,
spawner,
) {
Ok(result) => {
if result.head_data == collation.head_data {
let fees = validate_upward_messages(
&result.upward_messages,
fee_schedule,
local_validation.balance,
)?;
Ok(ValidatedCandidate {
pov_block,
global_validation,
local_validation,
upward_messages: result.upward_messages,
fees,
processed_downward_messages: result.processed_downward_messages,
})
} else {
Err(Error::HeadDataMismatch)
}
}
Err(e) => Err(e.into()),
}
}
/// Extracts validation parameters from a Polkadot runtime API for a specific parachain.
pub fn validation_params<P>(api: &P, relay_parent: Hash, para_id: ParaId)
-> Result<(LocalValidationData, GlobalValidationData, ValidationCode), Error>
where
P: ProvideRuntimeApi<Block>,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
{
let api = api.runtime_api();
let relay_parent = BlockId::hash(relay_parent);
// fetch all necessary data from runtime.
let local_validation = api.local_validation_data(&relay_parent, para_id)?
.ok_or_else(|| Error::InactiveParachain(para_id))?;
let global_validation = api.global_validation_data(&relay_parent)?;
let validation_code = api.parachain_code(&relay_parent, para_id)?
.ok_or_else(|| Error::InactiveParachain(para_id))?;
Ok((local_validation, global_validation, validation_code))
}
/// Does full-pipeline validation of a collation with provided contextual parameters.
pub fn full_output_validation_with_api<P>(
validation_pool: Option<&ValidationPool>,
api: &P,
collation: &CollationInfo,
pov_block: &PoVBlock,
expected_relay_parent: &Hash,
max_block_data_size: Option<u64>,
n_validators: usize,
spawner: impl SpawnNamed + 'static,
) -> Result<FullOutput, Error> where
P: ProvideRuntimeApi<Block>,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
{
let para_id = collation.parachain_index;
let (local_validation, global_validation, validation_code)
= validation_params(&*api, collation.relay_parent, para_id)?;
// put the parameters through the validation pipeline, producing
// erasure chunks.
let encoded_pov = pov_block.encode();
basic_checks(
&collation,
&expected_relay_parent,
max_block_data_size,
&encoded_pov,
)
.and_then(|()| {
let res = validate(
validation_pool,
&collation,
&pov_block,
&local_validation,
&global_validation,
&validation_code,
spawner,
);
match res {
Err(ref err) => log::debug!(
target: "validation",
"Failed to validate PoVBlock for parachain ({}): {:?}",
para_id,
err,
),
Ok(_) => log::debug!(
target: "validation",
"Successfully validated PoVBlock for parachain ({}).",
para_id,
),
}
res
})
.and_then(|validated| validated.full_output(n_validators))
}
#[cfg(test)]
mod tests {
use super::*;
use parachain::primitives::ParachainDispatchOrigin;
fn add_msg(size: usize, msgs: &mut Vec<UpwardMessage>) {
let msg = UpwardMessage { data: vec![0; size], origin: ParachainDispatchOrigin::Parachain };
msgs.push(msg);
}
#[test]
fn validate_upward_messages_works() {
let fee_schedule = FeeSchedule {
base: 1000,
per_byte: 10,
};
let free_balance = 1_000_000;
let mut msgs = Vec::new();
add_msg(100, &mut msgs);
assert_eq!(2000, validate_upward_messages(&msgs, fee_schedule, free_balance).unwrap());
add_msg(100, &mut msgs);
assert_eq!(4000, validate_upward_messages(&msgs, fee_schedule, free_balance).unwrap());
add_msg((1_000_000 - 4000 - 1000) / 10, &mut msgs);
assert_eq!(1_000_000, validate_upward_messages(&msgs, fee_schedule, free_balance).unwrap());
// cannot pay fee.
add_msg(1, &mut msgs);
let err = validate_upward_messages(&msgs, fee_schedule, free_balance).unwrap_err();
assert!(matches!(err, Error::CouldNotCoverFee));
}
}
@@ -1,122 +0,0 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Implements a future which resolves when all of the candidates referenced are includable.
use std::collections::HashMap;
use futures::channel::oneshot;
use polkadot_primitives::v0::Hash;
/// Track includability of a set of candidates,
pub(super) fn track<I: IntoIterator<Item=(Hash, bool)>>(candidates: I)
-> (IncludabilitySender, oneshot::Receiver<()>) {
let (tx, rx) = oneshot::channel();
let tracking: HashMap<_, _> = candidates.into_iter().collect();
let includable_count = tracking.values().filter(|x| **x).count();
let mut sender = IncludabilitySender {
tracking,
includable_count,
sender: Some(tx),
};
sender.try_complete();
(sender, rx)
}
/// The sending end of the includability sender.
pub(super) struct IncludabilitySender {
tracking: HashMap<Hash, bool>,
includable_count: usize,
sender: Option<oneshot::Sender<()>>,
}
impl IncludabilitySender {
/// update the inner candidate. wakes up the task as necessary.
/// returns `Err(Canceled)` if the other end has hung up.
///
/// returns `true` when this is completed and should be destroyed.
pub fn update_candidate(&mut self, candidate: Hash, includable: bool) -> bool {
use std::collections::hash_map::Entry;
match self.tracking.entry(candidate) {
Entry::Vacant(_) => {}
Entry::Occupied(mut entry) => {
let old = entry.insert(includable);
if !old && includable {
self.includable_count += 1;
} else if old && !includable {
self.includable_count -= 1;
}
}
}
self.try_complete()
}
/// whether the sender is completed.
pub fn is_complete(&self) -> bool {
self.sender.is_none()
}
fn try_complete(&mut self) -> bool {
if self.includable_count == self.tracking.len() {
if let Some(sender) = self.sender.take() {
let _ = sender.send(());
}
true
} else {
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::block_on;
#[test]
fn it_works() {
let hash1 = [1; 32].into();
let hash2 = [2; 32].into();
let hash3 = [3; 32].into();
let (mut sender, recv) = track([
(hash1, true),
(hash2, true),
(hash2, false), // overwrite should favor latter.
(hash3, true),
].iter().cloned());
assert!(!sender.is_complete());
// true -> false transition is possible and should be handled.
sender.update_candidate(hash1, false);
assert!(!sender.is_complete());
sender.update_candidate(hash2, true);
assert!(!sender.is_complete());
sender.update_candidate(hash1, true);
assert!(sender.is_complete());
block_on(recv).unwrap();
}
}
File diff suppressed because it is too large Load Diff
@@ -1,802 +0,0 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The validation service is a long-running future that creates and manages parachain attestation
//! instances.
//!
//! As soon as we import a new chain head, we start a parachain attestation session on top of it.
//! The block authorship service may want access to the attestation session, and for that reason
//! we expose a `ServiceHandle` which can be used to request a copy of it.
//!
//! In fact, the import notification and request from the block production pipeline may race to be
//! the first one to create the instant, but the import notification will usually win.
//!
//! These attestation sessions are kept live until they are periodically garbage-collected.
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin, collections::HashMap};
use crate::pipeline::FullOutput;
use sc_client_api::{BlockchainEvents, BlockBackend};
use consensus::SelectChain;
use futures::prelude::*;
use polkadot_primitives::v0::{
Block, Hash, BlockId,
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
CollationInfo, SigningContext,
};
use keystore::KeyStorePtr;
use sp_api::{ProvideRuntimeApi, ApiExt};
use runtime_primitives::traits::HashFor;
use availability_store::Store as AvailabilityStore;
use primitives::traits::SpawnNamed;
use ansi_term::Colour;
use log::{warn, info, debug, trace};
use super::{Network, Collators, SharedTable, TableRouter};
use crate::Error;
use crate::pipeline::ValidationPool;
// Remote processes may request for a validation instance to be cloned or instantiated.
// They send a oneshot channel.
type ValidationInstanceRequest = (
Hash,
futures::channel::oneshot::Sender<Result<ValidationInstanceHandle, Error>>,
);
/// A handle to a single instance of parachain validation, which is pinned to
/// a specific relay-chain block. This is the instance that should be used when
/// constructing any
#[derive(Clone)]
pub(crate) struct ValidationInstanceHandle {
table: Arc<SharedTable>,
started: Instant,
}
impl ValidationInstanceHandle {
/// Access the underlying table of attestations on parachain candidates.
pub(crate) fn table(&self) -> &Arc<SharedTable> {
&self.table
}
/// The moment we started this validation instance.
pub(crate) fn started(&self) -> Instant {
self.started.clone()
}
}
/// A handle to the service. This can be used to create a block-production environment.
#[derive(Clone)]
pub struct ServiceHandle {
sender: futures::channel::mpsc::Sender<ValidationInstanceRequest>,
}
impl ServiceHandle {
/// Requests instantiation or cloning of a validation instance from the service.
///
/// This can fail if the service task has shut down for some reason.
pub(crate) async fn get_validation_instance(self, relay_parent: Hash)
-> Result<ValidationInstanceHandle, Error>
{
let mut sender = self.sender;
let instance_rx = loop {
let (instance_tx, instance_rx) = futures::channel::oneshot::channel();
match sender.send((relay_parent, instance_tx)).await {
Ok(()) => break instance_rx,
Err(e) => if !e.is_full() {
// Sink::send should be doing `poll_ready` before start-send,
// so this should only happen when there is a race.
return Err(Error::ValidationServiceDown)
},
}
};
instance_rx.map_err(|_| Error::ValidationServiceDown).await.and_then(|x| x)
}
}
fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin {
stream::unfold((), move |_| {
futures_timer::Delay::new(duration).map(|_| Some(((), ())))
}).map(drop)
}
/// A builder for the validation service.
pub struct ServiceBuilder<C, N, P, SC, SP> {
/// The underlying blockchain client.
pub client: Arc<P>,
/// A handle to the network object used to communicate.
pub network: N,
/// A handle to the collator pool we are using.
pub collators: C,
/// A handle to a background executor.
pub spawner: SP,
/// A handle to the availability store.
pub availability_store: AvailabilityStore,
/// A chain selector for determining active leaves in the block-DAG.
pub select_chain: SC,
/// The keystore which holds the signing keys.
pub keystore: KeyStorePtr,
/// The maximum block-data size in bytes.
pub max_block_data_size: Option<u64>,
}
impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBackend<Block>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static + Sync,
N::BuildTableRouter: Send + Unpin + 'static,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
SC: SelectChain<Block> + 'static,
SP: SpawnNamed + Clone + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
{
/// Build the service - this consists of a handle to it, as well as a background
/// future to be run to completion.
pub fn build(self) -> (ServiceHandle, impl Future<Output = ()> + Send + 'static) {
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
const CHAN_BUFFER: usize = 10;
enum Message {
CollectGarbage,
// relay-parent, receiver for instance.
RequestInstance(ValidationInstanceRequest),
// new chain heads - import notification.
NotifyImport(sc_client_api::BlockImportNotification<Block>),
}
let validation_pool = Some(ValidationPool::new());
let mut parachain_validation = ParachainValidationInstances {
client: self.client.clone(),
network: self.network,
spawner: self.spawner.clone(),
availability_store: self.availability_store,
live_instances: HashMap::new(),
validation_pool: validation_pool.clone(),
collation_fetch: DefaultCollationFetch {
collators: self.collators,
validation_pool,
spawner: self.spawner,
},
};
let client = self.client;
let select_chain = self.select_chain;
let keystore = self.keystore;
let max_block_data_size = self.max_block_data_size;
let (tx, rx) = futures::channel::mpsc::channel(CHAN_BUFFER);
let interval = interval(TIMER_INTERVAL).map(|_| Message::CollectGarbage);
let import_notifications = client.import_notification_stream().map(Message::NotifyImport);
let instance_requests = rx.map(Message::RequestInstance);
let service = ServiceHandle { sender: tx };
let background_work = async move {
let message_stream = futures::stream::select(interval, instance_requests);
let mut message_stream = futures::stream::select(import_notifications, message_stream);
while let Some(message) = message_stream.next().await {
match message {
Message::CollectGarbage => {
match select_chain.leaves() {
Ok(leaves) => {
parachain_validation.retain(|h| leaves.contains(h));
}
Err(e) => {
warn!("Error fetching leaves from client: {:?}", e);
}
}
}
Message::RequestInstance((relay_parent, sender)) => {
// Upstream will handle the failure case.
let _ = sender.send(parachain_validation.get_or_instantiate(
relay_parent,
&keystore,
max_block_data_size,
).await);
}
Message::NotifyImport(notification) => {
let relay_parent = notification.hash;
if notification.is_new_best {
let res = parachain_validation.get_or_instantiate(
relay_parent,
&keystore,
max_block_data_size,
).await;
if let Err(e) = res {
warn!(
"Unable to start parachain validation on top of {:?}: {}",
relay_parent, e
);
}
}
}
}
}
};
(service, background_work)
}
}
/// Abstraction over `collation_fetch`.
pub(crate) trait CollationFetch {
/// Error type used by `collation_fetch`.
type Error: std::fmt::Debug;
/// Fetch a collation for the given `parachain`.
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static;
}
#[derive(Clone)]
struct DefaultCollationFetch<C, S> {
collators: C,
validation_pool: Option<ValidationPool>,
spawner: S,
}
impl<C, S> CollationFetch for DefaultCollationFetch<C, S>
where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
S: SpawnNamed + Clone + 'static,
{
type Error = C::Error;
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
{
crate::collation::collation_fetch(
self.validation_pool,
parachain,
relay_parent,
self.collators,
client,
max_block_data_size,
n_validators,
self.spawner,
).boxed()
}
}
// finds the first key we are capable of signing with out of the given set of validators,
// if any.
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
let keystore = keystore.read();
validators.iter()
.find_map(|v| {
keystore.key_pair::<ValidatorPair>(&v).ok()
})
.map(|pair| Arc::new(pair))
}
/// A live instance that is related to a relay chain validation round.
///
/// It stores the `instance_handle` and the `_table_router`.
struct LiveInstance<TR> {
instance_handle: ValidationInstanceHandle,
/// Make sure we keep the table router alive, to respond/receive consensus messages.
_table_router: TR,
}
/// Constructs parachain-agreement instances.
pub(crate) struct ParachainValidationInstances<N: Network, P, SP, CF> {
/// The client instance.
client: Arc<P>,
/// The backing network handle.
network: N,
/// handle to spawner
spawner: SP,
/// Store for extrinsic data.
availability_store: AvailabilityStore,
/// Live agreements. Maps relay chain parent hashes to attestation
/// instances.
live_instances: HashMap<Hash, LiveInstance<N::TableRouter>>,
/// The underlying validation pool of processes to use.
/// Only `None` in tests.
validation_pool: Option<ValidationPool>,
/// Used to fetch a collation.
collation_fetch: CF,
}
impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
N: Network,
N::Error: 'static,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N::TableRouter: Send + 'static + Sync,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
N::BuildTableRouter: Unpin + Send + 'static,
SP: SpawnNamed + Send + 'static,
CF: CollationFetch + Clone + Send + Sync + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
{
/// Get an attestation table for given parent hash.
///
/// This starts a parachain agreement process on top of the parent hash if
/// one has not already started.
///
/// Additionally, this will trigger broadcast of data to the new block's duty
/// roster.
async fn get_or_instantiate(
&mut self,
parent_hash: Hash,
keystore: &KeyStorePtr,
max_block_data_size: Option<u64>,
) -> Result<ValidationInstanceHandle, Error> {
use primitives::Pair;
if let Some(instance) = self.live_instances.get(&parent_hash) {
return Ok(instance.instance_handle.clone());
}
let id = BlockId::hash(parent_hash);
let validators = self.client.runtime_api().validators(&id)?;
let sign_with = signing_key(&validators[..], keystore);
let duty_roster = self.client.runtime_api().duty_roster(&id)?;
let (group_info, local_duty) = crate::make_group_info(
duty_roster,
&validators,
sign_with.as_ref().map(|k| k.public()),
)?;
if let Some(ref duty) = local_duty {
info!(
"✍️ Starting parachain attestation session (parent: {}) with active duty {}",
parent_hash,
Colour::Red.bold().paint(format!("{:?}", duty)),
);
} else {
debug!(
"✍️ Starting parachain attestation session (parent: {}). No local duty..",
parent_hash,
);
}
let active_parachains = self.client.runtime_api().active_parachains(&id)?;
debug!(target: "validation", "Active parachains: {:?}", active_parachains);
// If we are a validator, we need to store our index in this round in availability store.
// This will tell which erasure chunk we should store.
if let Some(ref local_duty) = local_duty {
if let Err(e) = self.availability_store.note_validator_index_and_n_validators(
&parent_hash,
local_duty.index,
validators.len() as u32,
) {
warn!(
target: "validation",
"Failed to add validator index and n_validators to the availability-store: {:?}", e
)
}
}
let api = self.client.runtime_api();
let signing_context = if api.has_api_with::<dyn ParachainHost<Block, Error = ()>, _>(
&BlockId::hash(parent_hash),
|version| version >= 3,
)? {
api.signing_context(&id)?
} else {
trace!(
target: "validation",
"Expected runtime with ParachainHost version >= 3",
);
SigningContext {
session_index: 0,
parent_hash,
}
};
let table = Arc::new(SharedTable::new(
validators.clone(),
group_info,
sign_with,
signing_context,
self.availability_store.clone(),
max_block_data_size,
self.validation_pool.clone(),
));
// The router will join the consensus gossip network. This is important
// to receive messages sent for the current round.
let router = match self.network.build_table_router(
table.clone(),
&validators,
).await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to build router: {:?}", e);
return Err(Error::CouldNotBuildTableRouter(format!("{:?}", e)))
}
};
if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
let n_validators = validators.len();
let availability_store = self.availability_store.clone();
let client = self.client.clone();
let collation_fetch = self.collation_fetch.clone();
let router = router.clone();
self.spawner.spawn(
"polkadot-parachain-validation-work",
launch_work(
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
availability_store,
router,
n_validators,
index,
).boxed(),
);
}
let tracker = ValidationInstanceHandle {
table,
started: Instant::now(),
};
let live_instance = LiveInstance {
instance_handle: tracker.clone(),
_table_router: router,
};
self.live_instances.insert(parent_hash, live_instance);
Ok(tracker)
}
/// Retain validation sessions matching predicate.
fn retain<F: FnMut(&Hash) -> bool>(&mut self, mut pred: F) {
self.live_instances.retain(|k, _| pred(k))
}
}
// launch parachain work asynchronously.
async fn launch_work<CFF, E>(
collation_fetch: impl FnOnce() -> CFF,
availability_store: AvailabilityStore,
router: impl TableRouter,
n_validators: usize,
local_id: ValidatorIndex,
) where
E: std::fmt::Debug,
CFF: Future<Output = Result<(CollationInfo, FullOutput), E>> + Send,
{
// fetch a local collation from connected collators.
let (collation_info, full_output) = match collation_fetch().await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
return
}
};
let crate::pipeline::FullOutput {
commitments,
erasure_chunks,
available_data,
..
} = full_output;
let receipt = collation_info.into_receipt(commitments);
let pov_block = available_data.pov_block.clone();
if let Err(e) = availability_store.make_available(
receipt.hash(),
available_data,
).await {
warn!(
target: "validation",
"Failed to make parachain block data available: {}",
e,
);
}
if let Err(e) = availability_store.clone().add_erasure_chunks(
receipt.clone(),
n_validators as _,
erasure_chunks.clone(),
).await {
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
}
if let Err(e) = router.local_collation(
receipt,
pov_block,
(local_id, &erasure_chunks),
).await {
warn!(target: "validation", "Failed to send local collation: {:?}", e);
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor, future::ready, channel::mpsc};
use availability_store::ErasureNetworking;
use polkadot_primitives::v0::{
PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex,
CollationInfo, DutyRoster, GlobalValidationData, LocalValidationData,
Retriable, CollatorId, BlockData, Chain, AvailableData, SigningContext, ValidationCode,
};
use runtime_primitives::traits::Block as BlockT;
use std::pin::Pin;
use sp_keyring::sr25519::Keyring;
use primitives::testing::TaskExecutor;
/// Events fired while running mock implementations to follow execution.
enum Events {
BuildTableRouter,
CollationFetch,
LocalCollation,
}
#[derive(Clone)]
struct MockNetwork(mpsc::UnboundedSender<Events>);
impl Network for MockNetwork {
type Error = String;
type TableRouter = MockTableRouter;
type BuildTableRouter = Pin<Box<dyn Future<Output = Result<MockTableRouter, String>> + Send>>;
fn build_table_router(
&self,
_: Arc<SharedTable>,
_: &[ValidatorId],
) -> Self::BuildTableRouter {
let event_sender = self.0.clone();
async move {
event_sender.unbounded_send(Events::BuildTableRouter).expect("Send `BuildTableRouter`");
Ok(MockTableRouter(event_sender))
}.boxed()
}
}
#[derive(Clone)]
struct MockTableRouter(mpsc::UnboundedSender<Events>);
impl TableRouter for MockTableRouter {
type Error = String;
type SendLocalCollation = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
type FetchValidationProof = Box<dyn Future<Output = Result<PoVBlock, String>> + Unpin>;
fn local_collation(
&self,
_: AbridgedCandidateReceipt,
_: PoVBlock,
_: (ValidatorIndex, &[ErasureChunk]),
) -> Self::SendLocalCollation {
let sender = self.0.clone();
async move {
sender.unbounded_send(Events::LocalCollation).expect("Send `LocalCollation`");
Ok(())
}.boxed()
}
fn fetch_pov_block(&self, _: &AbridgedCandidateReceipt) -> Self::FetchValidationProof {
unimplemented!("Not required in tests")
}
}
#[derive(Clone)]
struct MockErasureNetworking;
impl ErasureNetworking for MockErasureNetworking {
type Error = String;
fn fetch_erasure_chunk(
&self,
_: &Hash,
_: u32,
) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
ready(Err("Not required in tests".to_string())).boxed()
}
fn distribute_erasure_chunk(&self, _: Hash, _: ErasureChunk) {
unimplemented!("Not required in tests")
}
}
#[derive(Clone)]
struct MockCollationFetch(mpsc::UnboundedSender<Events>);
impl CollationFetch for MockCollationFetch {
type Error = ();
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
_: Arc<P>,
_: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), ()>> + Send>> {
let info = CollationInfo {
parachain_index: parachain,
relay_parent,
collator: Default::default(),
signature: Default::default(),
head_data: Default::default(),
pov_block_hash: Default::default(),
};
let available_data = AvailableData {
pov_block: PoVBlock { block_data: BlockData(Vec::new()) },
omitted_validation: Default::default(),
};
let full_output = FullOutput {
available_data,
commitments: Default::default(),
erasure_chunks: Default::default(),
n_validators,
};
let sender = self.0;
async move {
sender.unbounded_send(Events::CollationFetch).expect("`CollationFetch` event send");
Ok((info, full_output))
}.boxed()
}
}
#[derive(Clone)]
struct MockRuntimeApi {
validators: Vec<ValidatorId>,
duty_roster: DutyRoster,
}
impl ProvideRuntimeApi<Block> for MockRuntimeApi {
type Api = Self;
fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
self.clone().into()
}
}
sp_api::mock_impl_runtime_apis! {
impl ParachainHost<Block> for MockRuntimeApi {
type Error = sp_blockchain::Error;
fn validators(&self) -> Vec<ValidatorId> { self.validators.clone() }
fn duty_roster(&self) -> DutyRoster { self.duty_roster.clone() }
fn active_parachains() -> Vec<(ParaId, Option<(CollatorId, Retriable)>)> { vec![(ParaId::from(1), None)] }
fn global_validation_data() -> GlobalValidationData { Default::default() }
fn local_validation_data(_: ParaId) -> Option<LocalValidationData> { None }
fn parachain_code(_: ParaId) -> Option<ValidationCode> { None }
fn get_heads(_: Vec<<Block as BlockT>::Extrinsic>) -> Option<Vec<AbridgedCandidateReceipt>> {
None
}
fn signing_context() -> SigningContext {
Default::default()
}
fn downward_messages(_: ParaId) -> Vec<polkadot_primitives::v0::DownwardMessage> {
Vec::new()
}
}
}
#[test]
fn launch_work_is_executed_properly() {
let executor = TaskExecutor::new();
let keystore = keystore::Store::new_in_memory();
// Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator.
keystore.write().insert_ephemeral_from_seed::<ValidatorPair>(&Keyring::Bob.to_seed())
.expect("Insert key into keystore");
let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())];
let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())];
let duty_roster = DutyRoster { validator_duty };
let (events_sender, events) = mpsc::unbounded();
let mut parachain_validation = ParachainValidationInstances {
client: Arc::new(MockRuntimeApi { validators, duty_roster }),
network: MockNetwork(events_sender.clone()),
collation_fetch: MockCollationFetch(events_sender.clone()),
spawner: executor.clone(),
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
live_instances: HashMap::new(),
validation_pool: None,
};
executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None))
.expect("Creates new validation round");
assert!(parachain_validation.live_instances.contains_key(&Default::default()));
let mut events = executor::block_on_stream(events);
assert!(matches!(events.next().unwrap(), Events::BuildTableRouter));
assert!(matches!(events.next().unwrap(), Events::CollationFetch));
assert!(matches!(events.next().unwrap(), Events::LocalCollation));
drop(events_sender);
drop(parachain_validation);
assert!(events.next().is_none());
}
#[test]
fn router_is_built_on_relay_chain_validator() {
let executor = TaskExecutor::new();
let keystore = keystore::Store::new_in_memory();
// Make sure `Alice` key is in the keystore, so this mocked node will be a relay-chain validator.
keystore.write().insert_ephemeral_from_seed::<ValidatorPair>(&Keyring::Alice.to_seed())
.expect("Insert key into keystore");
let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())];
let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())];
let duty_roster = DutyRoster { validator_duty };
let (events_sender, events) = mpsc::unbounded();
let mut parachain_validation = ParachainValidationInstances {
client: Arc::new(MockRuntimeApi { validators, duty_roster }),
network: MockNetwork(events_sender.clone()),
collation_fetch: MockCollationFetch(events_sender.clone()),
spawner: executor.clone(),
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
live_instances: HashMap::new(),
validation_pool: None,
};
executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None))
.expect("Creates new validation round");
assert!(parachain_validation.live_instances.contains_key(&Default::default()));
let mut events = executor::block_on_stream(events);
assert!(matches!(events.next().unwrap(), Events::BuildTableRouter));
drop(events_sender);
drop(parachain_validation);
assert!(events.next().is_none());
}
}