mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 17:31:03 +00:00
Proposal creation and evaluation to plug into BFT (#77)
* reshuffle consensus libraries * polkadot-useful type definitions for statement table * begin BftService * primary selection logic * bft service implementation without I/O * extract out `BlockImport` trait * allow bft primitives to compile on wasm * Block builder (substrate) * take polkadot-consensus down to the core. * test for preemption * fix test build * Fix wasm build * Bulid on any block * Test for block builder. * Block import tests for client. * Tidy ups * clean up block builder instantiation * justification verification logic * JustifiedHeader and import * Propert block generation for tests * network and tablerouter trait * use statement import to drive creation of further statements * Fixed rpc tests * custom error type for consensus * create proposer * asynchronous proposal evaluation * inherent transactions in polkadot runtime * fix tests to match real polkadot block constraints * implicitly generate inherent functions * add inherent transaction functionality to block body * block builder logic for polkadot * some tests for the polkadot API
This commit is contained in:
committed by
Gav Wood
parent
5f9be58d04
commit
1e6cad908e
+343
-30
@@ -29,31 +29,73 @@
|
||||
//!
|
||||
//! Groups themselves may be compromised by malicious authorities.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use codec::Slicable;
|
||||
use table::Table;
|
||||
use table::generic::Statement as GenericStatement;
|
||||
use polkadot_primitives::Hash;
|
||||
use polkadot_primitives::parachain::{Id as ParaId, CandidateReceipt};
|
||||
use primitives::block::Block as SubstrateBlock;
|
||||
use primitives::AuthorityId;
|
||||
|
||||
use parking_lot::Mutex;
|
||||
|
||||
extern crate futures;
|
||||
extern crate ed25519;
|
||||
extern crate parking_lot;
|
||||
extern crate tokio_timer;
|
||||
extern crate polkadot_api;
|
||||
extern crate polkadot_collator as collator;
|
||||
extern crate polkadot_statement_table as table;
|
||||
extern crate polkadot_primitives;
|
||||
extern crate substrate_bft as bft;
|
||||
extern crate substrate_codec as codec;
|
||||
extern crate substrate_primitives as primitives;
|
||||
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use codec::Slicable;
|
||||
use table::{Table, Context as TableContextTrait};
|
||||
use table::generic::Statement as GenericStatement;
|
||||
use polkadot_api::{PolkadotApi, BlockBuilder};
|
||||
use polkadot_primitives::{Hash, Timestamp};
|
||||
use polkadot_primitives::block::Block as PolkadotBlock;
|
||||
use polkadot_primitives::parachain::{Id as ParaId, DutyRoster, BlockData, Extrinsic, CandidateReceipt};
|
||||
use primitives::block::{Block as SubstrateBlock, Header as SubstrateHeader, HeaderHash, Id as BlockId};
|
||||
use primitives::AuthorityId;
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::future;
|
||||
use parking_lot::Mutex;
|
||||
|
||||
pub use self::error::{ErrorKind, Error};
|
||||
|
||||
mod error;
|
||||
|
||||
/// A handle to a statement table router.
|
||||
pub trait TableRouter {
|
||||
/// Errors when fetching data from the network.
|
||||
type Error;
|
||||
/// Future that resolves when candidate data is fetched.
|
||||
type FetchCandidate: IntoFuture<Item=BlockData,Error=Self::Error>;
|
||||
/// Future that resolves when extrinsic candidate data is fetched.
|
||||
type FetchExtrinsic: IntoFuture<Item=Extrinsic,Error=Self::Error>;
|
||||
|
||||
/// Note local candidate data.
|
||||
fn local_candidate_data(&self, block_data: BlockData, extrinsic: Extrinsic);
|
||||
|
||||
/// Fetch block data for a specific candidate.
|
||||
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate;
|
||||
|
||||
/// Fetch extrinsic data for a specific candidate.
|
||||
fn fetch_extrinsic_data(&self, candidate: &CandidateReceipt) -> Self::FetchExtrinsic;
|
||||
}
|
||||
|
||||
/// A long-lived network which can create statement table routing instances.
|
||||
pub trait Network {
|
||||
/// The table router type. This should handle importing of any statements,
|
||||
/// routing statements to peers, and driving completion of any `StatementProducers`.
|
||||
type TableRouter: TableRouter;
|
||||
|
||||
/// Instantiate a table router using the given shared table.
|
||||
fn table_router(&self, table: Arc<SharedTable>) -> Self::TableRouter;
|
||||
}
|
||||
|
||||
/// Information about a specific group.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct GroupInfo {
|
||||
/// Authorities meant to check validity of candidates.
|
||||
pub validity_guarantors: HashSet<AuthorityId>,
|
||||
@@ -89,6 +131,10 @@ impl table::Context for TableContext {
|
||||
}
|
||||
|
||||
impl TableContext {
|
||||
fn local_id(&self) -> AuthorityId {
|
||||
self.key.public().0
|
||||
}
|
||||
|
||||
fn sign_statement(&self, statement: table::Statement) -> table::SignedStatement {
|
||||
let signature = sign_table_statement(&statement, &self.key, &self.parent_hash);
|
||||
let local_id = self.key.public().0;
|
||||
@@ -124,16 +170,120 @@ pub fn sign_table_statement(statement: &table::Statement, key: &ed25519::Pair, p
|
||||
struct SharedTableInner {
|
||||
table: Table<TableContext>,
|
||||
proposed_digest: Option<Hash>,
|
||||
checked_validity: HashSet<Hash>,
|
||||
checked_availability: HashSet<Hash>,
|
||||
}
|
||||
|
||||
impl SharedTableInner {
|
||||
fn import_statement(
|
||||
// Import a single statement. Provide a handle to a table router.
|
||||
fn import_statement<R: TableRouter>(
|
||||
&mut self,
|
||||
context: &TableContext,
|
||||
statement: ::table::SignedStatement,
|
||||
router: &R,
|
||||
statement: table::SignedStatement,
|
||||
received_from: Option<AuthorityId>,
|
||||
) -> Option<table::Summary> {
|
||||
self.table.import_statement(context, statement, received_from)
|
||||
) -> StatementProducer<<R::FetchCandidate as IntoFuture>::Future, <R::FetchExtrinsic as IntoFuture>::Future> {
|
||||
let mut producer = StatementProducer {
|
||||
fetch_block_data: None,
|
||||
fetch_extrinsic: None,
|
||||
produced_statements: Default::default(),
|
||||
_key: context.key.clone(),
|
||||
};
|
||||
|
||||
let summary = match self.table.import_statement(context, statement, received_from) {
|
||||
Some(summary) => summary,
|
||||
None => return producer,
|
||||
};
|
||||
|
||||
let local_id = context.local_id();
|
||||
let is_validity_member = context.is_member_of(&local_id, &summary.group_id);
|
||||
let is_availability_member =
|
||||
context.is_availability_guarantor_of(&local_id, &summary.group_id);
|
||||
|
||||
let digest = &summary.candidate;
|
||||
|
||||
// TODO: consider a strategy based on the number of candidate votes as well.
|
||||
// only check validity if this wasn't locally proposed.
|
||||
let checking_validity = is_validity_member
|
||||
&& self.proposed_digest.as_ref().map_or(true, |d| d != digest)
|
||||
&& self.checked_validity.insert(digest.clone());
|
||||
|
||||
let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone());
|
||||
|
||||
if checking_validity || checking_availability {
|
||||
match self.table.get_candidate(&digest) {
|
||||
None => {} // TODO: handle table inconsistency somehow?
|
||||
Some(candidate) => {
|
||||
if checking_validity {
|
||||
producer.fetch_block_data = Some(router.fetch_block_data(candidate).into_future().fuse());
|
||||
}
|
||||
|
||||
if checking_availability {
|
||||
producer.fetch_extrinsic = Some(router.fetch_extrinsic_data(candidate).into_future().fuse());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
producer
|
||||
}
|
||||
}
|
||||
|
||||
/// Produced statements about a specific candidate.
|
||||
/// Both may be `None`.
|
||||
#[derive(Default)]
|
||||
pub struct ProducedStatements {
|
||||
/// A statement about the validity of the candidate.
|
||||
pub validity: Option<table::Statement>,
|
||||
/// A statement about the availability of the candidate.
|
||||
pub availability: Option<table::Statement>,
|
||||
}
|
||||
|
||||
/// Future that produces statements about a specific candidate.
|
||||
pub struct StatementProducer<D: Future, E: Future> {
|
||||
fetch_block_data: Option<future::Fuse<D>>,
|
||||
fetch_extrinsic: Option<future::Fuse<E>>,
|
||||
produced_statements: ProducedStatements,
|
||||
_key: Arc<ed25519::Pair>,
|
||||
}
|
||||
|
||||
impl<D, E, Err> Future for StatementProducer<D, E>
|
||||
where
|
||||
D: Future<Item=BlockData,Error=Err>,
|
||||
E: Future<Item=Extrinsic,Error=Err>,
|
||||
{
|
||||
type Item = ProducedStatements;
|
||||
type Error = Err;
|
||||
|
||||
fn poll(&mut self) -> Poll<ProducedStatements, Err> {
|
||||
let mut done = true;
|
||||
if let Some(ref mut fetch_block_data) = self.fetch_block_data {
|
||||
match fetch_block_data.poll()? {
|
||||
Async::Ready(_block_data) => {
|
||||
// TODO [PoC-2] : validate block data here and make statement.
|
||||
},
|
||||
Async::NotReady => {
|
||||
done = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref mut fetch_extrinsic) = self.fetch_extrinsic {
|
||||
match fetch_extrinsic.poll()? {
|
||||
Async::Ready(_extrinsic) => {
|
||||
// TODO [PoC-2]: guarantee availability of data and make statment.
|
||||
}
|
||||
Async::NotReady => {
|
||||
done = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if done {
|
||||
Ok(Async::Ready(::std::mem::replace(&mut self.produced_statements, Default::default())))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,7 +297,7 @@ impl Clone for SharedTable {
|
||||
fn clone(&self) -> Self {
|
||||
SharedTable {
|
||||
context: self.context.clone(),
|
||||
inner: self.inner.clone()
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -163,24 +313,34 @@ impl SharedTable {
|
||||
inner: Arc::new(Mutex::new(SharedTableInner {
|
||||
table: Table::default(),
|
||||
proposed_digest: None,
|
||||
checked_validity: HashSet::new(),
|
||||
checked_availability: HashSet::new(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Import a single statement.
|
||||
pub fn import_statement(
|
||||
/// Get group info.
|
||||
pub fn group_info(&self) -> &HashMap<ParaId, GroupInfo> {
|
||||
&self.context.groups
|
||||
}
|
||||
|
||||
/// Import a single statement. Provide a handle to a table router
|
||||
/// for dispatching any other requests which come up.
|
||||
pub fn import_statement<R: TableRouter>(
|
||||
&self,
|
||||
router: &R,
|
||||
statement: table::SignedStatement,
|
||||
received_from: Option<AuthorityId>,
|
||||
) -> Option<table::Summary> {
|
||||
self.inner.lock().import_statement(&*self.context, statement, received_from)
|
||||
) -> StatementProducer<<R::FetchCandidate as IntoFuture>::Future, <R::FetchExtrinsic as IntoFuture>::Future> {
|
||||
self.inner.lock().import_statement(&*self.context, router, statement, received_from)
|
||||
}
|
||||
|
||||
/// Sign and import a local statement.
|
||||
pub fn sign_and_import(
|
||||
pub fn sign_and_import<R: TableRouter>(
|
||||
&self,
|
||||
router: &R,
|
||||
statement: table::Statement,
|
||||
) -> Option<table::Summary> {
|
||||
) -> StatementProducer<<R::FetchCandidate as IntoFuture>::Future, <R::FetchExtrinsic as IntoFuture>::Future> {
|
||||
let proposed_digest = match statement {
|
||||
GenericStatement::Candidate(ref c) => Some(c.hash()),
|
||||
_ => None,
|
||||
@@ -193,21 +353,25 @@ impl SharedTable {
|
||||
inner.proposed_digest = proposed_digest;
|
||||
}
|
||||
|
||||
inner.import_statement(&*self.context, signed_statement, None)
|
||||
inner.import_statement(&*self.context, router, signed_statement, None)
|
||||
}
|
||||
|
||||
/// Import many statements at once.
|
||||
///
|
||||
/// Provide an iterator yielding pairs of (statement, received_from).
|
||||
pub fn import_statements<I, U>(&self, iterable: I) -> U
|
||||
pub fn import_statements<R, I, U>(&self, router: &R, iterable: I) -> U
|
||||
where
|
||||
R: TableRouter,
|
||||
I: IntoIterator<Item=(table::SignedStatement, Option<AuthorityId>)>,
|
||||
U: ::std::iter::FromIterator<table::Summary>,
|
||||
U: ::std::iter::FromIterator<StatementProducer<
|
||||
<R::FetchCandidate as IntoFuture>::Future,
|
||||
<R::FetchExtrinsic as IntoFuture>::Future>
|
||||
>,
|
||||
{
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
iterable.into_iter().filter_map(move |(statement, received_from)| {
|
||||
inner.import_statement(&*self.context, statement, received_from)
|
||||
iterable.into_iter().map(move |(statement, received_from)| {
|
||||
inner.import_statement(&*self.context, router, statement, received_from)
|
||||
}).collect()
|
||||
}
|
||||
|
||||
@@ -241,3 +405,152 @@ impl SharedTable {
|
||||
self.inner.lock().proposed_digest.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId]) -> Result<HashMap<ParaId, GroupInfo>, Error> {
|
||||
if roster.validator_duty.len() != authorities.len() {
|
||||
bail!(ErrorKind::InvalidDutyRosterLength(authorities.len(), roster.validator_duty.len()))
|
||||
}
|
||||
|
||||
if roster.guarantor_duty.len() != authorities.len() {
|
||||
bail!(ErrorKind::InvalidDutyRosterLength(authorities.len(), roster.guarantor_duty.len()))
|
||||
}
|
||||
|
||||
let mut map = HashMap::new();
|
||||
|
||||
let duty_iter = authorities.iter().zip(&roster.validator_duty).zip(&roster.guarantor_duty);
|
||||
for ((authority, v_duty), a_duty) in duty_iter {
|
||||
use polkadot_primitives::parachain::Chain;
|
||||
|
||||
match *v_duty {
|
||||
Chain::Relay => {}, // does nothing for now.
|
||||
Chain::Parachain(ref id) => {
|
||||
map.entry(id.clone()).or_insert_with(GroupInfo::default)
|
||||
.validity_guarantors
|
||||
.insert(authority.clone());
|
||||
}
|
||||
}
|
||||
|
||||
match *a_duty {
|
||||
Chain::Relay => {}, // does nothing for now.
|
||||
Chain::Parachain(ref id) => {
|
||||
map.entry(id.clone()).or_insert_with(GroupInfo::default)
|
||||
.availability_guarantors
|
||||
.insert(authority.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for live_group in map.values_mut() {
|
||||
let validity_len = live_group.validity_guarantors.len();
|
||||
let availability_len = live_group.availability_guarantors.len();
|
||||
|
||||
live_group.needed_validity = validity_len / 2 + validity_len % 2;
|
||||
live_group.needed_availability = availability_len / 2 + availability_len % 2;
|
||||
}
|
||||
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
/// Polkadot proposer factory.
|
||||
pub struct ProposerFactory<C, N> {
|
||||
/// The client instance.
|
||||
pub client: Arc<C>,
|
||||
/// The backing network handle.
|
||||
pub network: N,
|
||||
}
|
||||
|
||||
impl<C: PolkadotApi, N: Network> bft::ProposerFactory for ProposerFactory<C, N> {
|
||||
type Proposer = Proposer<C, N::TableRouter>;
|
||||
type Error = Error;
|
||||
|
||||
fn init(&self, parent_header: &SubstrateHeader, authorities: &[AuthorityId], sign_with: Arc<ed25519::Pair>) -> Result<Self::Proposer, Error> {
|
||||
let parent_hash = parent_header.hash();
|
||||
let duty_roster = self.client.duty_roster(&BlockId::Hash(parent_hash))?;
|
||||
|
||||
let group_info = make_group_info(duty_roster, authorities)?;
|
||||
let table = Arc::new(SharedTable::new(group_info, sign_with, parent_hash));
|
||||
let router = self.network.table_router(table.clone());
|
||||
|
||||
// TODO [PoC-2]: kick off collation process.
|
||||
Ok(Proposer {
|
||||
parent_hash,
|
||||
_table: table,
|
||||
_router: router,
|
||||
client: self.client.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn current_timestamp() -> Timestamp {
|
||||
use std::time;
|
||||
|
||||
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
|
||||
.expect("now always later than unix epoch; qed")
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
/// The Polkadot proposer logic.
|
||||
pub struct Proposer<C, R> {
|
||||
parent_hash: HeaderHash,
|
||||
client: Arc<C>,
|
||||
_table: Arc<SharedTable>,
|
||||
_router: R,
|
||||
}
|
||||
|
||||
impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
|
||||
type Error = Error;
|
||||
type Create = Result<SubstrateBlock, Error>;
|
||||
type Evaluate = Result<bool, Error>;
|
||||
|
||||
fn propose(&self) -> Result<SubstrateBlock, Error> {
|
||||
// TODO: handle case when current timestamp behind that in state.
|
||||
let polkadot_block = self.client.build_block(
|
||||
&BlockId::Hash(self.parent_hash),
|
||||
current_timestamp()
|
||||
)?.bake();
|
||||
|
||||
// TODO: integrate transaction queue and `push_transaction`s.
|
||||
|
||||
let substrate_block = Slicable::decode(&mut polkadot_block.encode().as_slice())
|
||||
.expect("polkadot blocks defined to serialize to substrate blocks correctly; qed");
|
||||
|
||||
Ok(substrate_block)
|
||||
}
|
||||
|
||||
// TODO: certain kinds of errors here should lead to a misbehavior report.
|
||||
fn evaluate(&self, proposal: &SubstrateBlock) -> Result<bool, Error> {
|
||||
evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash)
|
||||
}
|
||||
}
|
||||
|
||||
fn evaluate_proposal<C: PolkadotApi>(
|
||||
proposal: &SubstrateBlock,
|
||||
client: &C,
|
||||
now: Timestamp,
|
||||
parent_hash: &HeaderHash,
|
||||
) -> Result<bool, Error> {
|
||||
const MAX_TIMESTAMP_DRIFT: Timestamp = 4;
|
||||
|
||||
let encoded = Slicable::encode(proposal);
|
||||
let proposal = PolkadotBlock::decode(&mut &encoded[..])
|
||||
.ok_or_else(|| ErrorKind::ProposalNotForPolkadot)?;
|
||||
|
||||
if proposal.header.parent_hash != *parent_hash {
|
||||
bail!(ErrorKind::WrongParentHash(*parent_hash, proposal.header.parent_hash));
|
||||
}
|
||||
|
||||
// no need to check number because
|
||||
// a) we assume the parent is valid.
|
||||
// b) the runtime checks that `proposal.parent_hash` == `block_hash(proposal.number - 1)`
|
||||
|
||||
let block_timestamp = proposal.body.timestamp;
|
||||
|
||||
// TODO: just defer using `tokio_timer` to delay prepare vote.
|
||||
if block_timestamp > now + MAX_TIMESTAMP_DRIFT {
|
||||
bail!(ErrorKind::TimestampInFuture)
|
||||
}
|
||||
|
||||
// execute the block.
|
||||
client.evaluate_block(&BlockId::Hash(*parent_hash), proposal)?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user