Minimal parachain framework part 1 (#113)

* dynamic inclusion threshold calculator

* collators interface

* collation helpers

* initial proposal-creation future

* create proposer when asked to propose

* remove local_availability duty

* statement table tracks includable parachain count

* beginnings of timing future

* finish proposal logic

* remove stray println

* extract shared table to separate module

* change ordering

* includability tracking

* fix doc

* initial changes to parachains module

* initialise dummy block before API calls

* give polkadot control over round proposer based on random seed

* propose only after enough candidates

* flesh out parachains module a bit more

* set_heads

* actually introduce set_heads to runtime

* update block_builder to accept parachains

* split block validity errors from real errors in evaluation

* update WASM runtimes

* polkadot-api methods for parachains additions

* delay evaluation until candidates are ready

* comments

* fix dynamic inclusion with zero initial

* test for includability tracker

* wasm validation of parachain candidates

* move primitives to primitives crate

* remove runtime-std dependency from codec

* adjust doc

* polkadot-parachain-primitives

* kill legacy polkadot-validator crate

* basic-add test chain

* test for basic_add parachain

* move to test-chains dir

* use wasm-build

* new wasm directory layout

* reorganize a bit more

* Fix for rh-minimal-parachain (#141)

* Remove extern "C"

We already encountered such behavior (bug?) in pwasm-std, I believe.

* Fix `panic_fmt` signature by adding `_col`

Wrong `panic_fmt` signature can inhibit some optimizations in LTO mode.

* Add linker flags and use wasm-gc in build script

Pass --import-memory to LLD to emit wasm binary with imported memory.

Also use wasm-gc instead of wasm-build.

* Fix effective_max.

I'm not sure why it was the way it was actually.

* Recompile wasm.

* Fix indent

* more basic_add tests

* validate parachain WASM

* produce statements on receiving statements

* tests for reactive statement production

* fix build

* add OOM lang item to runtime-io

* use dynamic_inclusion when evaluating as well

* fix update_includable_count

* remove dead code

* grumbles

* actually defer round_proposer logic

* update wasm

* address a few more grumbles

* grumbles

* update WASM checkins

* remove dependency on tokio-timer
This commit is contained in:
Robert Habermeier
2018-05-25 16:16:01 +02:00
committed by GitHub
parent 59e8adb604
commit c473c0c734
41 changed files with 2769 additions and 872 deletions
+1
View File
@@ -13,6 +13,7 @@ log = "0.3"
exit-future = "0.1"
polkadot-api = { path = "../api" }
polkadot-collator = { path = "../collator" }
polkadot-parachain = { path = "../parachain" }
polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" }
polkadot-statement-table = { path = "../statement-table" }
+176
View File
@@ -0,0 +1,176 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Validator-side view of collation.
//!
//! This module contains type definitions, a trait for a batch of collators, and a trait for
//! attempting to fetch a collation repeatedly until a valid one is obtained.
use std::sync::Arc;
use polkadot_api::PolkadotApi;
use polkadot_primitives::{Hash, AccountId};
use polkadot_primitives::parachain::{Id as ParaId, Chain, BlockData, Extrinsic, CandidateReceipt};
use futures::prelude::*;
/// A full collation.
pub struct Collation {
/// Block data.
pub block_data: BlockData,
/// The candidate receipt itself.
pub receipt: CandidateReceipt,
}
/// Encapsulates connections to collators and allows collation on any parachain.
///
/// This is expected to be a lightweight, shared type like an `Arc`.
pub trait Collators: Clone {
/// Errors when producing collations.
type Error;
/// A full collation.
type Collation: IntoFuture<Item=Collation,Error=Self::Error>;
/// Collate on a specific parachain, building on a given relay chain parent hash.
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation;
/// Note a bad collator. TODO: take proof
fn note_bad_collator(&self, collator: AccountId);
}
/// A future which resolves when a collation is available.
///
/// This future is fused.
pub struct CollationFetch<C: Collators, P: PolkadotApi> {
parachain: Option<ParaId>,
relay_parent_hash: Hash,
relay_parent: P::CheckedBlockId,
collators: C,
live_fetch: Option<<C::Collation as IntoFuture>::Future>,
client: Arc<P>,
}
impl<C: Collators, P: PolkadotApi> CollationFetch<C, P> {
/// Create a new collation fetcher for the given chain.
pub fn new(parachain: Chain, relay_parent: P::CheckedBlockId, relay_parent_hash: Hash, collators: C, client: Arc<P>) -> Self {
CollationFetch {
relay_parent_hash,
relay_parent,
collators,
client,
parachain: match parachain {
Chain::Parachain(id) => Some(id),
Chain::Relay => None,
},
live_fetch: None,
}
}
}
impl<C: Collators, P: PolkadotApi> Future for CollationFetch<C, P> {
type Item = (Collation, Extrinsic);
type Error = C::Error;
fn poll(&mut self) -> Poll<(Collation, Extrinsic), C::Error> {
let parachain = match self.parachain.as_ref() {
Some(p) => p.clone(),
None => return Ok(Async::NotReady),
};
loop {
let x = {
let (r, c) = (self.relay_parent_hash, &self.collators);
let poll = self.live_fetch
.get_or_insert_with(move || c.collate(parachain, r).into_future())
.poll();
if let Err(_) = poll { self.parachain = None }
try_ready!(poll)
};
match validate_collation(&*self.client, &self.relay_parent, &x) {
Ok(()) => {
self.parachain = None;
// TODO: generate extrinsic while verifying.
return Ok(Async::Ready((x, Extrinsic)));
}
Err(e) => {
debug!("Failed to validate parachain due to API error: {}", e);
// just continue if we got a bad collation or failed to validate
self.live_fetch = None;
self.collators.note_bad_collator(x.receipt.collator)
}
}
}
}
}
// Errors that can occur when validating a parachain.
error_chain! {
types { Error, ErrorKind, ResultExt; }
errors {
InactiveParachain(id: ParaId) {
description("Collated for inactive parachain"),
display("Collated for inactive parachain: {:?}", id),
}
ValidationFailure {
description("Parachain candidate failed validation."),
display("Parachain candidate failed validation."),
}
WrongHeadData(expected: Vec<u8>, got: Vec<u8>) {
description("Parachain validation produced wrong head data."),
display("Parachain validation produced wrong head data (expected: {:?}, got {:?}", expected, got),
}
}
links {
PolkadotApi(::polkadot_api::Error, ::polkadot_api::ErrorKind);
}
}
/// Check whether a given collation is valid. Returns `Ok` on success, error otherwise.
pub fn validate_collation<P: PolkadotApi>(client: &P, relay_parent: &P::CheckedBlockId, collation: &Collation) -> Result<(), Error> {
use parachain::{self, ValidationParams};
let para_id = collation.receipt.parachain_index;
let validation_code = client.parachain_code(relay_parent, para_id)?
.ok_or_else(|| ErrorKind::InactiveParachain(para_id))?;
let chain_head = client.parachain_head(relay_parent, para_id)?
.ok_or_else(|| ErrorKind::InactiveParachain(para_id))?;
let params = ValidationParams {
parent_head: chain_head,
block_data: collation.block_data.0.clone(),
};
match parachain::wasm::validate_candidate(&validation_code, params) {
Ok(result) => {
if result.head_data == collation.receipt.head_data.0 {
Ok(())
} else {
Err(ErrorKind::WrongHeadData(
collation.receipt.head_data.0.clone(),
result.head_data
).into())
}
}
Err(_) => Err(ErrorKind::ValidationFailure.into())
}
}
+130
View File
@@ -0,0 +1,130 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Dynamic inclusion threshold over time.
use std::time::{Duration, Instant};
fn duration_to_micros(duration: &Duration) -> u64 {
duration.as_secs() * 1_000_000 + (duration.subsec_nanos() / 1000) as u64
}
/// Dynamic inclusion threshold over time.
///
/// The acceptable proportion of parachains which must have parachain candidates
/// reduces over time (eventually going to zero).
#[derive(Debug, Clone)]
pub struct DynamicInclusion {
start: Instant,
y: u64,
m: u64,
}
impl DynamicInclusion {
/// Constructs a new dynamic inclusion threshold calculator based on the time now,
/// how many parachain candidates are required at the beginning, and when an empty
/// block will be allowed.
pub fn new(initial: usize, start: Instant, allow_empty: Duration) -> Self {
// linear function f(n_candidates) -> valid after microseconds
// f(0) = allow_empty
// f(initial) = 0
// m is actually the negative slope to avoid using signed arithmetic.
let (y, m) = if initial != 0 {
let y = duration_to_micros(&allow_empty);
(y, y / initial as u64)
} else {
(0, 0)
};
DynamicInclusion {
start,
y,
m,
}
}
/// Returns the duration from `now` after which the amount of included parachain candidates
/// would be enough, or `None` if it is sufficient now.
///
/// Panics if `now` is earlier than the `start`.
pub fn acceptable_in(&self, now: Instant, included: usize) -> Option<Duration> {
let elapsed = now.duration_since(self.start);
let elapsed = duration_to_micros(&elapsed);
let valid_after = self.y.saturating_sub(self.m * included as u64);
if elapsed >= valid_after {
None
} else {
Some(Duration::from_millis((valid_after - elapsed) as u64 / 1000))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn full_immediately_allowed() {
let now = Instant::now();
let dynamic = DynamicInclusion::new(
10,
now,
Duration::from_millis(4000),
);
assert!(dynamic.acceptable_in(now, 10).is_none());
assert!(dynamic.acceptable_in(now, 11).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(2000), 10).is_none());
}
#[test]
fn half_allowed_halfway() {
let now = Instant::now();
let dynamic = DynamicInclusion::new(
10,
now,
Duration::from_millis(4000),
);
assert_eq!(dynamic.acceptable_in(now, 5), Some(Duration::from_millis(2000)));
assert!(dynamic.acceptable_in(now + Duration::from_millis(2000), 5).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(3000), 5).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(4000), 5).is_none());
}
#[test]
fn zero_initial_is_flat() {
let now = Instant::now();
let dynamic = DynamicInclusion::new(
0,
now,
Duration::from_secs(10_000),
);
for i in 0..10_001 {
let now = now + Duration::from_secs(i);
assert!(dynamic.acceptable_in(now, 0).is_none());
assert!(dynamic.acceptable_in(now, 1).is_none());
assert!(dynamic.acceptable_in(now, 10).is_none());
}
}
}
+11 -26
View File
@@ -16,45 +16,30 @@
//! Errors that can occur during the consensus process.
use primitives::block::{HeaderHash, Number};
use polkadot_primitives::AccountId;
error_chain! {
links {
PolkadotApi(::polkadot_api::Error, ::polkadot_api::ErrorKind);
Bft(::bft::Error, ::bft::ErrorKind);
}
foreign_links {
Io(::std::io::Error);
SharedIo(::futures::future::SharedError<::std::io::Error>);
}
errors {
InvalidDutyRosterLength(expected: usize, got: usize) {
description("Duty Roster had invalid length"),
display("Invalid duty roster length: expected {}, got {}", expected, got),
}
ProposalNotForPolkadot {
description("Proposal provided not a Polkadot block."),
display("Proposal provided not a Polkadot block."),
NotValidator(id: AccountId) {
description("Local account ID not a validator at this block."),
display("Local account ID ({:?}) not a validator at this block.", id),
}
TimestampInFuture {
description("Proposal had timestamp too far in the future."),
display("Proposal had timestamp too far in the future."),
PrematureDestruction {
description("Proposer destroyed before finishing proposing or evaluating"),
display("Proposer destroyed before finishing proposing or evaluating"),
}
WrongParentHash(expected: HeaderHash, got: HeaderHash) {
description("Proposal had wrong parent hash."),
display("Proposal had wrong parent hash. Expected {:?}, got {:?}", expected, got),
}
WrongNumber(expected: Number, got: Number) {
description("Proposal had wrong number."),
display("Proposal had wrong number. Expected {:?}, got {:?}", expected, got),
}
ProposalTooLarge(size: usize) {
description("Proposal exceeded the maximum size."),
display(
"Proposal exceeded the maximum size of {} by {} bytes.",
::MAX_TRANSACTIONS_SIZE, ::MAX_TRANSACTIONS_SIZE.saturating_sub(*size)
),
Timer(e: String) {
description("Failed to register or resolve async timer."),
display("Timer failed: {}", e),
}
Executor(e: ::futures::future::ExecuteErrorKind) {
description("Unable to dispatch agreement future"),
+135
View File
@@ -0,0 +1,135 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Polkadot block evaluation and evaluation errors.
use super::MAX_TRANSACTIONS_SIZE;
use codec::Slicable;
use polkadot_runtime::Block as PolkadotGenericBlock;
use polkadot_primitives::Timestamp;
use polkadot_primitives::parachain::Id as ParaId;
use primitives::block::{Block as SubstrateBlock, HeaderHash, Number as BlockNumber};
use transaction_pool::PolkadotBlock;
error_chain! {
links {
PolkadotApi(::polkadot_api::Error, ::polkadot_api::ErrorKind);
}
errors {
ProposalNotForPolkadot {
description("Proposal provided not a Polkadot block."),
display("Proposal provided not a Polkadot block."),
}
TimestampInFuture {
description("Proposal had timestamp too far in the future."),
display("Proposal had timestamp too far in the future."),
}
TooManyCandidates(expected: usize, got: usize) {
description("Proposal included more candidates than is possible."),
display("Proposal included {} candidates for {} parachains", got, expected),
}
ParachainOutOfOrder {
description("Proposal included parachains out of order."),
display("Proposal included parachains out of order."),
}
UnknownParachain(id: ParaId) {
description("Proposal included unregistered parachain."),
display("Proposal included unregistered parachain {:?}", id),
}
WrongParentHash(expected: HeaderHash, got: HeaderHash) {
description("Proposal had wrong parent hash."),
display("Proposal had wrong parent hash. Expected {:?}, got {:?}", expected, got),
}
WrongNumber(expected: BlockNumber, got: BlockNumber) {
description("Proposal had wrong number."),
display("Proposal had wrong number. Expected {:?}, got {:?}", expected, got),
}
ProposalTooLarge(size: usize) {
description("Proposal exceeded the maximum size."),
display(
"Proposal exceeded the maximum size of {} by {} bytes.",
MAX_TRANSACTIONS_SIZE, MAX_TRANSACTIONS_SIZE.saturating_sub(*size)
),
}
}
}
/// Attempt to evaluate a substrate block as a polkadot block, returning error
/// upon any initial validity checks failing.
pub fn evaluate_initial(
proposal: &SubstrateBlock,
now: Timestamp,
parent_hash: &HeaderHash,
parent_number: BlockNumber,
active_parachains: &[ParaId],
) -> Result<PolkadotBlock> {
const MAX_TIMESTAMP_DRIFT: Timestamp = 60;
let encoded = Slicable::encode(proposal);
let proposal = PolkadotGenericBlock::decode(&mut &encoded[..])
.and_then(|b| PolkadotBlock::from(b).ok())
.ok_or_else(|| ErrorKind::ProposalNotForPolkadot)?;
let transactions_size = proposal.extrinsics.iter().fold(0, |a, tx| {
a + Slicable::encode(tx).len()
});
if transactions_size > MAX_TRANSACTIONS_SIZE {
bail!(ErrorKind::ProposalTooLarge(transactions_size))
}
if proposal.header.parent_hash != *parent_hash {
bail!(ErrorKind::WrongParentHash(*parent_hash, proposal.header.parent_hash));
}
if proposal.header.number != parent_number + 1 {
bail!(ErrorKind::WrongNumber(parent_number + 1, proposal.header.number));
}
let block_timestamp = proposal.timestamp();
// lenient maximum -- small drifts will just be delayed using a timer.
if block_timestamp > now + MAX_TIMESTAMP_DRIFT {
bail!(ErrorKind::TimestampInFuture)
}
{
let n_parachains = active_parachains.len();
if proposal.parachain_heads().len() > n_parachains {
bail!(ErrorKind::TooManyCandidates(n_parachains, proposal.parachain_heads().len()));
}
let mut last_id = None;
let mut iter = active_parachains.iter();
for head in proposal.parachain_heads() {
// proposed heads must be ascending order by parachain ID without duplicate.
if last_id.as_ref().map_or(false, |x| x >= &head.parachain_index) {
bail!(ErrorKind::ParachainOutOfOrder);
}
if !iter.any(|x| x == &head.parachain_index) {
// must be unknown since active parachains are always sorted.
bail!(ErrorKind::UnknownParachain(head.parachain_index))
}
last_id = Some(head.parachain_index);
}
}
Ok(proposal)
}
+424 -434
View File
@@ -29,15 +29,16 @@
//!
//! Groups themselves may be compromised by malicious authorities.
extern crate futures;
extern crate ed25519;
extern crate parking_lot;
extern crate polkadot_api;
extern crate polkadot_collator as collator;
extern crate polkadot_statement_table as table;
extern crate polkadot_parachain as parachain;
extern crate polkadot_primitives;
extern crate polkadot_transaction_pool as transaction_pool;
extern crate polkadot_runtime;
extern crate substrate_bft as bft;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
@@ -46,46 +47,60 @@ extern crate substrate_network;
extern crate exit_future;
extern crate tokio_core;
extern crate substrate_keyring;
extern crate substrate_client as client;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate log;
#[cfg(test)]
extern crate substrate_keyring;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use codec::Slicable;
use table::{Table, Context as TableContextTrait};
use table::generic::Statement as GenericStatement;
use runtime_support::Hashable;
use polkadot_api::{PolkadotApi, BlockBuilder};
use polkadot_primitives::{Hash, Timestamp};
use polkadot_primitives::parachain::{Id as ParaId, DutyRoster, BlockData, Extrinsic, CandidateReceipt};
use polkadot_runtime::Block as PolkadotGenericBlock;
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic, CandidateReceipt};
use primitives::block::{Block as SubstrateBlock, Header as SubstrateHeader, HeaderHash, Id as BlockId, Number as BlockNumber};
use primitives::AuthorityId;
use transaction_pool::{Ready, TransactionPool, PolkadotBlock};
use transaction_pool::{Ready, TransactionPool};
use tokio_core::reactor::{Handle, Timeout, Interval};
use futures::prelude::*;
use futures::future;
use future::Shared;
use futures::future::{self, Shared};
use parking_lot::Mutex;
use tokio_core::reactor::{Handle, Timeout};
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
pub use self::collation::{Collators, Collation};
pub use self::error::{ErrorKind, Error};
pub use self::shared_table::{SharedTable, StatementSource, StatementProducer, ProducedStatements};
pub use service::Service;
mod collation;
mod dynamic_inclusion;
mod evaluation;
mod error;
mod service;
mod shared_table;
// block size limit.
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
/// A handle to a statement table router.
pub trait TableRouter {
///
/// This is expected to be a lightweight, shared type like an `Arc`.
pub trait TableRouter: Clone {
/// Errors when fetching data from the network.
type Error;
/// Future that resolves when candidate data is fetched.
@@ -93,7 +108,7 @@ pub trait TableRouter {
/// Future that resolves when extrinsic candidate data is fetched.
type FetchExtrinsic: IntoFuture<Item=Extrinsic,Error=Self::Error>;
/// Note local candidate data.
/// Note local candidate data, making it available on the network to other validators.
fn local_candidate_data(&self, hash: Hash, block_data: BlockData, extrinsic: Extrinsic);
/// Fetch block data for a specific candidate.
@@ -126,46 +141,6 @@ pub struct GroupInfo {
pub needed_availability: usize,
}
struct TableContext {
parent_hash: Hash,
key: Arc<ed25519::Pair>,
groups: HashMap<ParaId, GroupInfo>,
}
impl table::Context for TableContext {
fn is_member_of(&self, authority: &AuthorityId, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority))
}
fn is_availability_guarantor_of(&self, authority: &AuthorityId, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority))
}
fn requisite_votes(&self, group: &ParaId) -> (usize, usize) {
self.groups.get(group).map_or(
(usize::max_value(), usize::max_value()),
|g| (g.needed_validity, g.needed_availability),
)
}
}
impl TableContext {
fn local_id(&self) -> AuthorityId {
self.key.public().0
}
fn sign_statement(&self, statement: table::Statement) -> table::SignedStatement {
let signature = sign_table_statement(&statement, &self.key, &self.parent_hash).into();
let local_id = self.key.public().0;
table::SignedStatement {
statement,
signature,
sender: local_id,
}
}
}
/// Sign a table statement against a parent hash.
/// The actual message signed is the encoded statement concatenated with the
/// parent hash.
@@ -185,247 +160,7 @@ pub fn sign_table_statement(statement: &table::Statement, key: &ed25519::Pair, p
key.sign(&encoded)
}
// A shared table object.
struct SharedTableInner {
table: Table<TableContext>,
proposed_digest: Option<Hash>,
checked_validity: HashSet<Hash>,
checked_availability: HashSet<Hash>,
}
impl SharedTableInner {
// Import a single statement. Provide a handle to a table router.
fn import_statement<R: TableRouter>(
&mut self,
context: &TableContext,
router: &R,
statement: table::SignedStatement,
received_from: Option<AuthorityId>,
) -> StatementProducer<<R::FetchCandidate as IntoFuture>::Future, <R::FetchExtrinsic as IntoFuture>::Future> {
let mut producer = StatementProducer {
fetch_block_data: None,
fetch_extrinsic: None,
produced_statements: Default::default(),
_key: context.key.clone(),
};
let summary = match self.table.import_statement(context, statement, received_from) {
Some(summary) => summary,
None => return producer,
};
let local_id = context.local_id();
let is_validity_member = context.is_member_of(&local_id, &summary.group_id);
let is_availability_member =
context.is_availability_guarantor_of(&local_id, &summary.group_id);
let digest = &summary.candidate;
// TODO: consider a strategy based on the number of candidate votes as well.
// only check validity if this wasn't locally proposed.
let checking_validity = is_validity_member
&& self.proposed_digest.as_ref().map_or(true, |d| d != digest)
&& self.checked_validity.insert(digest.clone());
let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone());
if checking_validity || checking_availability {
match self.table.get_candidate(&digest) {
None => {} // TODO: handle table inconsistency somehow?
Some(candidate) => {
if checking_validity {
producer.fetch_block_data = Some(router.fetch_block_data(candidate).into_future().fuse());
}
if checking_availability {
producer.fetch_extrinsic = Some(router.fetch_extrinsic_data(candidate).into_future().fuse());
}
}
}
}
producer
}
}
/// Produced statements about a specific candidate.
/// Both may be `None`.
#[derive(Default)]
pub struct ProducedStatements {
/// A statement about the validity of the candidate.
pub validity: Option<table::Statement>,
/// A statement about the availability of the candidate.
pub availability: Option<table::Statement>,
}
/// Future that produces statements about a specific candidate.
pub struct StatementProducer<D: Future, E: Future> {
fetch_block_data: Option<future::Fuse<D>>,
fetch_extrinsic: Option<future::Fuse<E>>,
produced_statements: ProducedStatements,
_key: Arc<ed25519::Pair>,
}
impl<D, E, Err> Future for StatementProducer<D, E>
where
D: Future<Item=BlockData,Error=Err>,
E: Future<Item=Extrinsic,Error=Err>,
{
type Item = ProducedStatements;
type Error = Err;
fn poll(&mut self) -> Poll<ProducedStatements, Err> {
let mut done = true;
if let Some(ref mut fetch_block_data) = self.fetch_block_data {
match fetch_block_data.poll()? {
Async::Ready(_block_data) => {
// TODO [PoC-2] : validate block data here and make statement.
},
Async::NotReady => {
done = false;
}
}
}
if let Some(ref mut fetch_extrinsic) = self.fetch_extrinsic {
match fetch_extrinsic.poll()? {
Async::Ready(_extrinsic) => {
// TODO [PoC-2]: guarantee availability of data and make statment.
}
Async::NotReady => {
done = false;
}
}
}
if done {
Ok(Async::Ready(::std::mem::replace(&mut self.produced_statements, Default::default())))
} else {
Ok(Async::NotReady)
}
}
}
/// A shared table object.
pub struct SharedTable {
context: Arc<TableContext>,
inner: Arc<Mutex<SharedTableInner>>,
}
impl Clone for SharedTable {
fn clone(&self) -> Self {
SharedTable {
context: self.context.clone(),
inner: self.inner.clone(),
}
}
}
impl SharedTable {
/// Create a new shared table.
///
/// Provide the key to sign with, and the parent hash of the relay chain
/// block being built.
pub fn new(groups: HashMap<ParaId, GroupInfo>, key: Arc<ed25519::Pair>, parent_hash: Hash) -> Self {
SharedTable {
context: Arc::new(TableContext { groups, key, parent_hash }),
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
proposed_digest: None,
checked_validity: HashSet::new(),
checked_availability: HashSet::new(),
}))
}
}
/// Get group info.
pub fn group_info(&self) -> &HashMap<ParaId, GroupInfo> {
&self.context.groups
}
/// Import a single statement. Provide a handle to a table router
/// for dispatching any other requests which come up.
pub fn import_statement<R: TableRouter>(
&self,
router: &R,
statement: table::SignedStatement,
received_from: Option<AuthorityId>,
) -> StatementProducer<<R::FetchCandidate as IntoFuture>::Future, <R::FetchExtrinsic as IntoFuture>::Future> {
self.inner.lock().import_statement(&*self.context, router, statement, received_from)
}
/// Sign and import a local statement.
pub fn sign_and_import<R: TableRouter>(
&self,
router: &R,
statement: table::Statement,
) -> StatementProducer<<R::FetchCandidate as IntoFuture>::Future, <R::FetchExtrinsic as IntoFuture>::Future> {
let proposed_digest = match statement {
GenericStatement::Candidate(ref c) => Some(c.hash()),
_ => None,
};
let signed_statement = self.context.sign_statement(statement);
let mut inner = self.inner.lock();
if proposed_digest.is_some() {
inner.proposed_digest = proposed_digest;
}
inner.import_statement(&*self.context, router, signed_statement, None)
}
/// Import many statements at once.
///
/// Provide an iterator yielding pairs of (statement, received_from).
pub fn import_statements<R, I, U>(&self, router: &R, iterable: I) -> U
where
R: TableRouter,
I: IntoIterator<Item=(table::SignedStatement, Option<AuthorityId>)>,
U: ::std::iter::FromIterator<StatementProducer<
<R::FetchCandidate as IntoFuture>::Future,
<R::FetchExtrinsic as IntoFuture>::Future>
>,
{
let mut inner = self.inner.lock();
iterable.into_iter().map(move |(statement, received_from)| {
inner.import_statement(&*self.context, router, statement, received_from)
}).collect()
}
/// Check if a proposal is valid.
pub fn proposal_valid(&self, _proposal: &SubstrateBlock) -> bool {
false // TODO
}
/// Execute a closure using a specific candidate.
///
/// Deadlocks if called recursively.
pub fn with_candidate<F, U>(&self, digest: &Hash, f: F) -> U
where F: FnOnce(Option<&CandidateReceipt>) -> U
{
let inner = self.inner.lock();
f(inner.table.get_candidate(digest))
}
/// Get all witnessed misbehavior.
pub fn get_misbehavior(&self) -> HashMap<AuthorityId, table::Misbehavior> {
self.inner.lock().table.get_misbehavior().clone()
}
/// Fill a statement batch.
pub fn fill_batch<B: table::StatementBatch>(&self, batch: &mut B) {
self.inner.lock().table.fill_batch(batch);
}
/// Get the local proposed block's hash.
pub fn proposed_hash(&self) -> Option<Hash> {
self.inner.lock().proposed_digest.clone()
}
}
fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId]) -> Result<HashMap<ParaId, GroupInfo>, Error> {
fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId], local_id: AuthorityId) -> Result<(HashMap<ParaId, GroupInfo>, LocalDuty), Error> {
if roster.validator_duty.len() != authorities.len() {
bail!(ErrorKind::InvalidDutyRosterLength(authorities.len(), roster.validator_duty.len()))
}
@@ -434,11 +169,14 @@ fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId]) -> Result<Ha
bail!(ErrorKind::InvalidDutyRosterLength(authorities.len(), roster.guarantor_duty.len()))
}
let mut local_validation = None;
let mut map = HashMap::new();
let duty_iter = authorities.iter().zip(&roster.validator_duty).zip(&roster.guarantor_duty);
for ((authority, v_duty), a_duty) in duty_iter {
use polkadot_primitives::parachain::Chain;
if authority == &local_id {
local_validation = Some(v_duty.clone());
}
match *v_duty {
Chain::Relay => {}, // does nothing for now.
@@ -467,23 +205,45 @@ fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId]) -> Result<Ha
live_group.needed_availability = availability_len / 2 + availability_len % 2;
}
Ok(map)
match local_validation {
Some(local_validation) => {
let local_duty = LocalDuty {
validation: local_validation,
};
Ok((map, local_duty))
}
None => bail!(ErrorKind::NotValidator(local_id)),
}
}
fn timer_error(e: &::std::io::Error) -> Error {
ErrorKind::Timer(format!("{}", e)).into()
}
/// Polkadot proposer factory.
pub struct ProposerFactory<C, N> {
pub struct ProposerFactory<C, N, P> {
/// The client instance.
pub client: Arc<C>,
/// The transaction pool.
pub transaction_pool: Arc<Mutex<TransactionPool>>,
/// The backing network handle.
pub network: N,
/// Handle to the underlying tokio-core.
/// Parachain collators.
pub collators: P,
/// The timer used to schedule proposal intervals.
pub handle: Handle,
/// The duration after which parachain-empty blocks will be allowed.
pub parachain_empty_duration: Duration,
}
impl<C: PolkadotApi, N: Network> bft::ProposerFactory for ProposerFactory<C, N> {
type Proposer = Proposer<C, N::TableRouter>;
impl<C, N, P> bft::ProposerFactory for ProposerFactory<C, N, P>
where
C: PolkadotApi,
N: Network,
P: Collators,
{
type Proposer = Proposer<C, N::TableRouter, P>;
type Error = Error;
fn init(&self, parent_header: &SubstrateHeader, authorities: &[AuthorityId], sign_with: Arc<ed25519::Pair>) -> Result<Self::Proposer, Error> {
@@ -497,135 +257,224 @@ impl<C: PolkadotApi, N: Network> bft::ProposerFactory for ProposerFactory<C, N>
let duty_roster = self.client.duty_roster(&checked_id)?;
let random_seed = self.client.random_seed(&checked_id)?;
let group_info = make_group_info(duty_roster, authorities)?;
let (group_info, local_duty) = make_group_info(
duty_roster,
authorities,
sign_with.public().0,
)?;
let active_parachains = self.client.active_parachains(&checked_id)?;
let n_parachains = active_parachains.len();
let table = Arc::new(SharedTable::new(group_info, sign_with.clone(), parent_hash));
let router = self.network.table_router(table.clone());
let dynamic_inclusion = DynamicInclusion::new(
n_parachains,
Instant::now(),
self.parachain_empty_duration.clone(),
);
let timeout = Timeout::new(DELAY_UNTIL, &self.handle)?;
let timeout = Timeout::new(DELAY_UNTIL, &self.handle)
.map_err(|e| timer_error(&e))?;
debug!(target: "bft", "Initialising consensus proposer. Refusing to evaluate for {:?} from now.",
DELAY_UNTIL);
// TODO [PoC-2]: kick off collation process.
Ok(Proposer {
parent_hash,
parent_number: parent_header.number,
parent_id: checked_id,
random_seed,
local_key: sign_with,
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
collators: self.collators.clone(),
delay: timeout.shared(),
_table: table,
_router: router,
handle: self.handle.clone(),
dynamic_inclusion,
local_duty,
local_key: sign_with,
parent_hash,
parent_id: checked_id,
parent_number: parent_header.number,
random_seed,
router,
table,
transaction_pool: self.transaction_pool.clone(),
})
}
}
fn current_timestamp() -> Timestamp {
use std::time;
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
.expect("now always later than unix epoch; qed")
.as_secs()
struct LocalDuty {
validation: Chain,
}
/// The Polkadot proposer logic.
pub struct Proposer<C: PolkadotApi, R> {
parent_hash: HeaderHash,
parent_number: BlockNumber,
parent_id: C::CheckedBlockId,
random_seed: Hash,
pub struct Proposer<C: PolkadotApi, R, P> {
client: Arc<C>,
local_key: Arc<ed25519::Pair>,
transaction_pool: Arc<Mutex<TransactionPool>>,
collators: P,
delay: Shared<Timeout>,
_table: Arc<SharedTable>,
_router: R,
dynamic_inclusion: DynamicInclusion,
handle: Handle,
local_duty: LocalDuty,
local_key: Arc<ed25519::Pair>,
parent_hash: HeaderHash,
parent_id: C::CheckedBlockId,
parent_number: BlockNumber,
random_seed: Hash,
router: R,
table: Arc<SharedTable>,
transaction_pool: Arc<Mutex<TransactionPool>>,
}
impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
impl<C, R, P> bft::Proposer for Proposer<C, R, P>
where
C: PolkadotApi,
R: TableRouter,
P: Collators,
{
type Error = Error;
type Create = Result<SubstrateBlock, Error>;
type Create = future::Either<
CreateProposal<C, R, P>,
future::FutureResult<SubstrateBlock, Error>,
>;
type Evaluate = Box<Future<Item=bool, Error=Error>>;
fn propose(&self) -> Self::Create {
debug!(target: "bft", "proposing block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
const ATTEMPT_PROPOSE_EVERY: Duration = Duration::from_millis(100);
// TODO: handle case when current timestamp behind that in state.
let mut block_builder = self.client.build_block(
&self.parent_id,
current_timestamp()
)?;
let initial_included = self.table.includable_count();
let enough_candidates = self.dynamic_inclusion.acceptable_in(
Instant::now(),
initial_included,
).unwrap_or_default();
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
let timing = {
let delay = self.delay.clone();
let dynamic_inclusion = self.dynamic_inclusion.clone();
let make_timing = move |handle| -> Result<ProposalTiming, ::std::io::Error> {
let attempt_propose = Interval::new(ATTEMPT_PROPOSE_EVERY, handle)?;
let enough_candidates = Timeout::new(enough_candidates, handle)?;
Ok(ProposalTiming {
attempt_propose,
enough_candidates,
dynamic_inclusion,
minimum_delay: Some(delay),
last_included: initial_included,
})
};
{
let mut pool = self.transaction_pool.lock();
let mut unqueue_invalid = Vec::new();
let mut pending_size = 0;
pool.cull(None, readiness_evaluator.clone());
for pending in pool.pending(readiness_evaluator.clone()) {
// skip and cull transactions which are too large.
if pending.encoded_size() > MAX_TRANSACTIONS_SIZE {
unqueue_invalid.push(pending.hash().clone());
continue
match make_timing(&self.handle) {
Ok(timing) => timing,
Err(e) => {
return future::Either::B(future::err(timer_error(&e)));
}
if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }
match block_builder.push_extrinsic(pending.as_transaction().clone()) {
Ok(()) => {
pending_size += pending.encoded_size();
}
Err(_) => {
unqueue_invalid.push(pending.hash().clone());
}
}
}
for tx_hash in unqueue_invalid {
pool.remove(&tx_hash, false);
}
}
let polkadot_block = block_builder.bake();
info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]",
polkadot_block.header.number,
Hash::from(polkadot_block.header.blake2_256()),
polkadot_block.header.parent_hash,
polkadot_block.extrinsics.iter()
.map(|xt| format!("{}", Hash::from(xt.blake2_256())))
.collect::<Vec<_>>()
.join(", ")
);
let substrate_block = Slicable::decode(&mut polkadot_block.encode().as_slice())
.expect("polkadot blocks defined to serialize to substrate blocks correctly; qed");
assert!(evaluate_proposal(&substrate_block, &*self.client, current_timestamp(), &self.parent_hash, self.parent_number, &self.parent_id).is_ok());
Ok(substrate_block)
}
// TODO: certain kinds of errors here should lead to a misbehavior report.
fn evaluate(&self, proposal: &SubstrateBlock) -> Self::Evaluate {
debug!(target: "bft", "evaluating block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
let evaluated = match evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash, self.parent_number, &self.parent_id) {
Ok(x) => Ok(x),
Err(e) => match *e.kind() {
ErrorKind::PolkadotApi(polkadot_api::ErrorKind::Executor(_)) => Ok(false),
ErrorKind::ProposalNotForPolkadot => Ok(false),
ErrorKind::TimestampInFuture => Ok(false),
ErrorKind::WrongParentHash(_, _) => Ok(false),
ErrorKind::ProposalTooLarge(_) => Ok(false),
_ => Err(e),
}
};
// delay casting vote until able.
Box::new(self.delay.clone().map_err(Error::from).and_then(move |_| evaluated))
future::Either::A(CreateProposal {
parent_hash: self.parent_hash.clone(),
parent_number: self.parent_number.clone(),
parent_id: self.parent_id.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
collation: CollationFetch::new(
self.local_duty.validation,
self.parent_id.clone(),
self.parent_hash.clone(),
self.collators.clone(),
self.client.clone()
),
table: self.table.clone(),
router: self.router.clone(),
timing,
})
}
fn evaluate(&self, proposal: &SubstrateBlock) -> Self::Evaluate {
debug!(target: "bft", "evaluating block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
let active_parachains = match self.client.active_parachains(&self.parent_id) {
Ok(x) => x,
Err(e) => return Box::new(future::err(e.into())) as Box<_>,
};
let current_timestamp = current_timestamp();
// do initial serialization and structural integrity checks.
let maybe_proposal = evaluation::evaluate_initial(
proposal,
current_timestamp,
&self.parent_hash,
self.parent_number,
&active_parachains,
);
let proposal = match maybe_proposal {
Ok(p) => p,
Err(e) => {
// TODO: these errors are easily re-checked in runtime.
debug!(target: "bft", "Invalid proposal: {:?}", e);
return Box::new(future::ok(false));
}
};
let vote_delays = {
// delay casting vote until able (according to minimum block time)
let minimum_delay = self.delay.clone()
.map_err(|e| timer_error(&*e));
let included_candidate_hashes = proposal
.parachain_heads()
.iter()
.map(|candidate| candidate.hash());
// delay casting vote until we have proof that all candidates are
// includable.
let includability_tracker = self.table.track_includability(included_candidate_hashes)
.map_err(|_| ErrorKind::PrematureDestruction.into());
// the duration at which the given number of parachains is acceptable.
let count_delay = self.dynamic_inclusion.acceptable_in(
Instant::now(),
proposal.parachain_heads().len(),
);
// the duration until the given timestamp is current
let proposed_timestamp = proposal.timestamp();
let timestamp_delay = if proposed_timestamp > current_timestamp {
Some(Duration::from_secs(proposed_timestamp - current_timestamp))
} else {
None
};
// construct a future from the maximum of the two durations.
let temporary_delay = match ::std::cmp::max(timestamp_delay, count_delay) {
Some(duration) => {
let maybe_timeout = Timeout::new(duration, &self.handle);
let f = future::result(maybe_timeout)
.and_then(|timeout| timeout)
.map_err(|e| timer_error(&e));
future::Either::A(f)
}
None => future::Either::B(future::ok(())),
};
minimum_delay.join3(includability_tracker, temporary_delay)
};
// evaluate whether the block is actually valid.
// TODO: is it better to delay this until the delays are finished?
let evaluated = self.client.evaluate_block(&self.parent_id, proposal.into()).map_err(Into::into);
let future = future::result(evaluated).and_then(move |good| {
let end_result = future::ok(good);
if good {
// delay a "good" vote.
future::Either::A(vote_delays.and_then(|_| end_result))
} else {
// don't delay a "bad" evaluation.
future::Either::B(end_result)
}
});
Box::new(future) as Box<_>
}
fn round_proposer(&self, round_number: usize, authorities: &[AuthorityId]) -> AuthorityId {
@@ -697,45 +546,186 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
}
}
fn evaluate_proposal<C: PolkadotApi>(
proposal: &SubstrateBlock,
client: &C,
now: Timestamp,
parent_hash: &HeaderHash,
parent_number: BlockNumber,
parent_id: &C::CheckedBlockId,
) -> Result<bool, Error> {
const MAX_TIMESTAMP_DRIFT: Timestamp = 4;
fn current_timestamp() -> Timestamp {
use std::time;
let encoded = Slicable::encode(proposal);
let proposal = PolkadotGenericBlock::decode(&mut &encoded[..])
.and_then(|b| PolkadotBlock::from(b).ok())
.ok_or_else(|| ErrorKind::ProposalNotForPolkadot)?;
let transactions_size = proposal.extrinsics.iter().fold(0, |a, tx| {
a + Slicable::encode(tx).len()
});
if transactions_size > MAX_TRANSACTIONS_SIZE {
bail!(ErrorKind::ProposalTooLarge(transactions_size))
}
if proposal.header.parent_hash != *parent_hash {
bail!(ErrorKind::WrongParentHash(*parent_hash, proposal.header.parent_hash));
}
if proposal.header.number != parent_number + 1 {
bail!(ErrorKind::WrongNumber(parent_number + 1, proposal.header.number))
}
let block_timestamp = proposal.timestamp();
// TODO: just defer using `tokio_timer` to delay prepare vote.
if block_timestamp > now + MAX_TIMESTAMP_DRIFT {
bail!(ErrorKind::TimestampInFuture)
}
// execute the block.
client.evaluate_block(parent_id, proposal.into())?;
Ok(true)
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
.expect("now always later than unix epoch; qed")
.as_secs()
}
struct ProposalTiming {
attempt_propose: Interval,
dynamic_inclusion: DynamicInclusion,
enough_candidates: Timeout,
minimum_delay: Option<Shared<Timeout>>,
last_included: usize,
}
impl ProposalTiming {
// whether it's time to attempt a proposal.
// shouldn't be called outside of the context of a task.
fn poll(&mut self, included: usize) -> Poll<(), Error> {
// first drain from the interval so when the minimum delay is up
// we don't have any notifications built up.
//
// this interval is just meant to produce periodic task wakeups
// that lead to the `dynamic_inclusion` getting updated as necessary.
if let Async::Ready(x) = self.attempt_propose.poll()
.map_err(|e| timer_error(&e))?
{
x.expect("timer still alive; intervals never end; qed");
}
if let Some(ref mut min) = self.minimum_delay {
try_ready!(min.poll().map_err(|e| timer_error(&*e)));
}
self.minimum_delay = None; // after this point, the future must have completed.
if included == self.last_included {
return self.enough_candidates.poll().map_err(|e| timer_error(&e));
}
// the amount of includable candidates has changed. schedule a wakeup
// if it's not sufficient anymore.
let now = Instant::now();
match self.dynamic_inclusion.acceptable_in(now, included) {
Some(duration) => {
self.last_included = included;
self.enough_candidates.reset(now + duration);
self.enough_candidates.poll().map_err(|e| timer_error(&e))
}
None => {
Ok(Async::Ready(()))
}
}
}
}
/// Future which resolves upon the creation of a proposal.
pub struct CreateProposal<C: PolkadotApi, R, P: Collators> {
parent_hash: HeaderHash,
parent_number: BlockNumber,
parent_id: C::CheckedBlockId,
client: Arc<C>,
transaction_pool: Arc<Mutex<TransactionPool>>,
collation: CollationFetch<P, C>,
router: R,
table: Arc<SharedTable>,
timing: ProposalTiming,
}
impl<C, R, P> CreateProposal<C, R, P>
where
C: PolkadotApi,
R: TableRouter,
P: Collators,
{
fn propose_with(&self, candidates: Vec<CandidateReceipt>) -> Result<SubstrateBlock, Error> {
// TODO: handle case when current timestamp behind that in state.
let timestamp = current_timestamp();
let mut block_builder = self.client.build_block(
&self.parent_id,
timestamp,
candidates,
)?;
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
{
let mut pool = self.transaction_pool.lock();
let mut unqueue_invalid = Vec::new();
let mut pending_size = 0;
pool.cull(None, readiness_evaluator.clone());
for pending in pool.pending(readiness_evaluator.clone()) {
// skip and cull transactions which are too large.
if pending.encoded_size() > MAX_TRANSACTIONS_SIZE {
unqueue_invalid.push(pending.hash().clone());
continue
}
if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }
match block_builder.push_extrinsic(pending.as_transaction().clone()) {
Ok(()) => {
pending_size += pending.encoded_size();
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(pending.hash().clone());
}
}
}
for tx_hash in unqueue_invalid {
pool.remove(&tx_hash, false);
}
}
let polkadot_block = block_builder.bake();
info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]",
polkadot_block.header.number,
Hash::from(polkadot_block.header.blake2_256()),
polkadot_block.header.parent_hash,
polkadot_block.extrinsics.iter()
.map(|xt| format!("{}", Hash::from(xt.blake2_256())))
.collect::<Vec<_>>()
.join(", ")
);
let substrate_block = Slicable::decode(&mut polkadot_block.encode().as_slice())
.expect("polkadot blocks defined to serialize to substrate blocks correctly; qed");
// TODO: full re-evaluation
let active_parachains = self.client.active_parachains(&self.parent_id)?;
assert!(evaluation::evaluate_initial(
&substrate_block,
timestamp,
&self.parent_hash,
self.parent_number,
&active_parachains,
).is_ok());
Ok(substrate_block)
}
}
impl<C, R, P> Future for CreateProposal<C, R, P>
where
C: PolkadotApi,
R: TableRouter,
P: Collators,
{
type Item = SubstrateBlock;
type Error = Error;
fn poll(&mut self) -> Poll<SubstrateBlock, Error> {
// 1. poll local collation future.
match self.collation.poll() {
Ok(Async::Ready((collation, extrinsic))) => {
let hash = collation.receipt.hash();
self.router.local_candidate_data(hash, collation.block_data, extrinsic);
// TODO: if we are an availability guarantor also, we should produce an availability statement.
self.table.sign_and_import(&self.router, GenericStatement::Candidate(collation.receipt));
}
Ok(Async::NotReady) => {},
Err(_) => {}, // TODO: handle this failure to collate.
}
// 2. try to propose if we have enough includable candidates and other
// delays have concluded.
let included = self.table.includable_count();
try_ready!(self.timing.poll(included));
// 3. propose
let proposed_candidates = self.table.with_proposal(|proposed_set| {
proposed_set.into_iter().cloned().collect()
});
self.propose_with(proposed_candidates).map(Async::Ready)
}
}
+40 -15
View File
@@ -22,19 +22,23 @@
use std::thread;
use std::time::{Duration, Instant};
use std::sync::Arc;
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
use parking_lot::Mutex;
use substrate_network as net;
use tokio_core::reactor;
use bft::{self, BftService};
use client::{BlockchainEvents, ChainHead};
use runtime_support::Hashable;
use ed25519;
use futures::prelude::*;
use futures::{future, Canceled};
use parking_lot::Mutex;
use polkadot_api::PolkadotApi;
use polkadot_primitives::AccountId;
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use primitives::{Hash, AuthorityId};
use primitives::block::{Id as BlockId, HeaderHash, Header};
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt};
use polkadot_api::PolkadotApi;
use bft::{self, BftService};
use runtime_support::Hashable;
use substrate_network as net;
use tokio_core::reactor;
use transaction_pool::TransactionPool;
use ed25519;
use super::{TableRouter, SharedTable, ProposerFactory};
use error;
@@ -174,6 +178,15 @@ impl<E> Sink for BftSink<E> {
struct Network(Arc<net::ConsensusService>);
impl super::Network for Network {
type TableRouter = Router;
fn table_router(&self, _table: Arc<SharedTable>) -> Self::TableRouter {
Router {
network: self.0.clone()
}
}
}
fn start_bft<F, C>(
header: &Header,
handle: reactor::Handle,
@@ -224,6 +237,7 @@ impl Service {
client: Arc<C>,
network: Arc<net::ConsensusService>,
transaction_pool: Arc<Mutex<TransactionPool>>,
parachain_empty_duration: Duration,
key: ed25519::Pair,
) -> Service
where
@@ -233,10 +247,13 @@ impl Service {
let thread = thread::spawn(move || {
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let key = Arc::new(key);
let factory = ProposerFactory {
client: client.clone(),
transaction_pool: transaction_pool.clone(),
network: Network(network.clone()),
collators: NoCollators,
parachain_empty_duration,
handle: core.handle(),
};
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
@@ -312,17 +329,25 @@ impl Drop for Service {
}
}
impl super::Network for Network {
type TableRouter = Router;
fn table_router(&self, _table: Arc<SharedTable>) -> Self::TableRouter {
Router {
network: self.0.clone()
}
// Collators implementation which never collates anything.
// TODO: do a real implementation.
#[derive(Clone, Copy)]
struct NoCollators;
impl ::collation::Collators for NoCollators {
type Error = ();
type Collation = future::Empty<::collation::Collation, ()>;
fn collate(&self, _parachain: ParaId, _relay_parent: Hash) -> Self::Collation {
future::empty()
}
fn note_bad_collator(&self, _collator: AccountId) { }
}
type FetchCandidateAdapter = future::Map<net::FetchFuture, fn(Vec<u8>) -> BlockData>;
#[derive(Clone)]
struct Router {
network: Arc<net::ConsensusService>,
}
@@ -0,0 +1,137 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Implements a future which resolves when all of the candidates referenced are includable.
use std::collections::HashMap;
use futures::prelude::*;
use futures::sync::oneshot;
use polkadot_primitives::Hash;
/// Track includability of a set of candidates,
pub(super) fn track<I: IntoIterator<Item=(Hash, bool)>>(candidates: I) -> (IncludabilitySender, Includable) {
let (tx, rx) = oneshot::channel();
let tracking: HashMap<_, _> = candidates.into_iter().collect();
let includable_count = tracking.values().filter(|x| **x).count();
let mut sender = IncludabilitySender {
tracking,
includable_count,
sender: Some(tx),
};
sender.try_complete();
(
sender,
Includable(rx),
)
}
/// The sending end of the includability sender.
pub(super) struct IncludabilitySender {
tracking: HashMap<Hash, bool>,
includable_count: usize,
sender: Option<oneshot::Sender<()>>,
}
impl IncludabilitySender {
/// update the inner candidate. wakes up the task as necessary.
/// returns `Err(Canceled)` if the other end has hung up.
///
/// returns `true` when this is completed and should be destroyed.
pub fn update_candidate(&mut self, candidate: Hash, includable: bool) -> bool {
use std::collections::hash_map::Entry;
match self.tracking.entry(candidate) {
Entry::Vacant(_) => {}
Entry::Occupied(mut entry) => {
let old = entry.insert(includable);
if !old && includable {
self.includable_count += 1;
} else if old && !includable {
self.includable_count -= 1;
}
}
}
self.try_complete()
}
/// whether the sender is completed.
pub fn is_complete(&self) -> bool {
self.sender.is_none()
}
fn try_complete(&mut self) -> bool {
if self.includable_count == self.tracking.len() {
if let Some(sender) = self.sender.take() {
let _ = sender.send(());
}
true
} else {
false
}
}
}
/// Future that resolves when all the candidates within are includable.
pub struct Includable(oneshot::Receiver<()>);
impl Future for Includable {
type Item = ();
type Error = oneshot::Canceled;
fn poll(&mut self) -> Poll<(), oneshot::Canceled> {
self.0.poll()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let hash1 = [1; 32].into();
let hash2 = [2; 32].into();
let hash3 = [3; 32].into();
let (mut sender, recv) = track([
(hash1, true),
(hash2, true),
(hash2, false), // overwrite should favor latter.
(hash3, true),
].iter().cloned());
assert!(!sender.is_complete());
// true -> false transition is possible and should be handled.
sender.update_candidate(hash1, false);
assert!(!sender.is_complete());
sender.update_candidate(hash2, true);
assert!(!sender.is_complete());
sender.update_candidate(hash1, true);
assert!(sender.is_complete());
recv.wait().unwrap();
}
}
+566
View File
@@ -0,0 +1,566 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Parachain statement table meant to to shared with a message router
//! and a consensus proposer.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use table::{self, Table, Context as TableContextTrait};
use table::generic::Statement as GenericStatement;
use collation::Collation;
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use primitives::AuthorityId;
use parking_lot::Mutex;
use futures::{future, prelude::*};
use super::{GroupInfo, TableRouter};
use self::includable::IncludabilitySender;
mod includable;
pub use self::includable::Includable;
struct TableContext {
parent_hash: Hash,
key: Arc<::ed25519::Pair>,
groups: HashMap<ParaId, GroupInfo>,
}
impl table::Context for TableContext {
fn is_member_of(&self, authority: &AuthorityId, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority))
}
fn is_availability_guarantor_of(&self, authority: &AuthorityId, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority))
}
fn requisite_votes(&self, group: &ParaId) -> (usize, usize) {
self.groups.get(group).map_or(
(usize::max_value(), usize::max_value()),
|g| (g.needed_validity, g.needed_availability),
)
}
}
impl TableContext {
fn local_id(&self) -> AuthorityId {
self.key.public().0
}
fn sign_statement(&self, statement: table::Statement) -> table::SignedStatement {
let signature = ::sign_table_statement(&statement, &self.key, &self.parent_hash).into();
let local_id = self.key.public().0;
table::SignedStatement {
statement,
signature,
sender: local_id,
}
}
}
/// Source of statements
pub enum StatementSource {
/// Locally produced statement.
Local,
/// Received statement from remote source, with optional sender.
Remote(Option<AuthorityId>),
}
// A shared table object.
struct SharedTableInner {
table: Table<TableContext>,
proposed_digest: Option<Hash>,
checked_validity: HashSet<Hash>,
checked_availability: HashSet<Hash>,
trackers: Vec<IncludabilitySender>,
}
impl SharedTableInner {
// Import a single statement. Provide a handle to a table router and a function
// used to determine if a referenced candidate is valid.
fn import_statement<R: TableRouter, C: FnMut(Collation) -> bool>(
&mut self,
context: &TableContext,
router: &R,
statement: table::SignedStatement,
statement_source: StatementSource,
check_candidate: C,
) -> StatementProducer<
<R::FetchCandidate as IntoFuture>::Future,
<R::FetchExtrinsic as IntoFuture>::Future,
C,
> {
// this blank producer does nothing until we attach some futures
// and set a candidate digest.
let received_from = match statement_source {
StatementSource::Local => return Default::default(),
StatementSource::Remote(from) => from,
};
let summary = match self.table.import_statement(context, statement, received_from) {
Some(summary) => summary,
None => return Default::default(),
};
self.update_trackers(&summary.candidate, context);
let local_id = context.local_id();
let is_validity_member = context.is_member_of(&local_id, &summary.group_id);
let is_availability_member =
context.is_availability_guarantor_of(&local_id, &summary.group_id);
let digest = &summary.candidate;
// TODO: consider a strategy based on the number of candidate votes as well.
// only check validity if this wasn't locally proposed.
let checking_validity = is_validity_member
&& self.proposed_digest.as_ref().map_or(true, |d| d != digest)
&& self.checked_validity.insert(digest.clone());
let checking_availability = is_availability_member
&& self.checked_availability.insert(digest.clone());
let work = if checking_validity || checking_availability {
match self.table.get_candidate(&digest) {
None => None, // TODO: handle table inconsistency somehow?
Some(candidate) => {
let fetch_block_data =
router.fetch_block_data(candidate).into_future().fuse();
let fetch_extrinsic = if checking_availability {
Some(
router.fetch_extrinsic_data(candidate).into_future().fuse()
)
} else {
None
};
Some(Work {
candidate_receipt: candidate.clone(),
fetch_block_data,
fetch_extrinsic,
evaluate: checking_validity,
check_candidate,
})
}
}
} else {
None
};
StatementProducer {
produced_statements: Default::default(),
work,
}
}
fn update_trackers(&mut self, candidate: &Hash, context: &TableContext) {
let includable = self.table.candidate_includable(candidate, context);
for i in (0..self.trackers.len()).rev() {
if self.trackers[i].update_candidate(candidate.clone(), includable) {
self.trackers.swap_remove(i);
}
}
}
}
/// Produced statements about a specific candidate.
/// Both may be `None`.
#[derive(Default)]
pub struct ProducedStatements {
/// A statement about the validity of the candidate.
pub validity: Option<table::Statement>,
/// A statement about availability of data. If this is `Some`,
/// then `block_data` and `extrinsic` should be `Some` as well.
pub availability: Option<table::Statement>,
/// Block data to ensure availability of.
pub block_data: Option<BlockData>,
/// Extrinsic data to ensure availability of.
pub extrinsic: Option<Extrinsic>,
}
/// Future that produces statements about a specific candidate.
pub struct StatementProducer<D: Future, E: Future, C> {
produced_statements: ProducedStatements,
work: Option<Work<D, E, C>>,
}
struct Work<D: Future, E: Future, C> {
candidate_receipt: CandidateReceipt,
fetch_block_data: future::Fuse<D>,
fetch_extrinsic: Option<future::Fuse<E>>,
evaluate: bool,
check_candidate: C
}
impl<D: Future, E: Future, C> Default for StatementProducer<D, E, C> {
fn default() -> Self {
StatementProducer {
produced_statements: Default::default(),
work: None,
}
}
}
impl<D, E, C, Err> Future for StatementProducer<D, E, C>
where
D: Future<Item=BlockData,Error=Err>,
E: Future<Item=Extrinsic,Error=Err>,
C: FnMut(Collation) -> bool,
{
type Item = ProducedStatements;
type Error = Err;
fn poll(&mut self) -> Poll<ProducedStatements, Err> {
let work = match self.work {
Some(ref mut work) => work,
None => return Ok(Async::Ready(::std::mem::replace(&mut self.produced_statements, Default::default()))),
};
if let Async::Ready(block_data) = work.fetch_block_data.poll()? {
self.produced_statements.block_data = Some(block_data.clone());
if work.evaluate {
let is_good = (work.check_candidate)(Collation {
block_data,
receipt: work.candidate_receipt.clone(),
});
let hash = work.candidate_receipt.hash();
self.produced_statements.validity = Some(if is_good {
GenericStatement::Valid(hash)
} else {
GenericStatement::Invalid(hash)
});
}
}
if let Some(ref mut fetch_extrinsic) = work.fetch_extrinsic {
if let Async::Ready(extrinsic) = fetch_extrinsic.poll()? {
self.produced_statements.extrinsic = Some(extrinsic);
}
}
let done = self.produced_statements.block_data.is_some() && {
if work.evaluate {
true
} else if self.produced_statements.extrinsic.is_some() {
self.produced_statements.availability =
Some(GenericStatement::Available(work.candidate_receipt.hash()));
true
} else {
false
}
};
if done {
Ok(Async::Ready(::std::mem::replace(&mut self.produced_statements, Default::default())))
} else {
Ok(Async::NotReady)
}
}
}
/// A shared table object.
pub struct SharedTable {
context: Arc<TableContext>,
inner: Arc<Mutex<SharedTableInner>>,
}
impl Clone for SharedTable {
fn clone(&self) -> Self {
SharedTable {
context: self.context.clone(),
inner: self.inner.clone(),
}
}
}
impl SharedTable {
/// Create a new shared table.
///
/// Provide the key to sign with, and the parent hash of the relay chain
/// block being built.
pub fn new(groups: HashMap<ParaId, GroupInfo>, key: Arc<::ed25519::Pair>, parent_hash: Hash) -> Self {
SharedTable {
context: Arc::new(TableContext { groups, key, parent_hash }),
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
proposed_digest: None,
checked_validity: HashSet::new(),
checked_availability: HashSet::new(),
trackers: Vec::new(),
}))
}
}
/// Get group info.
pub fn group_info(&self) -> &HashMap<ParaId, GroupInfo> {
&self.context.groups
}
/// Import a single statement. Provide a handle to a table router
/// for dispatching any other requests which come up.
pub fn import_statement<R: TableRouter, C: FnMut(Collation) -> bool>(
&self,
router: &R,
statement: table::SignedStatement,
received_from: StatementSource,
check_candidate: C,
) -> StatementProducer<<R::FetchCandidate as IntoFuture>::Future, <R::FetchExtrinsic as IntoFuture>::Future, C> {
self.inner.lock().import_statement(&*self.context, router, statement, received_from, check_candidate)
}
/// Sign and import a local statement.
pub fn sign_and_import<R: TableRouter>(
&self,
router: &R,
statement: table::Statement,
) {
let proposed_digest = match statement {
GenericStatement::Candidate(ref c) => Some(c.hash()),
_ => None,
};
let signed_statement = self.context.sign_statement(statement);
let mut inner = self.inner.lock();
if proposed_digest.is_some() {
inner.proposed_digest = proposed_digest;
}
let producer = inner.import_statement(
&*self.context,
router,
signed_statement,
StatementSource::Local,
|_| true,
);
assert!(producer.work.is_none(), "local statement import never leads to additional work; qed");
}
/// Import many statements at once.
///
/// Provide an iterator yielding pairs of (statement, statement_source).
pub fn import_statements<R, I, C, U>(&self, router: &R, iterable: I) -> U
where
R: TableRouter,
I: IntoIterator<Item=(table::SignedStatement, StatementSource, C)>,
C: FnMut(Collation) -> bool,
U: ::std::iter::FromIterator<StatementProducer<
<R::FetchCandidate as IntoFuture>::Future,
<R::FetchExtrinsic as IntoFuture>::Future,
C,
>>,
{
let mut inner = self.inner.lock();
iterable.into_iter().map(move |(statement, statement_source, check_candidate)| {
inner.import_statement(&*self.context, router, statement, statement_source, check_candidate)
}).collect()
}
/// Execute a closure using a specific candidate.
///
/// Deadlocks if called recursively.
pub fn with_candidate<F, U>(&self, digest: &Hash, f: F) -> U
where F: FnOnce(Option<&CandidateReceipt>) -> U
{
let inner = self.inner.lock();
f(inner.table.get_candidate(digest))
}
/// Execute a closure using the current proposed set.
///
/// Deadlocks if called recursively.
pub fn with_proposal<F, U>(&self, f: F) -> U
where F: FnOnce(Vec<&CandidateReceipt>) -> U
{
let inner = self.inner.lock();
f(inner.table.proposed_candidates(&*self.context))
}
/// Get the number of parachains which have available candidates.
pub fn includable_count(&self) -> usize {
self.inner.lock().table.includable_count()
}
/// Get all witnessed misbehavior.
pub fn get_misbehavior(&self) -> HashMap<AuthorityId, table::Misbehavior> {
self.inner.lock().table.get_misbehavior().clone()
}
/// Fill a statement batch.
pub fn fill_batch<B: table::StatementBatch>(&self, batch: &mut B) {
self.inner.lock().table.fill_batch(batch);
}
/// Track includability of a given set of candidate hashes.
pub fn track_includability<I>(&self, iterable: I) -> Includable
where I: IntoIterator<Item=Hash>
{
let mut inner = self.inner.lock();
let (tx, rx) = includable::track(iterable.into_iter().map(|x| {
let includable = inner.table.candidate_includable(&x, &*self.context);
(x, includable)
}));
if !tx.is_complete() {
inner.trackers.push(tx);
}
rx
}
}
#[cfg(test)]
mod tests {
use super::*;
use substrate_keyring::Keyring;
#[derive(Clone)]
struct DummyRouter;
impl TableRouter for DummyRouter {
type Error = ();
type FetchCandidate = ::futures::future::Empty<BlockData,()>;
type FetchExtrinsic = ::futures::future::Empty<Extrinsic,()>;
/// Note local candidate data, making it available on the network to other validators.
fn local_candidate_data(&self, _hash: Hash, _block_data: BlockData, _extrinsic: Extrinsic) {
}
/// Fetch block data for a specific candidate.
fn fetch_block_data(&self, _candidate: &CandidateReceipt) -> Self::FetchCandidate {
::futures::future::empty()
}
/// Fetch extrinsic data for a specific candidate.
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
::futures::future::empty()
}
}
#[test]
fn statement_triggers_fetch_and_evaluate() {
let mut groups = HashMap::new();
let para_id = ParaId::from(1);
let local_id = Keyring::Alice.to_raw_public();
let local_key = Arc::new(Keyring::Alice.pair());
let validity_other = Keyring::Bob.to_raw_public();
let validity_other_key = Keyring::Bob.pair();
let parent_hash = Default::default();
groups.insert(para_id, GroupInfo {
validity_guarantors: [local_id, validity_other].iter().cloned().collect(),
availability_guarantors: Default::default(),
needed_validity: 2,
needed_availability: 0,
});
let shared_table = SharedTable::new(groups, local_key.clone(), parent_hash);
let candidate = CandidateReceipt {
parachain_index: para_id,
collator: [1; 32],
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
};
let candidate_statement = GenericStatement::Candidate(candidate);
let signature = ::sign_table_statement(&candidate_statement, &validity_other_key, &parent_hash);
let signed_statement = ::table::generic::SignedStatement {
statement: candidate_statement,
signature: signature.into(),
sender: validity_other,
};
let producer = shared_table.import_statement(
&DummyRouter,
signed_statement,
StatementSource::Remote(None),
|_| true,
);
assert!(producer.work.is_some(), "candidate and local validity group are same");
assert!(producer.work.as_ref().unwrap().evaluate, "should evaluate validity");
}
#[test]
fn statement_triggers_fetch_and_availability() {
let mut groups = HashMap::new();
let para_id = ParaId::from(1);
let local_id = Keyring::Alice.to_raw_public();
let local_key = Arc::new(Keyring::Alice.pair());
let validity_other = Keyring::Bob.to_raw_public();
let validity_other_key = Keyring::Bob.pair();
let parent_hash = Default::default();
groups.insert(para_id, GroupInfo {
validity_guarantors: [validity_other].iter().cloned().collect(),
availability_guarantors: [local_id].iter().cloned().collect(),
needed_validity: 1,
needed_availability: 1,
});
let shared_table = SharedTable::new(groups, local_key.clone(), parent_hash);
let candidate = CandidateReceipt {
parachain_index: para_id,
collator: [1; 32],
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
};
let candidate_statement = GenericStatement::Candidate(candidate);
let signature = ::sign_table_statement(&candidate_statement, &validity_other_key, &parent_hash);
let signed_statement = ::table::generic::SignedStatement {
statement: candidate_statement,
signature: signature.into(),
sender: validity_other,
};
let producer = shared_table.import_statement(
&DummyRouter,
signed_statement,
StatementSource::Remote(None),
|_| true,
);
assert!(producer.work.is_some(), "candidate and local availability group are same");
assert!(producer.work.as_ref().unwrap().fetch_extrinsic.is_some(), "should fetch extrinsic when guaranteeing availability");
assert!(!producer.work.as_ref().unwrap().evaluate, "should not evaluate validity");
}
}